Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,24 @@
try {
List<Map<String, Object>> records = new ArrayList<>();
mutations.forEach(
entry ->
records.add(
entry.asMap().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))));

entry -> {
Map<String, Object> record = new HashMap<>();
entry
.asMap()
.forEach(

Check warning on line 81 in it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/spanner/matchers/SpannerAsserts.java

View check run for this annotation

Codecov / codecov/patch

it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/spanner/matchers/SpannerAsserts.java#L78-L81

Added lines #L78 - L81 were not covered by tests
(key, value) -> {
Object newValue = value;

Check warning on line 83 in it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/spanner/matchers/SpannerAsserts.java

View check run for this annotation

Codecov / codecov/patch

it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/spanner/matchers/SpannerAsserts.java#L83

Added line #L83 was not covered by tests
// null `Value` objects `toString()` returns "NULL", causing test failures
// when compared with Java nulls `toString()`. This workaround handles that.
if (value.isNull()) {
newValue = null;

Check warning on line 87 in it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/spanner/matchers/SpannerAsserts.java

View check run for this annotation

Codecov / codecov/patch

it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/spanner/matchers/SpannerAsserts.java#L87

Added line #L87 was not covered by tests
} else if (value.getType() == Type.array(Type.string())) {
newValue = new ArrayList<>(value.getAsStringList());

Check warning on line 89 in it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/spanner/matchers/SpannerAsserts.java

View check run for this annotation

Codecov / codecov/patch

it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/spanner/matchers/SpannerAsserts.java#L89

Added line #L89 was not covered by tests
}
record.put(key, newValue);
});
records.add(record);
});

Check warning on line 94 in it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/spanner/matchers/SpannerAsserts.java

View check run for this annotation

Codecov / codecov/patch

it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/spanner/matchers/SpannerAsserts.java#L91-L94

Added lines #L91 - L94 were not covered by tests
return records;
} catch (Exception e) {
throw new RuntimeException("Error converting TableResult to Records", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ public Mutation apply(GenericRecord record) {
case PG_TEXT:
case JSON:
case PG_JSONB:
case UUID:
case PG_UUID:
builder.set(column.name()).to(readString(record, avroType, fieldName).orElse(null));
break;
case BYTES:
Expand Down Expand Up @@ -168,6 +170,8 @@ public Mutation apply(GenericRecord record) {
case PG_VARCHAR:
case PG_TEXT:
case JSON:
case UUID:
case PG_UUID:
builder
.set(column.name())
.toStringArray(readStringArray(record, arrayType, fieldName).orElse(null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@

import com.google.cloud.spanner.Dialect;
import com.google.cloud.teleport.spanner.common.NumericUtils;
import com.google.cloud.teleport.spanner.common.SizedType;
import com.google.cloud.teleport.spanner.common.Type;
import com.google.cloud.teleport.spanner.ddl.ChangeStream;
import com.google.cloud.teleport.spanner.ddl.Column;
Expand Down Expand Up @@ -561,7 +562,7 @@
}
if (Strings.isNullOrEmpty(sqlType)) {
Type spannerType = inferType(avroType, true);
sqlType = toString(spannerType, true);
sqlType = SizedType.typeString(spannerType, -1, true);
}
String defaultExpression = f.getProp(DEFAULT_EXPRESSION);
column.parseType(sqlType).notNull(!nullable).defaultExpression(defaultExpression);
Expand Down Expand Up @@ -685,6 +686,11 @@
? com.google.cloud.teleport.spanner.common.Type.float64()
: com.google.cloud.teleport.spanner.common.Type.pgFloat8();
case STRING:
if (LogicalTypes.uuid().equals(logicalType)) {
return (dialect == Dialect.GOOGLE_STANDARD_SQL)
? com.google.cloud.teleport.spanner.common.Type.uuid()
: com.google.cloud.teleport.spanner.common.Type.pgUuid();

Check warning on line 692 in v1/src/main/java/com/google/cloud/teleport/spanner/AvroSchemaToDdlConverter.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/com/google/cloud/teleport/spanner/AvroSchemaToDdlConverter.java#L691-L692

Added lines #L691 - L692 were not covered by tests
}
return (dialect == Dialect.GOOGLE_STANDARD_SQL)
? com.google.cloud.teleport.spanner.common.Type.string()
: com.google.cloud.teleport.spanner.common.Type.pgVarchar();
Expand Down Expand Up @@ -727,79 +733,4 @@
}
throw new IllegalArgumentException("Cannot infer a type " + f);
}

private String toString(
com.google.cloud.teleport.spanner.common.Type spannerType, boolean supportArray) {
switch (spannerType.getCode()) {
case BOOL:
return "BOOL";
case PG_BOOL:
return "boolean";
case INT64:
return "INT64";
case PG_INT8:
return "bigint";
case FLOAT32:
return "FLOAT32";
case PG_FLOAT4:
return "real";
case FLOAT64:
return "FLOAT64";
case PG_FLOAT8:
return "double precision";
case STRING:
return "STRING(MAX)";
case PG_TEXT:
return "text";
case PG_VARCHAR:
return "character varying";
case BYTES:
return "BYTES(MAX)";
case PG_BYTEA:
return "bytea";
case TIMESTAMP:
return "TIMESTAMP";
case PG_TIMESTAMPTZ:
return "timestamp with time zone";
case PG_SPANNER_COMMIT_TIMESTAMP:
return "spanner.commit_timestamp";
case DATE:
return "DATE";
case PG_DATE:
return "date";
case NUMERIC:
return "NUMERIC";
case PG_NUMERIC:
return "numeric";
case JSON:
return "JSON";
case PROTO:
return "PROTO<" + spannerType.getProtoTypeFqn() + ">";
case ENUM:
return "ENUM<" + spannerType.getProtoTypeFqn() + ">";
case ARRAY:
{
if (supportArray) {
com.google.cloud.teleport.spanner.common.Type element =
spannerType.getArrayElementType();
String elementStr = toString(element, false);
return "ARRAY<" + elementStr + ">";
}
// otherwise fall through and throw an error.
break;
}
case PG_ARRAY:
{
if (supportArray) {
com.google.cloud.teleport.spanner.common.Type element =
spannerType.getArrayElementType();
String elementStr = toString(element, false);
return elementStr + "[]";
}
// otherwise fall through and throw an error.
break;
}
}
throw new IllegalArgumentException("Cannot to string the type " + spannerType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,28 @@ String createColumnExpression(Column col) {
+ ") AS element), []) END AS "
+ col.name();
}
// TODO(b/394493438): Remove casting once google-cloud-spanner supports UUID type
if (col.typeString().equals("UUID")) {
return String.format("CAST(t.`%s` AS STRING) AS %s", col.name(), col.name());
}
if (col.typeString().equals("ARRAY<UUID>")) {
return String.format(
"CASE WHEN t.`%s` IS NULL THEN NULL ELSE "
+ "ARRAY(SELECT CAST(e AS STRING) FROM UNNEST(%s) AS e) END AS %s",
col.name(), col.name(), col.name());
}
return "t.`" + col.name() + "`";
case POSTGRESQL:
// TODO(b/394493438): Remove casting once google-cloud-spanner supports UUID type
if (col.typeString().equals("uuid")) {
return String.format("t.\"%s\"::text AS \"%s\"", col.name(), col.name());
}
if (col.typeString().equals("uuid[]")) {
return String.format(
"CASE WHEN t.\"%s\" IS NULL THEN NULL ELSE "
+ "ARRAY(SELECT e::text FROM UNNEST(t.\"%s\") AS e) END AS \"%s\"",
col.name(), col.name(), col.name());
}
return "t.\"" + col.name() + "\"";
default:
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ protected final Mutation parseRow(
case PG_TEXT:
columnValue = Value.string(cellValue);
break;
case UUID:
case PG_UUID:
columnValue = Value.string(isNullValue ? null : cellValue);
break;
case DATE:
case PG_DATE:
if (isNullValue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,9 @@ private Schema avroType(
case JSON:
case PG_JSONB:
return SchemaBuilder.builder().stringType();
case UUID:
case PG_UUID:
return LogicalTypes.uuid().addToSchema(SchemaBuilder.builder().stringType());
case BYTES:
case PG_BYTEA:
case PROTO:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@
c.output(ddl);
}
}));
PCollection<ReadOperation> tables =
PCollection<ReadOperation> tableReadOperations =

Check warning on line 285 in v1/src/main/java/com/google/cloud/teleport/spanner/ExportTransform.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/com/google/cloud/teleport/spanner/ExportTransform.java#L285

Added line #L285 was not covered by tests
ddl.apply("Build table read operations", new BuildReadFromTableOperations(tableNames));

PCollection<KV<String, Void>> allTableAndViewNames =
Expand Down Expand Up @@ -455,7 +455,7 @@
.apply("As view", View.asMap());

PCollection<Struct> rows =
tables.apply(
tableReadOperations.apply(

Check warning on line 458 in v1/src/main/java/com/google/cloud/teleport/spanner/ExportTransform.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/com/google/cloud/teleport/spanner/ExportTransform.java#L458

Added line #L458 was not covered by tests
"Read all rows from Spanner",
LocalSpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ public GenericRecord convert(Struct row) {
builder.set(field, nullValue ? null : row.getTimestamp(fieldIndex).toString());
} else if (spannerType.equals("DATE")) {
builder.set(field, nullValue ? null : dateToString(row.getDate(fieldIndex)));
} else if (spannerType.equals("UUID")) {
builder.set(field, nullValue ? null : row.getString(fieldIndex));
}
} else if (dialect == Dialect.POSTGRESQL) {
if (spannerType.equals("jsonb")) {
Expand All @@ -266,6 +268,8 @@ public GenericRecord convert(Struct row) {
builder.set(field, nullValue ? null : row.getTimestamp(fieldIndex).toString());
} else if (spannerType.equals("date")) {
builder.set(field, nullValue ? null : dateToString(row.getDate(fieldIndex)));
} else if (spannerType.equals("uuid")) {
builder.set(field, nullValue ? null : row.getString(fieldIndex));
}
}
break;
Expand Down Expand Up @@ -368,7 +372,9 @@ public GenericRecord convert(Struct row) {
case STRING:
{
if (dialect == Dialect.GOOGLE_STANDARD_SQL) {
if (fieldInfo.matchesArrayPattern() || spannerType.equals("ARRAY<JSON>")) {
if (fieldInfo.matchesArrayPattern()
|| spannerType.equals("ARRAY<JSON>")
|| spannerType.equals("ARRAY<UUID>")) {
builder.set(field, nullValue ? null : row.getStringList(fieldIndex));
} else if (spannerType.equals("ARRAY<TIMESTAMP>")) {
setTimestampArray(row, builder, field, fieldIndex, nullValue);
Expand All @@ -380,7 +386,8 @@ public GenericRecord convert(Struct row) {
if (spannerType.equals("jsonb[]")) {
builder.set(field, nullValue ? null : row.getPgJsonbList(fieldIndex));
} else if (fieldInfo.matchesVarcharArrayPattern()
|| spannerType.equals("text[]")) {
|| spannerType.equals("text[]")
|| spannerType.equals("uuid[]")) {
builder.set(field, nullValue ? null : row.getStringList(fieldIndex));
} else if (spannerType.equals("timestamp with time zone[]")) {
setTimestampArray(row, builder, field, fieldIndex, nullValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
*
* <p>Schema file must have all column and type definition in one line. Schema file must use the
* data type names of Cloud Spanner. We currently support the following Cloud Spanner data types: -
* BOOL - DATE - FLOAT32 - FLOAT64 - INT64 - STRING - TIMESTAMP
* BOOL - DATE - FLOAT32 - FLOAT64 - INT64 - STRING - TIMESTAMP - UUID
*
* <p>Input format properties: - \\N in the source column will be considered as NULL value when
* writing to Cloud Spanner. - If you need to escape characters, you can use the "fieldQualifier"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,8 @@
return Code.PROTO;
} else if (columnType.startsWith("ENUM") && dialect == Dialect.GOOGLE_STANDARD_SQL) {
return Code.ENUM;
} else if (columnType.equalsIgnoreCase("UUID") && dialect == Dialect.GOOGLE_STANDARD_SQL) {
return Code.UUID;

Check warning on line 485 in v1/src/main/java/com/google/cloud/teleport/spanner/TextImportTransform.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/com/google/cloud/teleport/spanner/TextImportTransform.java#L485

Added line #L485 was not covered by tests
} else if (columnType.equalsIgnoreCase("bigint") && dialect == Dialect.POSTGRESQL) {
return Code.PG_INT8;
} else if (columnType.equalsIgnoreCase("real") && dialect == Dialect.POSTGRESQL) {
Expand Down Expand Up @@ -508,6 +510,8 @@
} else if (columnType.equalsIgnoreCase("spanner.commit_timestamp")
&& dialect == Dialect.POSTGRESQL) {
return Code.PG_SPANNER_COMMIT_TIMESTAMP;
} else if (columnType.equalsIgnoreCase("uuid") && dialect == Dialect.POSTGRESQL) {
return Code.PG_UUID;

Check warning on line 514 in v1/src/main/java/com/google/cloud/teleport/spanner/TextImportTransform.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/com/google/cloud/teleport/spanner/TextImportTransform.java#L514

Added line #L514 was not covered by tests
} else {
throw new IllegalArgumentException(
"Unrecognized or unsupported column data type: " + columnType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ public static String typeString(Type type, Integer size, boolean outputAsDdlRepr
return "FLOAT64";
case PG_FLOAT8:
return "double precision";
case UUID:
return "UUID";
case PG_UUID:
return "uuid";
case STRING:
return "STRING(" + (size == -1 ? "MAX" : Integer.toString(size)) + ")";
case PG_VARCHAR:
Expand Down Expand Up @@ -195,6 +199,9 @@ public static SizedType parseSpannerType(String spannerType, Dialect dialect) {
if (spannerType.equals("FLOAT64")) {
return t(Type.float64(), null);
}
if (spannerType.equals("UUID")) {
return t(Type.uuid(), null);
}
if (spannerType.startsWith("STRING")) {
String sizeStr = spannerType.substring(7, spannerType.length() - 1);
int size = sizeStr.equals("MAX") ? -1 : Integer.parseInt(sizeStr);
Expand Down Expand Up @@ -329,6 +336,9 @@ public static SizedType parseSpannerType(String spannerType, Dialect dialect) {
if (spannerType.equals("text")) {
return t(Type.pgText(), -1);
}
if (spannerType.equals("uuid")) {
return t(Type.pgUuid(), null);
}
if (spannerType.startsWith("character varying")) {
int size = -1;
if (spannerType.length() > 18) {
Expand Down
Loading
Loading