Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.google.cloud.teleport.spanner.AvroUtil.IDENTITY_COLUMN;
import static com.google.cloud.teleport.spanner.AvroUtil.INPUT;
import static com.google.cloud.teleport.spanner.AvroUtil.NOT_NULL;
import static com.google.cloud.teleport.spanner.AvroUtil.ON_UPDATE_EXPRESSION;
import static com.google.cloud.teleport.spanner.AvroUtil.OUTPUT;
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_CHANGE_STREAM_FOR_CLAUSE;
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_CHECK_CONSTRAINT;
Expand Down Expand Up @@ -632,13 +633,18 @@ public Table toTable(String tableName, Schema schema) {
if (nullable) {
avroType = unpacked;
}
} else {
String notNull = f.getProp(NOT_NULL);
nullable = notNull != null && !Boolean.parseBoolean(notNull);
}
if (Strings.isNullOrEmpty(sqlType)) {
Type spannerType = inferType(avroType, true);
sqlType = SizedType.typeString(spannerType, -1, true);
}
String defaultExpression = f.getProp(DEFAULT_EXPRESSION);
column.parseType(sqlType).notNull(!nullable).defaultExpression(defaultExpression);
String onUpdateExpression = f.getProp(ON_UPDATE_EXPRESSION);
column.parseType(sqlType).notNull(!nullable).onUpdateExpression(onUpdateExpression);
}
String hidden = f.getProp(HIDDEN);
if (Boolean.parseBoolean(hidden)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ private AvroUtil() {}

// The property names in Avro schema.
public static final String DEFAULT_EXPRESSION = "defaultExpression";
public static final String ON_UPDATE_EXPRESSION = "onUpdateExpression";
public static final String GENERATION_EXPRESSION = "generationExpression";
public static final String GOOGLE_FORMAT_VERSION = "googleFormatVersion";
public static final String GOOGLE_STORAGE = "googleStorage";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static com.google.cloud.teleport.spanner.AvroUtil.IDENTITY_COLUMN;
import static com.google.cloud.teleport.spanner.AvroUtil.INPUT;
import static com.google.cloud.teleport.spanner.AvroUtil.NOT_NULL;
import static com.google.cloud.teleport.spanner.AvroUtil.ON_UPDATE_EXPRESSION;
import static com.google.cloud.teleport.spanner.AvroUtil.OUTPUT;
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_CHANGE_STREAM_FOR_CLAUSE;
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_CHECK_CONSTRAINT;
Expand Down Expand Up @@ -232,6 +233,9 @@ public Collection<Schema> convert(Ddl ddl) {
}
} else if (cm.defaultExpression() != null) {
fieldBuilder.prop(DEFAULT_EXPRESSION, cm.defaultExpression());
if (cm.onUpdateExpression() != null) {
fieldBuilder.prop(ON_UPDATE_EXPRESSION, cm.onUpdateExpression());
}
}
Schema avroType = avroType(cm.type(), table.name() + "_" + columnOrdinal++);
if (!cm.notNull()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ public abstract class Column implements Serializable {
@Nullable
public abstract String defaultExpression();

@Nullable
public abstract String onUpdateExpression();

public static Builder builder(Dialect dialect) {
return new AutoValue_Column.Builder()
.dialect(dialect)
Expand Down Expand Up @@ -113,6 +116,14 @@ public void prettyPrint(Appendable appendable) throws IOException {
appendable.append(" (").append(defaultExpression()).append(")");
}
}
if (onUpdateExpression() != null) {
appendable.append(" ON UPDATE ");
if (dialect() == Dialect.POSTGRESQL) {
appendable.append(onUpdateExpression());
} else {
appendable.append(" (").append(onUpdateExpression()).append(")");
}
}
if (isIdentityColumn()) {
appendable.append(" GENERATED BY DEFAULT AS IDENTITY");
List<String> options = new ArrayList<>(3);
Expand Down Expand Up @@ -227,6 +238,8 @@ public Builder notNull() {

public abstract Builder defaultExpression(String expression);

public abstract Builder onUpdateExpression(String expression);

public Builder generatedAs(String expression) {
return isGenerated(true).generationExpression(expression);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,11 +391,12 @@ private void listColumns(Ddl.Builder builder) {
resultSet.isNull(13) ? null : Long.valueOf(resultSet.getString(13));
Long identitySkipRangeMax =
resultSet.isNull(14) ? null : Long.valueOf(resultSet.getString(14));
String onUpdateExpression = resultSet.isNull(15) ? null : resultSet.getString(15);
boolean isHidden =
dialect == Dialect.GOOGLE_STANDARD_SQL
? resultSet.getBoolean(15)
: resultSet.getString(15).equalsIgnoreCase("YES");
boolean isPlacementKey = resultSet.getBoolean(16);
? resultSet.getBoolean(16)
: resultSet.getString(16).equalsIgnoreCase("YES");
boolean isPlacementKey = resultSet.getBoolean(17);

builder
.createTable(tableName)
Expand All @@ -407,6 +408,7 @@ private void listColumns(Ddl.Builder builder) {
.generationExpression(generationExpression)
.isStored(isStored)
.defaultExpression(defaultExpression)
.onUpdateExpression(onUpdateExpression)
.isIdentityColumn(isIdentity)
.sequenceKind(sequenceKind)
.counterStartValue(identityStartWithCounter)
Expand Down Expand Up @@ -434,7 +436,8 @@ Statement listColumnsSQL() {
+ " c.ordinal_position, c.spanner_type, c.is_nullable,"
+ " c.is_generated, c.generation_expression, c.is_stored,"
+ " c.column_default, c.is_identity, c.identity_kind, c.identity_start_with_counter,"
+ " c.identity_skip_range_min, c.identity_skip_range_max, c.is_hidden,"
+ " c.identity_skip_range_min, c.identity_skip_range_max,"
+ " c.on_update_expression, c.is_hidden,"
+ " pkc.constraint_name IS NOT NULL AS is_placement_key"
+ " FROM information_schema.columns as c"
+ " LEFT JOIN placementkeycolumns AS pkc"
Expand All @@ -450,7 +453,8 @@ Statement listColumnsSQL() {
+ " c.ordinal_position, c.spanner_type, c.is_nullable,"
+ " c.is_generated, c.generation_expression, c.is_stored, c.column_default,"
+ " c.is_identity, c.identity_kind, c.identity_start_with_counter, "
+ " c.identity_skip_range_min, c.identity_skip_range_max, c.is_hidden,"
+ " c.identity_skip_range_min, c.identity_skip_range_max,"
+ " c.on_update_expression, c.is_hidden,"
+ " pkc.constraint_name IS NOT NULL AS is_placement_key"
+ " FROM information_schema.columns as c"
+ " LEFT JOIN placementkeycolumns AS pkc"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,21 @@ public void simple() {
+ " \"skipRangeMax\" : \"3000\","
+ " \"counterStartValue\" : \"1000\""
+ " }, {"
+ " \"name\" : \"default_commit_ts\","
+ " \"type\" : \"null\","
+ " \"sqlType\" : \"TIMESTAMP\","
+ " \"notNull\" : \"false\","
+ " \"defaultExpression\" : \"PENDING_COMMIT_TIMESTAMP()\","
+ " \"spannerOption_0\" : \"allow_commit_timestamp=true\""
+ " }, {"
+ " \"name\" : \"on_update_ts\","
+ " \"type\" : \"null\","
+ " \"sqlType\" : \"TIMESTAMP\","
+ " \"notNull\" : \"false\","
+ " \"defaultExpression\" : \"PENDING_COMMIT_TIMESTAMP()\","
+ " \"onUpdateExpression\" : \"PENDING_COMMIT_TIMESTAMP()\","
+ " \"spannerOption_0\" : \"allow_commit_timestamp=true\""
+ " }, {"
+ " \"name\" : \"numeric\","
+ " \"type\" : [\"null\", {\"type\":\"bytes\",\"logicalType\":\"decimal\"}],"
+ " \"sqlType\" : \"NUMERIC\""
Expand Down Expand Up @@ -243,7 +258,6 @@ public void simple() {
+ "}";

Schema schema = new Schema.Parser().parse(avroString);

AvroSchemaToDdlConverter converter = new AvroSchemaToDdlConverter();
Ddl ddl = converter.toDdl(Collections.singleton(schema));
assertThat(ddl.allTables(), hasSize(1));
Expand All @@ -261,6 +275,11 @@ public void simple() {
+ "BIT_REVERSED_POSITIVE SKIP RANGE 2000, 3000 START COUNTER WITH 1000),"
+ " `identity_column_no_kind` INT64 GENERATED BY DEFAULT AS IDENTITY ("
+ "SKIP RANGE 2000, 3000 START COUNTER WITH 1000),"
+ " `default_commit_ts` TIMESTAMP DEFAULT (PENDING_COMMIT_TIMESTAMP())"
+ " OPTIONS (allow_commit_timestamp=true),"
+ " `on_update_ts` TIMESTAMP DEFAULT (PENDING_COMMIT_TIMESTAMP())"
+ " ON UPDATE (PENDING_COMMIT_TIMESTAMP())"
+ " OPTIONS (allow_commit_timestamp=true),"
+ " `numeric` NUMERIC,"
+ " `numeric2` NUMERIC,"
+ " `notNumeric` BYTES(MAX),"
Expand Down Expand Up @@ -359,6 +378,19 @@ public void pgSimple() {
+ " \"skipRangeMax\" : \"3000\","
+ " \"counterStartValue\" : \"1000\""
+ " }, {"
+ " \"name\" : \"default_commit_ts\","
+ " \"type\" : [ \"null\", \"string\" ],"
+ " \"sqlType\" : \"spanner.commit_timestamp\","
+ " \"notNull\" : \"false\","
+ " \"defaultExpression\" : \"SPANNER.PENDING_COMMIT_TIMESTAMP()\""
+ " }, {"
+ " \"name\" : \"on_update_ts\","
+ " \"type\" : [ \"null\", \"string\" ],"
+ " \"sqlType\" : \"spanner.commit_timestamp\","
+ " \"notNull\" : \"false\","
+ " \"defaultExpression\" : \"SPANNER.PENDING_COMMIT_TIMESTAMP()\","
+ " \"onUpdateExpression\" : \"SPANNER.PENDING_COMMIT_TIMESTAMP()\""
+ " }, {"
+ " \"name\" : \"numeric\","
+ " \"type\" : [\"null\", {\"type\":\"bytes\",\"logicalType\":\"decimal\"}],"
+ " \"sqlType\" : \"numeric\""
Expand Down Expand Up @@ -499,6 +531,11 @@ public void pgSimple() {
+ "BIT_REVERSED_POSITIVE SKIP RANGE 2000 3000 START COUNTER WITH 1000),"
+ " \"identity_column_no_kind\" bigint GENERATED BY DEFAULT AS IDENTITY ("
+ "SKIP RANGE 2000 3000 START COUNTER WITH 1000),"
+ " \"default_commit_ts\" spanner.commit_timestamp "
+ " DEFAULT SPANNER.PENDING_COMMIT_TIMESTAMP(),"
+ " \"on_update_ts\" spanner.commit_timestamp "
+ " DEFAULT SPANNER.PENDING_COMMIT_TIMESTAMP()"
+ " ON UPDATE SPANNER.PENDING_COMMIT_TIMESTAMP(),"
+ " \"numeric\" numeric,"
+ " \"numeric2\" numeric,"
+ " \"notNumeric\" bytea,"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1001,6 +1001,76 @@ public void pgIdentityColumn() throws Exception {
runTest(Dialect.POSTGRESQL);
}

@Test
public void commitTimestampColumns() throws Exception {
// spotless:off
Ddl.Builder ddlBuilder = Ddl.builder();
List<Export.DatabaseOption> dbOptionList = new ArrayList<>();
dbOptionList.add(
Export.DatabaseOption.newBuilder()
.setOptionName("default_sequence_kind")
.setOptionValue("\"bit_reversed_positive\"")
.build());
ddlBuilder.mergeDatabaseOptions(dbOptionList);
Ddl ddl = ddlBuilder
.createTable("CommitTimestampTable")
.column("id")
.int64()
.endColumn()
.column("default_commit_ts")
.type(Type.timestamp())
.defaultExpression("PENDING_COMMIT_TIMESTAMP()")
.columnOptions(ImmutableList.of("allow_commit_timestamp=TRUE"))
.endColumn()
.column("on_update_ts")
.type(Type.timestamp())
.defaultExpression("PENDING_COMMIT_TIMESTAMP()")
.onUpdateExpression("PENDING_COMMIT_TIMESTAMP()")
.columnOptions(ImmutableList.of("allow_commit_timestamp=TRUE"))
.endColumn()
.primaryKey().asc("id").end()
.endTable()
.build();
// spotless:on

createAndPopulate(ddl, 10);
runTest();
}

@Test
public void pgCommitTimestampColumns() throws Exception {
// spotless:off
Ddl.Builder ddlBuilder = Ddl.builder(Dialect.POSTGRESQL);
List<Export.DatabaseOption> dbOptionList = new ArrayList<>();
dbOptionList.add(
Export.DatabaseOption.newBuilder()
.setOptionName("default_sequence_kind")
.setOptionValue("\"bit_reversed_positive\"")
.build());
ddlBuilder.mergeDatabaseOptions(dbOptionList);
Ddl ddl = ddlBuilder
.createTable("CommitTimestampTable")
.column("id")
.int64()
.endColumn()
.column("default_commit_ts")
.pgSpannerCommitTimestamp()
.defaultExpression("spanner.pending_commit_timestamp()")
.endColumn()
.column("on_update_ts")
.pgSpannerCommitTimestamp()
.defaultExpression("spanner.pending_commit_timestamp()")
.onUpdateExpression("spanner.pending_commit_timestamp()")
.endColumn()
.primaryKey().asc("id").end()
.endTable()
.build();
// spotless:on

createAndPopulate(ddl, 10);
runTest();
}

@Test
public void udfs() throws Exception {
Ddl.Builder ddlBuilder = Ddl.builder();
Expand Down
Loading
Loading