-Support for Sink Codecs (opensearch-project#2986)
* -Support for Sink Codecs
Signed-off-by: umairofficial <umairhusain1010@gmail.com>
---------

Co-authored-by: umairofficial <umairhusain1010@gmail.com>
umayr-codes and umairofficial authored Jul 26, 2023
1 parent 548b5e0 commit 4c4677b
Showing 29 changed files with 1,801 additions and 44 deletions.
@@ -24,10 +24,12 @@ public interface OutputCodec {
 * This method gets called from {@link Sink} to do the initial wrapping of the {@link OutputStream}
* Implementors should do initial wrapping according to the implementation
*
 * @param outputStream outputStream param for wrapping
* @param event Event to auto-generate schema
* @param tagsTargetKey to add tags to the record to create schema
 * @throws IOException when invalid input is received or the wrapping cannot be created
*/
    void start(OutputStream outputStream, Event event, String tagsTargetKey) throws IOException;

/**
* this method get called from {@link Sink} to write event in {@link OutputStream}
@@ -30,7 +30,7 @@ public void setUp() {
public void testWriteMetrics() throws JsonProcessingException {
OutputCodec outputCodec = new OutputCodec() {
@Override
            public void start(OutputStream outputStream, Event event, String tagsTargetKey) throws IOException {
}

@Override
74 changes: 74 additions & 0 deletions data-prepper-plugins/avro-codecs/README.md
@@ -0,0 +1,74 @@
# Avro Sink/Output Codec

This is an implementation of the Avro sink/output codec, which converts Data Prepper events into Avro records and writes them to the underlying OutputStream.

## Usage

The Avro output codec can be configured on sink plugins (e.g. the S3 sink) in the pipeline configuration file.

## Configuration Options

```
pipeline:
  ...
  sink:
    - s3:
        aws:
          region: us-east-1
          sts_role_arn: arn:aws:iam::123456789012:role/Data-Prepper
          sts_header_overrides:
        max_retries: 5
        bucket: bucket_name
        object_key:
          path_prefix: my-elb/%{yyyy}/%{MM}/%{dd}/
        threshold:
          event_count: 2000
          maximum_size: 50mb
          event_collect_timeout: 15s
        codec:
          avro:
            schema: "{\"namespace\": \"org.example.test\", \"type\": \"record\", \"name\": \"TestMessage\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}"
            exclude_keys:
              - s3
        buffer_type: in_memory
```

## Codec Configuration

1) `schema`: A JSON string defining the Avro schema, supplied directly in the pipeline YAML file (shown as plain JSON after this list). The codec parses the schema object from this string.
2) `exclude_keys`: Keys of the event that should be excluded when converting it to an Avro record.
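
For readability, the escaped `schema` string in the pipeline example above corresponds to the following plain JSON Avro schema:

```
{
  "namespace": "org.example.test",
  "type": "record",
  "name": "TestMessage",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}
```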

### Note:

1) Only one schema can be provided at a time, i.e. through exactly one of the options in the codec configuration.
2) If the tags should be part of the resulting Avro data and `tagsTargetKey` is set in the configuration, the schema must also be modified to accommodate the tags. An additional field has to be added to the schema (for example in the `schema.json` file), as shown here and in the complete example after this list:

`{
  "name": "yourTagsTargetKey",
  "type": {
    "type": "array",
    "items": "string"
  }
}`
3) If no schema is provided, the codec auto-generates one from the first event in the buffer.
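
Putting the note above together with the earlier example, a schema that also carries the tags might look like the following sketch, assuming the configured tags target key is `yourTagsTargetKey` (use whatever key is actually configured):

```
{
  "namespace": "org.example.test",
  "type": "record",
  "name": "TestMessage",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"},
    {"name": "yourTagsTargetKey", "type": {"type": "array", "items": "string"}}
  ]
}
```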

## Developer Guide

This plugin is compatible with Java 11. See the following resources:

- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md)

The integration tests for this plugin do not run as part of the Data Prepper build.

The following command runs the integration tests:

```
./gradlew :data-prepper-plugins:s3-sink:integrationTest -Dtests.s3sink.region=<your-aws-region> -Dtests.s3sink.bucket=<your-bucket>
```
2 changes: 2 additions & 0 deletions data-prepper-plugins/avro-codecs/build.gradle
@@ -7,6 +7,8 @@ dependencies {
implementation project(path: ':data-prepper-api')
implementation 'org.apache.avro:avro:1.11.1'
implementation 'org.apache.parquet:parquet-common:1.12.3'
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:apache-client'
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.9.2'
testImplementation 'org.json:json:20230227'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.9.2'
@@ -4,14 +4,28 @@
*/
package org.opensearch.dataprepper.plugins.codec.avro;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * An implementation of {@link OutputCodec} which serializes Data-Prepper events
@@ -20,36 +20,204 @@
@DataPrepperPlugin(name = "avro", pluginType = OutputCodec.class, pluginConfigurationType = AvroOutputCodecConfig.class)
public class AvroOutputCodec implements OutputCodec {

private static final List<String> nonComplexTypes = Arrays.asList("int", "long", "string", "float", "double", "bytes");
private static final Logger LOG = LoggerFactory.getLogger(AvroOutputCodec.class);
private static final ObjectMapper objectMapper = new ObjectMapper();
private static final String AVRO = "avro";
private static final String BASE_SCHEMA_STRING = "{\"type\":\"record\",\"name\":\"AvroRecords\",\"fields\":[";
private static final String END_SCHEMA_STRING = "]}";

private final AvroOutputCodecConfig config;
private DataFileWriter<GenericRecord> dataFileWriter;

private Schema schema;

@DataPrepperPluginConstructor
public AvroOutputCodec(final AvroOutputCodecConfig config) {
        Objects.requireNonNull(config);
        this.config = config;
}

@Override
    public void start(final OutputStream outputStream, final Event event, final String tagsTargetKey) throws IOException {
Objects.requireNonNull(outputStream);
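        // Resolve the schema in priority order: inline schema string, schema file location,
        // schema registry URL, S3 object, or auto-generation from the provided event.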
if (config.getSchema() != null) {
schema = parseSchema(config.getSchema());
} else if (config.getFileLocation() != null) {
schema = AvroSchemaParser.parseSchemaFromJsonFile(config.getFileLocation());
} else if (config.getSchemaRegistryUrl() != null) {
schema = parseSchema(AvroSchemaParserFromSchemaRegistry.getSchemaType(config.getSchemaRegistryUrl()));
} else if (checkS3SchemaValidity()) {
schema = AvroSchemaParserFromS3.parseSchema(config);
        } else {
schema = buildInlineSchemaFromEvent(event, tagsTargetKey);
}
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
dataFileWriter = new DataFileWriter<>(datumWriter);
dataFileWriter.create(schema, outputStream);
}

    public Schema buildInlineSchemaFromEvent(final Event event, final String tagsTargetKey) throws IOException {
        if (tagsTargetKey != null) {
            return parseSchema(buildSchemaStringFromEventMap(addTagsToEvent(event, tagsTargetKey).toMap(), false));
        } else {
            return parseSchema(buildSchemaStringFromEventMap(event.toMap(), false));
        }
}

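    // Builds an Avro schema JSON string from an event's key/value map; nested maps become
    // nested record types, and keys listed in exclude_keys are skipped.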
    private String buildSchemaStringFromEventMap(final Map<String, Object> eventData, boolean nestedRecordFlag) {
        final StringBuilder builder = new StringBuilder();
        int nestedRecordIndex = 1;
        if (!nestedRecordFlag) {
            builder.append(BASE_SCHEMA_STRING);
        } else {
            builder.append("{\"type\":\"record\",\"name\":\"" + "NestedRecord" + nestedRecordIndex + "\",\"fields\":[");
            nestedRecordIndex++;
        }
        String fields;
        int index = 0;
        for (final String key : eventData.keySet()) {
            if (config.getExcludeKeys() == null) {
                config.setExcludeKeys(new ArrayList<>());
            }
            if (config.getExcludeKeys().contains(key)) {
                continue;
            }
            if (index == 0) {
                if (!(eventData.get(key) instanceof Map)) {
                    fields = "{\"name\":\"" + key + "\",\"type\":\"" + typeMapper(eventData.get(key)) + "\"}";
                } else {
                    fields = "{\"name\":\"" + key + "\",\"type\":" + typeMapper(eventData.get(key)) + "}";
                }
            } else {
                if (!(eventData.get(key) instanceof Map)) {
                    fields = "," + "{\"name\":\"" + key + "\",\"type\":\"" + typeMapper(eventData.get(key)) + "\"}";
                } else {
                    fields = "," + "{\"name\":\"" + key + "\",\"type\":" + typeMapper(eventData.get(key)) + "}";
                }
            }
            builder.append(fields);
            index++;
        }
        builder.append(END_SCHEMA_STRING);
        return builder.toString();
    }

    private String typeMapper(final Object value) {
        if (value instanceof Integer || value.getClass().equals(int.class)) {
            return "int";
        } else if (value instanceof Float || value.getClass().equals(float.class)) {
            return "float";
        } else if (value instanceof Double || value.getClass().equals(double.class)) {
            return "double";
        } else if (value instanceof Long || value.getClass().equals(long.class)) {
            return "long";
        } else if (value instanceof Byte[]) {
            return "bytes";
        } else if (value instanceof Map) {
            return buildSchemaStringFromEventMap((Map<String, Object>) value, true);
        } else {
            return "string";
        }
    }

    @Override
    public void complete(final OutputStream outputStream) throws IOException {
        dataFileWriter.close();
        outputStream.close();
    }

@Override
public void writeEvent(final Event event, final OutputStream outputStream, final String tagsTargetKey) throws IOException {
Objects.requireNonNull(event);
if (tagsTargetKey != null) {
final GenericRecord avroRecord = buildAvroRecord(schema, addTagsToEvent(event, tagsTargetKey).toMap());
dataFileWriter.append(avroRecord);
} else {
final GenericRecord avroRecord = buildAvroRecord(schema, event.toMap());
dataFileWriter.append(avroRecord);
}
}

    @Override
    public String getExtension() {
        return AVRO;
    }

    Schema parseSchema(final String schemaString) throws IOException {
try {
Objects.requireNonNull(schemaString);
return new Schema.Parser().parse(schemaString);
} catch (Exception e) {
LOG.error("Unable to parse Schema from Schema String provided.");
throw new IOException("Can't proceed without schema.");
}
}

private GenericRecord buildAvroRecord(final Schema schema, final Map<String, Object> eventData) {
final GenericRecord avroRecord = new GenericData.Record(schema);
final boolean isExcludeKeyAvailable = !Objects.isNull(config.getExcludeKeys());
for (final String key : eventData.keySet()) {
if (isExcludeKeyAvailable && config.getExcludeKeys().contains(key)) {
continue;
}
final Schema.Field field = schema.getField(key);
final Object value = schemaMapper(field, eventData.get(key));
avroRecord.put(key, value);
}
return avroRecord;
}

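    // Converts a raw event value into the Java type the corresponding Avro field expects:
    // primitives are parsed from their string form, nested maps become sub-records,
    // and lists become Avro arrays of strings.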
private Object schemaMapper(final Schema.Field field, final Object rawValue) {
Object finalValue = null;
final String fieldType = field.schema().getType().name().toLowerCase();
if (nonComplexTypes.contains(fieldType)) {
switch (fieldType) {
case "string":
finalValue = rawValue.toString();
break;
case "int":
finalValue = Integer.parseInt(rawValue.toString());
break;
case "float":
finalValue = Float.parseFloat(rawValue.toString());
break;
case "double":
finalValue = Double.parseDouble(rawValue.toString());
break;
case "long":
finalValue = Long.parseLong(rawValue.toString());
break;
case "bytes":
finalValue = rawValue.toString().getBytes(StandardCharsets.UTF_8);
break;
default:
LOG.error("Unrecognised Field name : '{}' & type : '{}'", field.name(), fieldType);
break;
}
} else {
if (fieldType.equals("record") && rawValue instanceof Map) {
finalValue = buildAvroRecord(field.schema(), (Map<String, Object>) rawValue);
} else if (fieldType.equals("array") && rawValue instanceof List) {
GenericData.Array<String> avroArray =
new GenericData.Array<>(((List<String>) rawValue).size(), field.schema());
for (String element : ((List<String>) rawValue)) {
avroArray.add(element);
}
finalValue = avroArray;
}
}
return finalValue;
}

    private boolean checkS3SchemaValidity() throws IOException {
        return config.getBucketName() != null && config.getFileKey() != null && config.getRegion() != null;
    }
}
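
For orientation, the sketch below (not part of this commit; the class and method names are illustrative) shows the lifecycle a sink follows when driving an `OutputCodec` such as this one: `start()` is called once with the first event so the codec can resolve or auto-generate its schema, `writeEvent()` once per event, and `complete()` to finish the wrapping:

```
import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.model.event.Event;

import java.io.IOException;
import java.io.OutputStream;
import java.util.List;

class SinkCodecLifecycleSketch {
    // Illustrative helper: drives any OutputCodec over a non-empty batch of events.
    static void writeAll(final OutputCodec codec, final List<Event> events,
                         final OutputStream outputStream, final String tagsTargetKey) throws IOException {
        // start() wraps the stream; the Avro codec also resolves or auto-generates the schema here.
        codec.start(outputStream, events.get(0), tagsTargetKey);
        for (final Event event : events) {
            // One record is appended to the stream per Data Prepper event.
            codec.writeEvent(event, outputStream, tagsTargetKey);
        }
        // complete() closes the writer and the underlying stream.
        codec.complete(outputStream);
    }
}
```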
