@@ -19,6 +19,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.stream.Collectors;

import static java.util.stream.Collectors.toList;
@@ -40,22 +41,14 @@ public class RecordConverter {
private final TableDescription tableDesc;
private final String topic_name;
private Schema keySchema;
private final Schema valueSchema;
private Schema valueSchema;

private List<String> keys;

public RecordConverter(TableDescription tableDesc, String topicNamePrefix) {
this.tableDesc = tableDesc;
this.topic_name = topicNamePrefix;
this.topic_name = topicNamePrefix;

valueSchema = SchemaBuilder.struct()
.name(SchemaNameAdjuster.DEFAULT.adjust( "com.trustpilot.connector.dynamodb.envelope"))
.field(Envelope.FieldName.VERSION, Schema.STRING_SCHEMA)
.field(Envelope.FieldName.DOCUMENT, DynamoDbJson.schema())
.field(Envelope.FieldName.SOURCE, SourceInfo.structSchema())
.field(Envelope.FieldName.OPERATION, Schema.STRING_SCHEMA)
.field(Envelope.FieldName.TIMESTAMP, Schema.INT64_SCHEMA)
.build();
}

public SourceRecord toSourceRecord(
@@ -79,7 +72,17 @@ public SourceRecord toSourceRecord(
//Map<String, Object> unMarshalledItems = ItemUtils.toSimpleMapValue(attributes);

//JSON conversion
String outputJsonString = ItemUtils.toItem(attributes).toJSON();
//String outputJsonString = ItemUtils.toItem(attributes).toJSON();
Struct dynamoAttributes = getAttributeValueStruct(sanitisedAttributes);

valueSchema = SchemaBuilder.struct()
.name(SchemaNameAdjuster.DEFAULT.adjust( "com.trustpilot.connector.dynamodb.envelope"))
.field(Envelope.FieldName.VERSION, Schema.STRING_SCHEMA)
.field(Envelope.FieldName.DOCUMENT, getAttributeValueSchema(sanitisedAttributes))
.field(Envelope.FieldName.SOURCE, SourceInfo.structSchema())
.field(Envelope.FieldName.OPERATION, Schema.STRING_SCHEMA)
.field(Envelope.FieldName.TIMESTAMP, Schema.INT64_SCHEMA)
.build();

// Leveraging offsets to store shard and sequence number with each item pushed to Kafka.
// This info will only be used to update `shardRegister` and won't be used to reset state after restart
@@ -108,7 +111,7 @@ public SourceRecord toSourceRecord(

Struct valueData = new Struct(valueSchema)
.put(Envelope.FieldName.VERSION, sourceInfo.version)
.put(Envelope.FieldName.DOCUMENT, outputJsonString) // objectMapper.writeValueAsString(outputJsonString))
.put(Envelope.FieldName.DOCUMENT, dynamoAttributes) // objectMapper.writeValueAsString(outputJsonString))
.put(Envelope.FieldName.SOURCE, SourceInfo.toStruct(sourceInfo))
.put(Envelope.FieldName.OPERATION, op.code())
.put(Envelope.FieldName.TIMESTAMP, arrivalTimestamp.toEpochMilli());
@@ -145,4 +148,65 @@ private String sanitiseAttributeName(final String attributeName) {

return sanitisedAttributeName;
}

public static Struct getAttributeValueStruct(Map<String, AttributeValue> attributes) {
final Struct attributeValueStruct = new Struct(getAttributeValueSchema(attributes));

// Mapping DynamoDB attributes to schema registry types (DynamoDB attributes are documented at the link below)
//https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-dynamodb/src/main/java/com/amazonaws/services/dynamodbv2/model/AttributeValue.java

for (Map.Entry<String, AttributeValue> attribute : attributes.entrySet()) {
final String attributeName = attribute.getKey();
final AttributeValue attributeValue = attribute.getValue();
if (attributeValue.getS() != null) {
attributeValueStruct.put(attributeName, attributeValue.getS());
} else if (attributeValue.getN() != null) {
attributeValueStruct.put(attributeName, attributeValue.getN());
} else if (attributeValue.getB() != null) {
attributeValueStruct.put(attributeName, attributeValue.getB());
} else if (attributeValue.getSS() != null) {
attributeValueStruct.put(attributeName, attributeValue.getSS());
} else if (attributeValue.getNS() != null) {
attributeValueStruct.put(attributeName, attributeValue.getNS());
} else if (attributeValue.getBS() != null) {
attributeValueStruct.put(attributeName, attributeValue.getBS());
} else if (attributeValue.getNULL() != null) {
attributeValueStruct.put(attributeName, attributeValue.getNULL());
} else if (attributeValue.getBOOL() != null) {
attributeValueStruct.put(attributeName, attributeValue.getBOOL());
}
}
return attributeValueStruct;

Review comment: should probably have an else? What if none of the else ifs match?
Also -> I'm sure you recognize the ugliness. Any better way?

Reply: A switch maybe?

Author: yep, I will refactor some of this code once I make sure it is working with data in dev, and will make it cleaner and easier to read, with comments.

Review comment: I looked at the underlying AttributeValue.java. Hideous code.
private String n; -> screw you!
To be clear, not Gurjit's code.

}
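Following up on the review thread above about the missing else: below is a sketch only, not part of this change, of one way the per-attribute mapping could fail loudly instead of silently skipping unsupported types. The putAttribute helper name is hypothetical; DataException is Kafka Connect's org.apache.kafka.connect.errors.DataException, and the AttributeValue getters are the same AWS SDK v1 ones used in getAttributeValueStruct above.

// Sketch only (review suggestion, not in this PR).
private static void putAttribute(Struct struct, String name, AttributeValue value) {
    if (value.getS() != null) {
        struct.put(name, value.getS());
    } else if (value.getN() != null) {
        struct.put(name, value.getN());        // DynamoDB numbers are carried as strings in this PR
    } else if (value.getB() != null) {
        struct.put(name, value.getB());
    } else if (value.getSS() != null) {
        struct.put(name, value.getSS());
    } else if (value.getNS() != null) {
        struct.put(name, value.getNS());
    } else if (value.getBS() != null) {
        struct.put(name, value.getBS());
    } else if (value.getNULL() != null) {
        struct.put(name, value.getNULL());
    } else if (value.getBOOL() != null) {
        struct.put(name, value.getBOOL());
    } else {
        // M (map) and L (list) attributes, among others, are not mapped yet;
        // failing here is one option, logging and skipping is another.
        throw new org.apache.kafka.connect.errors.DataException(
                "Unsupported DynamoDB attribute type for field: " + name);
    }
}

The same trailing else (or a switch) could be mirrored in getAttributeValueSchema so the schema-building and value-mapping chains stay in sync.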

public static Schema getAttributeValueSchema(Map<String, AttributeValue> attributes) {
SchemaBuilder RECORD_ATTRIBUTES_SCHEMA = SchemaBuilder.struct().name("DynamoDB.AttributeValue");

// Mapping DynamoDB attributes to schema registry types (DynamoDB attributes are documented at the link below)
//https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-dynamodb/src/main/java/com/amazonaws/services/dynamodbv2/model/AttributeValue.java

for (Map.Entry<String, AttributeValue> attribute : attributes.entrySet()) {
final String attributeName = attribute.getKey();
final AttributeValue attributeValue = attribute.getValue();
if (attributeValue.getS() != null) {
RECORD_ATTRIBUTES_SCHEMA.field(attributeName, Schema.STRING_SCHEMA);
} else if (attributeValue.getN() != null) {
RECORD_ATTRIBUTES_SCHEMA.field(attributeName, Schema.STRING_SCHEMA);
} else if (attributeValue.getB() != null) {
RECORD_ATTRIBUTES_SCHEMA.field(attributeName, Schema.BYTES_SCHEMA);
} else if (attributeValue.getSS() != null) {
RECORD_ATTRIBUTES_SCHEMA.field(attributeName, SchemaBuilder.array(Schema.STRING_SCHEMA));
} else if (attributeValue.getNS() != null) {
RECORD_ATTRIBUTES_SCHEMA.field(attributeName, SchemaBuilder.array(Schema.STRING_SCHEMA));
} else if (attributeValue.getBS() != null) {
RECORD_ATTRIBUTES_SCHEMA.field(attributeName, SchemaBuilder.array(Schema.BYTES_SCHEMA));
} else if (attributeValue.getNULL() != null) {
RECORD_ATTRIBUTES_SCHEMA.field(attributeName, Schema.BOOLEAN_SCHEMA);
} else if (attributeValue.getBOOL() != null) {
RECORD_ATTRIBUTES_SCHEMA.field(attributeName, Schema.BOOLEAN_SCHEMA);
}
}
return RECORD_ATTRIBUTES_SCHEMA.build();
}

}
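For reference, a minimal usage sketch of the two helpers added above; the attribute names and values are illustrative, and AttributeValue is the AWS SDK v1 class the connector already imports.

// Assumes java.util.Map/LinkedHashMap, com.amazonaws.services.dynamodbv2.model.AttributeValue,
// and org.apache.kafka.connect.data.Schema/Struct are imported.
Map<String, AttributeValue> attributes = new LinkedHashMap<>();
attributes.put("col1", new AttributeValue().withS("key1"));
attributes.put("col3", new AttributeValue().withN("1"));

Schema documentSchema = RecordConverter.getAttributeValueSchema(attributes);
// -> Struct schema named "DynamoDB.AttributeValue" with STRING fields col1 and col3

Struct document = RecordConverter.getAttributeValueStruct(attributes);
// -> Struct{col1=key1,col3=1}; the N value stays a string, matching the schema above

This is also the shape the tests below assert against via compareStructs.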
@@ -23,8 +23,9 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

import com.google.gson.JsonParser;
import com.google.gson.JsonObject;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;
@@ -279,12 +280,22 @@ public void onInitSyncRunningPollReturnsScannedItemsBatch() throws InterruptedException {
assertEquals(Instant.parse("2001-01-01T00:00:00.00Z"), task.getSourceInfo().lastInitSyncStart);
assertEquals(1, task.getSourceInfo().initSyncCount);

String expected = "{col2:val1,col3:1,col1:key1}";
JsonObject expectedJson = new JsonParser().parse(expected).getAsJsonObject();
final Schema expectedDocumentSchema = SchemaBuilder.struct().name("DynamoDB.AttributeValue")
.field("col2", Schema.STRING_SCHEMA)
.field("col3", Schema.STRING_SCHEMA)
.field("col1", Schema.STRING_SCHEMA)
.build();

final Struct expectedDocument = new Struct(expectedDocumentSchema)
.put("col2","val1")
.put("col3","1")
.put("col1","key1");

Struct actualDocument = ((Struct) response.get(0).value()).getStruct("document") ;

assertEquals(1, response.size());
assertEquals("r", ((Struct) response.get(0).value()).getString("op"));
assertEquals(expectedJson.toString(), ((Struct) response.get(0).value()).getString("document"));
assertEquals("r", ((Struct) response.get(0).value()).getString("op"));
compareStructs(expectedDocument, actualDocument);
assertEquals(InitSyncStatus.RUNNING, task.getSourceInfo().initSyncStatus);
assertEquals(exclusiveStartKey, task.getSourceInfo().exclusiveStartKey);
}
@@ -564,15 +575,30 @@ public void onSyncPollReturnsReceivedRecords() throws InterruptedException {
task.start(configs);
List<SourceRecord> response = task.poll();

String expected = "{col2:val1,col3:1,col1:key1}";
String expectedKey = "{col1:key2}";
JsonObject expectedJson = new JsonParser().parse(expected).getAsJsonObject();
JsonObject expectedKeyJson = new JsonParser().parse(expectedKey).getAsJsonObject();
final Schema expectedDocumentSchema = SchemaBuilder.struct().name("DynamoDB.AttributeValue")
.field("col2", Schema.STRING_SCHEMA)
.field("col3", Schema.STRING_SCHEMA)
.field("col1", Schema.STRING_SCHEMA)
.build();
final Struct expectedDocument = new Struct(expectedDocumentSchema)
.put("col2","val1")
.put("col3","1")
.put("col1","key1");

final Schema expectedDocumentColSchema = SchemaBuilder.struct().name("DynamoDB.AttributeValue")
.field("col1", Schema.STRING_SCHEMA)
.build();

final Struct expectedDocColValue = new Struct(expectedDocumentColSchema)
.put("col1","key2");

Struct actualDocument = ((Struct) response.get(0).value()).getStruct("document") ;
Struct actualDocumentCol = ((Struct) response.get(1).value()).getStruct("document");

// Assert
assertEquals(3, response.size());
assertEquals(expectedJson.toString(), ((Struct) response.get(0).value()).getString("document"));
assertEquals(expectedKeyJson.toString(), ((Struct) response.get(1).value()).getString("document"));
assertEquals(3, response.size());
compareStructs(expectedDocument, actualDocument);
compareStructs(expectedDocColValue, actualDocumentCol);
assertNull(response.get(2).value()); // tombstone
}

@@ -886,4 +912,18 @@ public void onCommitIgnoreRecordsWithoutSequenceNumber() throws InterruptedException {
assertEquals("", shardRegister.get("shard1").getLastCommittedRecordSeqNo());
}

public void compareStructs(Struct expectedStruct , Struct actualStruct) {

// compare the schemas of both structs
if (!Objects.equals(expectedStruct.schema(), actualStruct.schema())) {
fail("Schema expected " + expectedStruct.schema().fields() + " but actual " + actualStruct.schema().fields());
}

// compare every field of both structs
for (Field expectedFieldName : expectedStruct.schema().fields()) {
Field actualFieldName = actualStruct.schema().field(expectedFieldName.name());
assertEquals(expectedStruct.get(expectedFieldName), actualStruct.get(actualFieldName));
}

}
}
@@ -18,11 +18,14 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import com.google.gson.JsonParser;
import com.google.gson.JsonObject;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.*;


@SuppressWarnings("SameParameterValue")
@@ -198,12 +201,27 @@ public void recordAttributesAreAddedToValueData() throws Exception {
"testSequenceNumberID1"
);

String expected = "{testKV1:testKV1Value,testKV2:'2',testV2:testStringValue,testV1:1}";
JsonObject expectedJson = new JsonParser().parse(expected).getAsJsonObject();
//String expected = "{testKV1:testKV1Value,testKV2:'2',testV2:testStringValue,testV1:1}";
//JsonObject expectedJson = new JsonParser().parse(expected).getAsJsonObject();

final Schema expectedDocumentSchema = SchemaBuilder.struct().name("DynamoDB.AttributeValue")
.field("testKV1", Schema.STRING_SCHEMA)
.field("testKV2", Schema.STRING_SCHEMA)
.field("testV2", Schema.STRING_SCHEMA)
.field("testV1", Schema.STRING_SCHEMA)
.build();

final Struct expectedDocument = new Struct(expectedDocumentSchema)
.put("testKV1","testKV1Value")
.put("testKV2","2")
.put("testV2","testStringValue")
.put("testV1","1");

Struct actualDocument = ((Struct) record.value()).getStruct("document") ;

// Assert
assertEquals(expectedJson.toString(),
((Struct) record.value()).getString("document"));
//assertEquals(expectedJson.toString(), ((Struct) record.value()).getString("document"));
compareStructs(expectedDocument, actualDocument);
}

@Test
@@ -274,12 +292,25 @@ public void recordAttributesAreAddedToValueDataWhenAttributesContainsInvalidChar
"testSequenceNumberID1"
);

String expected = "{test-1234:testKV1Value,_starts_with_underscore:1,1-starts-with-number:'2',test!@£$%^:testStringValue}";
JsonObject expectedJson = new JsonParser().parse(expected).getAsJsonObject();
//String expected = "{test-1234:testKV1Value,_starts_with_underscore:1,1-starts-with-number:'2',test!@£$%^:testStringValue}";
//JsonObject expectedJson = new JsonParser().parse(expected).getAsJsonObject();

final Schema expectedDocumentSchema = SchemaBuilder.struct().name("DynamoDB.AttributeValue")
.field("test1234", Schema.STRING_SCHEMA)
.field("_starts_with_underscore", Schema.STRING_SCHEMA)
.field("startswithnumber", Schema.STRING_SCHEMA)
.field("test", Schema.STRING_SCHEMA)
.build();

final Struct expectedDocument = new Struct(expectedDocumentSchema)
.put("test1234","testKV1Value")
.put("_starts_with_underscore","1")
.put("startswithnumber","2")
.put("test","testStringValue");

Struct actualDocument = ((Struct) record.value()).getStruct("document") ;
// Assert
assertEquals(expectedJson.toString(),
((Struct) record.value()).getString("document"));
compareStructs(expectedDocument, actualDocument);
}

@Test
@@ -342,4 +373,18 @@ public void arrivalTimestampIsAddedToValueData() throws Exception {
assertEquals(978393600000L, ((Struct) record.value()).getInt64("ts_ms"));
}

public void compareStructs(Struct expectedStruct , Struct actualStruct) {

// compare the schemas of both structs
if (!Objects.equals(expectedStruct.schema(), actualStruct.schema())) {
fail("Schema expected " + expectedStruct.schema().fields() + " but actual " + actualStruct.schema().fields());
}

// compare every field of both structs
for (Field expectedFieldName : expectedStruct.schema().fields()) {
Field actualFieldName = actualStruct.schema().field(expectedFieldName.name());
assertEquals(expectedStruct.get(expectedFieldName), actualStruct.get(actualFieldName));
}

}
}