Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@

// All Raw Metadata
outputObject.put("_metadata_source", getSourceMetadataJson(record));

return FailsafeElement.of(outputObject.toString(), outputObject.toString());
}

Expand Down Expand Up @@ -386,7 +385,22 @@
jsonObject.put(fieldName, (Boolean) record.get(fieldName));
break;
case BYTES:
jsonObject.put(fieldName, (byte[]) record.get(fieldName));
if (record.get(fieldName) instanceof ByteBuffer) {
ByteBuffer byteBuffer = (ByteBuffer) record.get(fieldName);
byte[] byteArray = new byte[byteBuffer.remaining()];
byteBuffer.get(byteArray);
jsonObject.put(fieldName, byteArray);
} else if (record.get(fieldName) instanceof byte[]) {
jsonObject.put(fieldName, (byte[]) record.get(fieldName));

Check warning on line 394 in v2/datastream-common/src/main/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamRecordToJson.java

View check run for this annotation

Codecov / codecov/patch

v2/datastream-common/src/main/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamRecordToJson.java#L394

Added line #L394 was not covered by tests
} else {
// Handle other types appropriately, possibly throwing an exception
// if the type is unexpected. Or log it.
throw new IllegalArgumentException(

Check warning on line 398 in v2/datastream-common/src/main/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamRecordToJson.java

View check run for this annotation

Codecov / codecov/patch

v2/datastream-common/src/main/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamRecordToJson.java#L398

Added line #L398 was not covered by tests
"Unexpected type for field "
+ fieldName
+ ": "
+ record.get(fieldName).getClass().getName());

Check warning on line 402 in v2/datastream-common/src/main/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamRecordToJson.java

View check run for this annotation

Codecov / codecov/patch

v2/datastream-common/src/main/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamRecordToJson.java#L402

Added line #L402 was not covered by tests
}
break;
case FLOAT:
String value = record.get(fieldName).toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public class FormatDatastreamRecordToJsonTest {
+ "\"created_at\":\"2020-02-12T00:00:00Z\","
+ "\"datetime_at\":\"2020-02-12T00:00:00Z\","
+ "\"_0col\":1,"
+ "\"timestamp_with_tz\":\"2022-10-13T14:30:00Z\","
+ "\"timestamp_with_tz\":\"2020-10-13T14:30:00.056483Z\","
+ "\"_metadata_stream\":"
+ "\"projects/269744978479/locations/us-central1/streams/datastream-test-fbefaf33\","
+ "\"_metadata_timestamp\":1623459160,"
Expand Down Expand Up @@ -140,6 +140,7 @@ record = dataFileReader.next();
}
}

@Test
public void testParseMySQLPeoplePrimaryKeys() throws IOException, URISyntaxException {
URL resource =
getClass()
Expand All @@ -163,6 +164,7 @@ record = dataFileReader.next();
}
}

@Test
public void testParseMySQLNumbers() throws IOException, URISyntaxException {
URL resource =
getClass()
Expand All @@ -182,6 +184,22 @@ public void testParseMySQLNumbers() throws IOException, URISyntaxException {
assertEquals(EXPECTED_NUMERIC_RECORD, changeEvent.toString());
}

@Test
public void testPostgresByteArray() throws IOException, URISyntaxException {
URL resource =
getClass().getClassLoader().getResource("FormatDatastreamRecordToJsonTest/bytearray.avro");
File file = new File(resource.toURI());
DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(file, datumReader);
GenericRecord record = dataFileReader.next();
String jsonData = FormatDatastreamRecordToJson.create().apply(record).getOriginalPayload();
ObjectMapper mapper = new ObjectMapper();
JsonNode changeEvent = mapper.readTree(jsonData);
// The avro file contains binary_content: b'\xde\xad\xbe\xef', which is converted to
// base64 encoded string by Jackson library.
assertEquals("3q2+7w==", changeEvent.get("binary_content").textValue());
}

@Test
public void testHashRowId_valid() {
assertEquals(0L, FormatDatastreamRecord.hashRowIdToInt("AAAAAAAA++++++++++"));
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ public DatastreamToDML withSchemaMap(Map<String, String> schemaMap) {
public void processElement(ProcessContext context) {
FailsafeElement<String, String> element = context.element();
String jsonString = element.getPayload();

ObjectMapper mapper = new ObjectMapper();
JsonNode rowObj;

Expand Down Expand Up @@ -284,7 +283,6 @@ public Map<String, String> getSqlTemplateValues(

public String getValueSql(JsonNode rowObj, String columnName, Map<String, String> tableSchema) {
String columnValue;

JsonNode columnObj = rowObj.get(columnName);
if (columnObj == null) {
LOG.warn("Missing Required Value: {} in {}", columnName, rowObj.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ public String cleanDataTypeValueSql(
break;
case "INTERVAL":
return convertJsonToPostgresInterval(columnValue, columnName);
case "BYTEA":
// Byte arrays are converted to base64 string representation.
return "decode(" + columnValue + ",'base64')";
}

// Arrays in Postgres are prefixed with underscore e.g. _INT4 for integer array.
Expand Down Expand Up @@ -194,6 +197,10 @@ private String convertJsonToPostgresArray(String jsonValue, String dataType, Str
// Cast string array to jsonb array.
return arrayStatement + "::jsonb[]";
}
if (dataType.equals("_UUID")) {
// Cast string array to uuid array.
return arrayStatement + "::uuid[]";
}
return arrayStatement;

} catch (JsonProcessingException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,54 @@ public void testJsonbArray() {
assertEquals(expected, actual);
}

/**
* Test whether {@link DatastreamToPostgresDML#getValueSql(JsonNode, String, Map)} converts a
* JSONB array into the correct PostgreSQL array syntax with type casting.
*/
@Test
public void testUuidArray() {
String arrayJson =
"{\"uuid_array\": {"
+ "\"nestedArray\": ["
+ " {\"nestedArray\": null, \"elementValue\": \"a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a13\"},"
+ " {\"nestedArray\": null, \"elementValue\": \"a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a14\"}"
+ "], \"elementValue\": null}}";
JsonNode rowObj = getRowObj(arrayJson);
Map<String, String> tableSchema = new HashMap<>();
tableSchema.put("uuid_array", "_UUID");
DatastreamToPostgresDML dml = DatastreamToPostgresDML.of(null);
String expected =
"ARRAY['a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a13','a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a14']::uuid[]";

String actual = dml.getValueSql(rowObj, "uuid_array", tableSchema);

assertEquals(expected, actual);
}

/**
* Test whether {@link DatastreamToPostgresDML#getValueSql(JsonNode, String, Map)} converts a byte
* array into the correct PostgreSQL array syntax with type casting.
*/
@Test
public void testByteArray() {
// Byte arrays are converted to base64 encoded strings by Jackson ObjectNode.toString() in
// FormatDataStreamRecordToJson.
String arrayJson = "{\"binary_content\": \"3q2+7w==\"}";
JsonNode rowObj = getRowObj(arrayJson);

Map<String, String> tableSchema = new HashMap<>();
tableSchema.put("binary_content", "BYTEA");

DatastreamToPostgresDML dml = DatastreamToPostgresDML.of(null);

// getValueSql converts byte array to base64 encoded string
String expected = "decode('3q2+7w==','base64')";

String actual = dml.getValueSql(rowObj, "binary_content", tableSchema);

assertEquals(expected, actual);
}

/**
* Test whether {@link DatastreamToPostgresDML#getValueSql(JsonNode, String, Map)} converts a JSON
* array into the correct PostgreSQL array syntax with type casting.
Expand Down
Loading