Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,6 @@ public String getValueSql(JsonNode rowObj, String columnName, Map<String, String
} else {
columnValue = columnObj.toString();
}

return cleanDataTypeValueSql(columnValue, columnName, tableSchema);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@
*/
package com.google.cloud.teleport.v2.utils;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.cloud.teleport.v2.datastream.io.CdcJdbcIO.DataSourceConfiguration;
import com.google.cloud.teleport.v2.datastream.values.DatastreamRow;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -102,6 +107,56 @@
}
break;
}
// Arrays in Postgres are prefixed with underscore e.g. _INT4 for integer array.
if (dataType.startsWith("_")) {
return convertJsonToPostgresArray(columnValue);
}
return columnValue;
}

private String convertJsonToPostgresArray(String jsonValue) {
if (jsonValue == null || jsonValue.equals("''") || jsonValue.equals("")) {
return getNullValueSql();

Check warning on line 119 in v2/datastream-to-sql/src/main/java/com/google/cloud/teleport/v2/utils/DatastreamToPostgresDML.java

View check run for this annotation

Codecov / codecov/patch

v2/datastream-to-sql/src/main/java/com/google/cloud/teleport/v2/utils/DatastreamToPostgresDML.java#L119

Added line #L119 was not covered by tests
}

try {
ObjectMapper mapper = new ObjectMapper();
JsonNode rootNode = mapper.readTree(jsonValue);

if (!(rootNode.isObject() && rootNode.has("nestedArray"))) {
LOG.warn("Empty array: {}", jsonValue);
return getNullValueSql();

Check warning on line 128 in v2/datastream-to-sql/src/main/java/com/google/cloud/teleport/v2/utils/DatastreamToPostgresDML.java

View check run for this annotation

Codecov / codecov/patch

v2/datastream-to-sql/src/main/java/com/google/cloud/teleport/v2/utils/DatastreamToPostgresDML.java#L127-L128

Added lines #L127 - L128 were not covered by tests
}

JsonNode arrayNode = rootNode.get("nestedArray");

// Handle nested structure with elementValue
List<String> elements = new ArrayList<>();
if (arrayNode.isArray()) {
for (JsonNode element : arrayNode) {
if (element.has("elementValue")) {
JsonNode elementValue = element.get("elementValue");
if (!elementValue.isNull()) {
elements.add(formatArrayElement(elementValue));
} else {
elements.add(getNullValueSql());
}
} else if (!element.isNull()) {
elements.add(formatArrayElement(element));

Check warning on line 145 in v2/datastream-to-sql/src/main/java/com/google/cloud/teleport/v2/utils/DatastreamToPostgresDML.java

View check run for this annotation

Codecov / codecov/patch

v2/datastream-to-sql/src/main/java/com/google/cloud/teleport/v2/utils/DatastreamToPostgresDML.java#L145

Added line #L145 was not covered by tests
}
}
}
return "ARRAY[" + String.join(",", elements) + "]";
} catch (JsonProcessingException e) {
LOG.error("Error parsing JSON array: {}", jsonValue);
return getNullValueSql();

Check warning on line 152 in v2/datastream-to-sql/src/main/java/com/google/cloud/teleport/v2/utils/DatastreamToPostgresDML.java

View check run for this annotation

Codecov / codecov/patch

v2/datastream-to-sql/src/main/java/com/google/cloud/teleport/v2/utils/DatastreamToPostgresDML.java#L150-L152

Added lines #L150 - L152 were not covered by tests
}
}

private String formatArrayElement(JsonNode element) {
if (element.isTextual()) {
return "\'" + cleanSql(element.textValue()) + "\'";
}
return element.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
public class DatastreamToDMLTest {

private static final Logger LOG = LoggerFactory.getLogger(DatastreamToDMLTest.class);
private String jsonString =
private static final String JSON_STRING =
"{"
+ "\"text_column\":\"value\","
+ "\"quoted_text_column\":\"Test Values: '!@#$%^\","
Expand All @@ -42,7 +42,7 @@ public class DatastreamToDMLTest {
+ "\"_metadata_table\":\"MY_TABLE$NAME\""
+ "}";

private JsonNode getRowObj() {
private JsonNode getRowObj(String jsonString) {
ObjectMapper mapper = new ObjectMapper();
JsonNode rowObj;
try {
Expand All @@ -59,7 +59,7 @@ private JsonNode getRowObj() {
*/
@Test
public void testGetValueSql() {
JsonNode rowObj = this.getRowObj();
JsonNode rowObj = this.getRowObj(JSON_STRING);

String expectedTextContent = "'value'";
String testSqlContent =
Expand All @@ -82,14 +82,86 @@ public void testGetValueSql() {
assertEquals(expectedNullByteTextContent, testNullByteSqlContent);
}

/**
* Test whether {@link DatastreamToPostgresDML#getValueSql(JsonNode, String, Map)} converts array
* data into correct integer array syntax.
*/
@Test
public void testIntArrayWithNullTypeCoercion() {
String arrayJson =
"{\"number_array\": {"
+ "\"nestedArray\": ["
+ " {\"nestedArray\": null, \"elementValue\": null},"
+ " {\"nestedArray\": null, \"elementValue\": 456}"
+ "], \"elementValue\": null}}";
JsonNode rowObj = this.getRowObj(arrayJson);
Map<String, String> tableSchema = new HashMap<>();
tableSchema.put("number_array", "_int4");
DatastreamToPostgresDML dml = DatastreamToPostgresDML.of(null);
String expectedInt = "ARRAY[NULL,456]";

String actualInt =
DatastreamToPostgresDML.of(null).getValueSql(rowObj, "number_array", tableSchema);

assertEquals(expectedInt, actualInt);
}

/**
* Test whether {@link DatastreamToPostgresDML#getValueSql(JsonNode, String, Map)} converts array
* data into correct integer array syntax.
*/
@Test
public void testIntArrayTypeCoercion() {
String arrayJson =
"{\"number_array\": {"
+ "\"nestedArray\": ["
+ " {\"nestedArray\": null, \"elementValue\": 123},"
+ " {\"nestedArray\": null, \"elementValue\": 456}"
+ "], \"elementValue\": null}}";
JsonNode rowObj = this.getRowObj(arrayJson);
Map<String, String> tableSchema = new HashMap<>();
tableSchema.put("number_array", "_int4");
DatastreamToPostgresDML dml = DatastreamToPostgresDML.of(null);
String expectedInt = "ARRAY[123,456]";

String actualInt =
DatastreamToPostgresDML.of(null).getValueSql(rowObj, "number_array", tableSchema);

assertEquals(expectedInt, actualInt);
}

/**
* Test whether {@link DatastreamToPostgresDML#getValueSql(JsonNode, String, Map)} converts array
* data into correct text array syntax.
*/
@Test
public void testTextArrayTypeCoercion() {
String arrayJson =
"{\"text_array\": {"
+ "\"nestedArray\": ["
+ " {\"nestedArray\": null, \"elementValue\": \"apple\"},"
+ " {\"nestedArray\": null, \"elementValue\": \"cherry\"}"
+ "], \"elementValue\": null}}";
JsonNode rowObj = this.getRowObj(arrayJson);
Map<String, String> tableSchema = new HashMap<>();
tableSchema.put("text_array", "_text");
DatastreamToPostgresDML dml = DatastreamToPostgresDML.of(null);
String expectedInt = "ARRAY['apple','cherry']";

String actualInt =
DatastreamToPostgresDML.of(null).getValueSql(rowObj, "text_array", tableSchema);

assertEquals(expectedInt, actualInt);
}

/**
* Test whether {@link DatastreamToDML#getTargetSchemaName} converts the Oracle schema into the
* correct Postgres schema.
*/
@Test
public void testGetPostgresSchemaName() {
DatastreamToDML datastreamToDML = DatastreamToPostgresDML.of(null);
JsonNode rowObj = this.getRowObj();
JsonNode rowObj = this.getRowObj(JSON_STRING);
DatastreamRow row = DatastreamRow.of(rowObj);

String expectedSchemaName = "my_schema";
Expand All @@ -104,7 +176,7 @@ public void testGetPostgresSchemaName() {
@Test
public void testGetPostgresTableName() {
DatastreamToDML datastreamToDML = DatastreamToPostgresDML.of(null);
JsonNode rowObj = this.getRowObj();
JsonNode rowObj = this.getRowObj(JSON_STRING);
DatastreamRow row = DatastreamRow.of(rowObj);

String expectedTableName = "my_table$name";
Expand Down
Loading