Add include_keys and exclude_keys to S3 sink #3122

Merged
merged 3 commits on Aug 9, 2023
Changes from 1 commit
Add OutputCodecContext for output codecs.
Signed-off-by: Aiden Dai <daixb@amazon.com>
daixba committed Aug 9, 2023
commit 3922e75c3c130f4ee0cbcc1a9dfb18201679386a
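
For orientation before the diff: this commit replaces the raw tagsTargetKey String parameter with an OutputCodecContext object. The class itself is not part of these hunks, so the following is only a minimal sketch of the shape implied by the call sites below (a no-arg constructor, getTagsTargetKey(), and a never-null getExcludeKeys()); the include-keys accessor and the three-argument constructor are assumptions taken from the PR title, not from this commit.

package org.opensearch.dataprepper.model.sink;

import java.util.Collections;
import java.util.List;

// Sketch only: shape inferred from this diff's call sites, not the actual class definition.
public class OutputCodecContext {
    private final String tagsTargetKey;
    private final List<String> includeKeys; // assumed from the PR title ("include_keys")
    private final List<String> excludeKeys;

    public OutputCodecContext() {
        this(null, Collections.emptyList(), Collections.emptyList());
    }

    // Assumed convenience constructor; the real class may differ.
    public OutputCodecContext(final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys) {
        this.tagsTargetKey = tagsTargetKey;
        this.includeKeys = includeKeys == null ? Collections.emptyList() : includeKeys;
        this.excludeKeys = excludeKeys == null ? Collections.emptyList() : excludeKeys;
    }

    public String getTagsTargetKey() {
        return tagsTargetKey;
    }

    public List<String> getIncludeKeys() {
        return includeKeys;
    }

    public List<String> getExcludeKeys() {
        return excludeKeys;
    }
}
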
@@ -10,6 +10,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;
import org.opensearch.dataprepper.model.sink.Sink;

import java.io.IOException;
@@ -26,21 +27,20 @@ public interface OutputCodec {
*
* @param outputStream outputStream param for wrapping
* @param event Event to auto-generate schema
* @param tagsTargetKey to add tags to the record to create schema
* @param context Extra Context used in Codec.
* @throws IOException throws IOException when invalid input is received or not able to create wrapping
*/
void start(OutputStream outputStream, Event event, String tagsTargetKey) throws IOException;
void start(OutputStream outputStream, Event event, OutputCodecContext context) throws IOException;

/**
* this method gets called from {@link Sink} to write an event to the {@link OutputStream}
* Implementors should get data from the event and write it to the {@link OutputStream}
*
* @param event event Record event
* @param outputStream outputStream param to hold the event data
* @param tagsTargetKey to add tags to the record
* @throws IOException throws IOException when not able to write data to {@link OutputStream}
*/
void writeEvent(Event event, OutputStream outputStream, String tagsTargetKey) throws IOException;
void writeEvent(Event event, OutputStream outputStream) throws IOException;

/**
* this method gets called from {@link Sink} to do final wrapping in the {@link OutputStream}
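
With the interface above, tag and key handling moves out of the per-event call: the context is supplied once in start(), and writeEvent() takes only the event and the stream. A hedged sketch of how a sink might drive a codec under the new signatures (the flush helper, its wiring, and the three-argument context constructor are illustrative assumptions, not code from this PR):

import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;

import java.io.IOException;
import java.io.OutputStream;
import java.util.List;

class CodecUsageSketch {
    // Assumes a non-empty batch; the first event seeds schema auto-generation in start().
    void flush(final OutputCodec codec, final OutputStream outputStream,
               final List<Record<Event>> records, final OutputCodecContext context) throws IOException {
        codec.start(outputStream, records.get(0).getData(), context);
        for (final Record<Event> record : records) {
            // No per-event tagsTargetKey parameter anymore; the codec reads it from the context.
            codec.writeEvent(record.getData(), outputStream);
        }
        codec.complete(outputStream);
    }
}
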
@@ -9,6 +9,7 @@
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;

import java.io.IOException;
import java.io.OutputStream;
@@ -30,11 +31,11 @@ public void setUp() {
public void testWriteMetrics() throws JsonProcessingException {
OutputCodec outputCodec = new OutputCodec() {
@Override
public void start(OutputStream outputStream, Event event, String tagsTargetKey) throws IOException {
public void start(OutputStream outputStream, Event event, final OutputCodecContext codecContext) throws IOException {
}

@Override
public void writeEvent(Event event, OutputStream outputStream, String tagsTargetKey) throws IOException {
public void writeEvent(Event event, OutputStream outputStream) throws IOException {
}

@Override
@@ -15,13 +15,13 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -46,15 +46,19 @@ public class AvroOutputCodec implements OutputCodec {

private Schema schema;

private OutputCodecContext codecContext;

@DataPrepperPluginConstructor
public AvroOutputCodec(final AvroOutputCodecConfig config) {
Objects.requireNonNull(config);
this.config = config;
}

@Override
public void start(final OutputStream outputStream, final Event event, final String tagsTargetKey) throws IOException {
public void start(final OutputStream outputStream, final Event event, final OutputCodecContext codecContext) throws IOException {
Objects.requireNonNull(outputStream);
Objects.requireNonNull(codecContext);
this.codecContext = codecContext;
if (config.getSchema() != null) {
schema = parseSchema(config.getSchema());
} else if (config.getFileLocation() != null) {
@@ -63,53 +67,48 @@ public void start(final OutputStream outputStream, final Event event, final Stri
schema = parseSchema(AvroSchemaParserFromSchemaRegistry.getSchemaType(config.getSchemaRegistryUrl()));
} else if (checkS3SchemaValidity()) {
schema = AvroSchemaParserFromS3.parseSchema(config);
}else {
schema = buildInlineSchemaFromEvent(event, tagsTargetKey);
} else {
schema = buildInlineSchemaFromEvent(event);
}
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
dataFileWriter = new DataFileWriter<>(datumWriter);
dataFileWriter.create(schema, outputStream);
}

public Schema buildInlineSchemaFromEvent(final Event event, final String tagsTargetKey) throws IOException {
if(tagsTargetKey!=null){
return parseSchema(buildSchemaStringFromEventMap(addTagsToEvent(event, tagsTargetKey).toMap(), false));
}else{
public Schema buildInlineSchemaFromEvent(final Event event) throws IOException {
if (codecContext != null && codecContext.getTagsTargetKey() != null) {
return parseSchema(buildSchemaStringFromEventMap(addTagsToEvent(event, codecContext.getTagsTargetKey()).toMap(), false));
} else {
return parseSchema(buildSchemaStringFromEventMap(event.toMap(), false));
}
}

private String buildSchemaStringFromEventMap(final Map<String, Object> eventData, boolean nestedRecordFlag) {
final StringBuilder builder = new StringBuilder();
int nestedRecordIndex=1;
if(nestedRecordFlag==false){
int nestedRecordIndex = 1;
if (!nestedRecordFlag) {
builder.append(BASE_SCHEMA_STRING);
}else{
builder.append("{\"type\":\"record\",\"name\":\""+"NestedRecord"+nestedRecordIndex+"\",\"fields\":[");
} else {
builder.append("{\"type\":\"record\",\"name\":\"" + "NestedRecord" + nestedRecordIndex + "\",\"fields\":[");
nestedRecordIndex++;
}
String fields;
int index = 0;
for(final String key: eventData.keySet()){
if(config.getExcludeKeys()==null){
config.setExcludeKeys(new ArrayList<>());
}
if(config.getExcludeKeys().contains(key)){
for (final String key : eventData.keySet()) {
if (codecContext != null && codecContext.getExcludeKeys().contains(key)) {
continue;
}
if(index == 0){
if(!(eventData.get(key) instanceof Map)){
fields = "{\"name\":\""+key+"\",\"type\":\""+typeMapper(eventData.get(key))+"\"}";
if (index == 0) {
if (!(eventData.get(key) instanceof Map)) {
fields = "{\"name\":\"" + key + "\",\"type\":\"" + typeMapper(eventData.get(key)) + "\"}";
} else {
fields = "{\"name\":\"" + key + "\",\"type\":" + typeMapper(eventData.get(key)) + "}";
}
else{
fields = "{\"name\":\""+key+"\",\"type\":"+typeMapper(eventData.get(key))+"}";
}
}
else{
if(!(eventData.get(key) instanceof Map)){
fields = ","+"{\"name\":\""+key+"\",\"type\":\""+typeMapper(eventData.get(key))+"\"}";
}else{
fields = ","+"{\"name\":\""+key+"\",\"type\":"+typeMapper(eventData.get(key))+"}";
} else {
if (!(eventData.get(key) instanceof Map)) {
fields = "," + "{\"name\":\"" + key + "\",\"type\":\"" + typeMapper(eventData.get(key)) + "\"}";
} else {
fields = "," + "{\"name\":\"" + key + "\",\"type\":" + typeMapper(eventData.get(key)) + "}";
}
}
builder.append(fields);
@@ -120,20 +119,19 @@ private String buildSchemaStringFromEventMap(final Map<String, Object> eventData
}

private String typeMapper(final Object value) {
if(value instanceof Integer || value.getClass().equals(int.class)){
if (value instanceof Integer || value.getClass().equals(int.class)) {
return "int";
}else if(value instanceof Float || value.getClass().equals(float.class)){
} else if (value instanceof Float || value.getClass().equals(float.class)) {
return "float";
}else if(value instanceof Double || value.getClass().equals(double.class)){
} else if (value instanceof Double || value.getClass().equals(double.class)) {
return "double";
}else if(value instanceof Long || value.getClass().equals(long.class)){
} else if (value instanceof Long || value.getClass().equals(long.class)) {
return "long";
}else if(value instanceof Byte[]){
} else if (value instanceof Byte[]) {
return "bytes";
}else if(value instanceof Map){
} else if (value instanceof Map) {
return buildSchemaStringFromEventMap((Map<String, Object>) value, true);
}
else{
} else {
return "string";
}
}
@@ -145,10 +143,10 @@ public void complete(final OutputStream outputStream) throws IOException {
}

@Override
public void writeEvent(final Event event, final OutputStream outputStream, final String tagsTargetKey) throws IOException {
public void writeEvent(final Event event, final OutputStream outputStream) throws IOException {
Objects.requireNonNull(event);
if (tagsTargetKey != null) {
final GenericRecord avroRecord = buildAvroRecord(schema, addTagsToEvent(event, tagsTargetKey).toMap());
if (codecContext.getTagsTargetKey() != null) {
final GenericRecord avroRecord = buildAvroRecord(schema, addTagsToEvent(event, codecContext.getTagsTargetKey()).toMap());
dataFileWriter.append(avroRecord);
} else {
final GenericRecord avroRecord = buildAvroRecord(schema, event.toMap());
@@ -173,9 +171,9 @@ Schema parseSchema(final String schemaString) throws IOException {

private GenericRecord buildAvroRecord(final Schema schema, final Map<String, Object> eventData) {
final GenericRecord avroRecord = new GenericData.Record(schema);
final boolean isExcludeKeyAvailable = !Objects.isNull(config.getExcludeKeys());
final boolean isExcludeKeyAvailable = !codecContext.getExcludeKeys().isEmpty();
for (final String key : eventData.keySet()) {
if (isExcludeKeyAvailable && config.getExcludeKeys().contains(key)) {
if (isExcludeKeyAvailable && codecContext.getExcludeKeys().contains(key)) {
continue;
}
final Schema.Field field = schema.getField(key);
@@ -228,10 +226,6 @@ private Object schemaMapper(final Schema.Field field, final Object rawValue) {
}

private boolean checkS3SchemaValidity() throws IOException {
if (config.getBucketName() != null && config.getFileKey() != null && config.getRegion() != null) {
return true;
} else {
return false;
}
return config.getBucketName() != null && config.getFileKey() != null && config.getRegion() != null;
}
}
@@ -8,8 +8,6 @@
import jakarta.validation.Valid;
import jakarta.validation.constraints.Size;

import java.util.List;

/**
* Configuration class for {@link AvroOutputCodec}.
*/
@@ -23,8 +21,6 @@ public class AvroOutputCodecConfig {
@JsonProperty("schema_file_location")
private String fileLocation;

@JsonProperty("exclude_keys")
private List<String> excludeKeys;
@Valid
@Size(max = 0, message = "Schema from schema registry is not supported.")
@JsonProperty("schema_registry_url")
@@ -45,10 +41,6 @@ public class AvroOutputCodecConfig {
@JsonProperty("file_key")
private String fileKey;

public List<String> getExcludeKeys() {
return excludeKeys;
}

public String getSchema() {
return schema;
}
@@ -76,8 +68,6 @@ public String getBucketName() {
public String getFileKey() {
return fileKey;
}
public void setExcludeKeys(List<String> excludeKeys) {
this.excludeKeys = excludeKeys;
}


}
@@ -25,6 +25,7 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -179,9 +180,10 @@ public void test_HappyCaseAvroInputStream_then_callsConsumerWithParsedEvents(fin
verify(eventConsumer, times(numberOfRecords)).accept(recordArgumentCaptor.capture());
final List<Record<Event>> actualRecords = recordArgumentCaptor.getAllValues();
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
avroOutputCodec.start(outputStream, null, null);
OutputCodecContext codecContext = new OutputCodecContext();
avroOutputCodec.start(outputStream, null, codecContext);
for (Record<Event> record : actualRecords) {
avroOutputCodec.writeEvent(record.getData(), outputStream, null);
avroOutputCodec.writeEvent(record.getData(), outputStream);
}
avroOutputCodec.complete(outputStream);
List<GenericRecord> actualOutputRecords = createAvroRecordsList(outputStream);