Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ public Sequence toSequence(String sequenceName, Schema schema) {
ImmutableList.Builder<String> sequenceOptions = ImmutableList.builder();
for (int i = 0; schema.getProp(SPANNER_SEQUENCE_OPTION + i) != null; i++) {
String prop = schema.getProp(SPANNER_SEQUENCE_OPTION + i);
if (prop.equals("sequence_kind=default")) {
if (prop.equals("sequence_kind=\"default\"")) {
// Specify no sequence kind by using the default_sequence_kind database option.
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ private Schema avroType(
case PG_BYTEA:
case PROTO:
case TOKENLIST:
case PG_SPANNER_TOKENLIST:
return SchemaBuilder.builder().bytesType();
case TIMESTAMP:
case PG_TIMESTAMPTZ:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ public static String typeString(Type type, Integer size, boolean outputAsDdlRepr
return "JSON";
case PG_JSONB:
return "jsonb";
case PG_SPANNER_TOKENLIST:
return "spanner.tokenlist";
case PROTO:
if (outputAsDdlRepresentation) {
String quote = NameUtils.identifierQuote(Dialect.GOOGLE_STANDARD_SQL);
Expand Down Expand Up @@ -353,6 +355,9 @@ public static SizedType parseSpannerType(String spannerType, Dialect dialect) {
if (spannerType.equals("spanner.commit_timestamp")) {
return t(Type.pgSpannerCommitTimestamp(), null);
}
if (spannerType.equals("spanner.tokenlist")) {
return t(Type.pgSpannerTokenlist(), null);
}
break;
}
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ public final class Type implements Serializable {
new Type(Code.PG_ARRAY, TYPE_PG_TIMESTAMPTZ, null);
private static final Type TYPE_PG_ARRAY_DATE = new Type(Code.PG_ARRAY, TYPE_PG_DATE, null);

private static final Type TYPE_PG_SPANNER_TOKENLIST =
new Type(Code.PG_SPANNER_TOKENLIST, null, null);
private static final int AMBIGUOUS_FIELD = -1;
private static final long serialVersionUID = -3076152125004114582L;

Expand Down Expand Up @@ -209,6 +211,10 @@ public static Type pgSpannerCommitTimestamp() {
return TYPE_PG_SPANNER_COMMIT_TIMESTAMP;
}

public static Type pgSpannerTokenlist() {
return TYPE_PG_SPANNER_TOKENLIST;
}

/** Returns a descriptor for an array of {@code elementType}. */
public static Type array(Type elementType) {
Preconditions.checkNotNull(elementType);
Expand Down Expand Up @@ -353,6 +359,7 @@ public enum Code {
PG_TIMESTAMPTZ("timestamp with time zone", Dialect.POSTGRESQL),
PG_DATE("date", Dialect.POSTGRESQL),
PG_SPANNER_COMMIT_TIMESTAMP("spanner.commit_timestamp", Dialect.POSTGRESQL),
PG_SPANNER_TOKENLIST("spanner.tokenlist", Dialect.POSTGRESQL),
PG_ARRAY("array", Dialect.POSTGRESQL);

private final String name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,7 @@ public void prettyPrint(Appendable appendable) throws IOException {
appendable.append(" PLACEMENT KEY");
}
if (isHidden()) {
if (dialect() == Dialect.GOOGLE_STANDARD_SQL) {
appendable.append(" HIDDEN");
}
appendable.append(" HIDDEN");
}
if (columnOptions() == null) {
return;
Expand Down Expand Up @@ -347,6 +345,10 @@ public Builder pgJsonb() {
return type(Type.pgJsonb());
}

public Builder pgSpannerTokenlist() {
return type(Type.pgSpannerTokenlist());
}

public Builder proto(String protoTypeFqn) {
return type(Type.proto(protoTypeFqn));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,18 +354,20 @@ private void listColumns(Ddl.Builder builder) {
resultSet.isNull(12) ? null : Long.valueOf(resultSet.getString(12));
if (isIdentity) {
identityStartWithCounter =
updateCounterForIdentityColumn(
identityStartWithCounter, tableSchema + "." + columnName);
updateCounterForIdentityColumn(identityStartWithCounter, tableName + "." + columnName);
}
Long identitySkipRangeMin =
resultSet.isNull(13) ? null : Long.valueOf(resultSet.getString(13));
Long identitySkipRangeMax =
resultSet.isNull(14) ? null : Long.valueOf(resultSet.getString(14));
boolean isHidden = dialect == Dialect.GOOGLE_STANDARD_SQL ? resultSet.getBoolean(15) : false;
boolean isHidden =
dialect == Dialect.GOOGLE_STANDARD_SQL
? resultSet.getBoolean(15)
: resultSet.getString(15).equalsIgnoreCase("YES");
boolean isPlacementKey =
dialect == Dialect.GOOGLE_STANDARD_SQL
? resultSet.getBoolean(16)
: resultSet.getBoolean(15);
: resultSet.getBoolean(16);

builder
.createTable(tableName)
Expand Down Expand Up @@ -419,8 +421,8 @@ Statement listColumnsSQL() {
"SELECT c.table_schema, c.table_name, c.column_name,"
+ " c.ordinal_position, c.spanner_type, c.is_nullable,"
+ " c.is_generated, c.generation_expression, c.is_stored, c.column_default,"
+ " c.is_identity, c.identity_kind, c.identity_start_with_counter,"
+ " c.identity_skip_range_min, c.identity_skip_range_max,"
+ " c.is_identity, c.identity_kind, c.identity_start_with_counter, "
+ " c.identity_skip_range_min, c.identity_skip_range_max, c.is_hidden,"
+ " pkc.constraint_name IS NOT NULL AS is_placement_key"
+ " FROM information_schema.columns as c"
+ " LEFT JOIN placementkeycolumns AS pkc"
Expand Down Expand Up @@ -1661,6 +1663,7 @@ private void listSequenceOptionsGoogleSQL(
+ " ORDER BY t.name, t.option_name"));

Map<String, ImmutableList.Builder<String>> allOptions = Maps.newHashMap();
Set<String> hasSequenceKind = new HashSet<>();
while (resultSet.next()) {
String sequenceName = getQualifiedName(resultSet.getString(0), resultSet.getString(1));
String optionName = resultSet.getString(2);
Expand All @@ -1673,6 +1676,9 @@ private void listSequenceOptionsGoogleSQL(
// the DDL builder, instead of the one retrieved from Information Schema.
continue;
}
if (optionName.equals(Sequence.SEQUENCE_KIND)) {
hasSequenceKind.add(sequenceName);
}
ImmutableList.Builder<String> options =
allOptions.computeIfAbsent(sequenceName, k -> ImmutableList.builder());
if (optionType.equalsIgnoreCase("STRING")) {
Expand All @@ -1686,20 +1692,19 @@ private void listSequenceOptionsGoogleSQL(
options.add(optionName + "=" + optionValue);
}
}
// If the sequence kind is not specified, assign it to 'default'.
for (var entry : allOptions.entrySet()) {
if (!entry.getValue().toString().contains(Sequence.SEQUENCE_KIND)) {
entry
.getValue()
.add(
Sequence.SEQUENCE_KIND + "=" + GSQL_LITERAL_QUOTE + "default" + GSQL_LITERAL_QUOTE);
}
}

// Inject the current counter value to sequences that are in use.
for (Map.Entry<String, Long> entry : currentCounters.entrySet()) {
String sequenceName = entry.getKey();
ImmutableList.Builder<String> options =
allOptions.computeIfAbsent(entry.getKey(), k -> ImmutableList.builder());
allOptions.computeIfAbsent(sequenceName, k -> ImmutableList.builder());

if (!hasSequenceKind.contains(sequenceName)) {
// If the sequence kind is not specified, assign it to 'default'.
options.add(
Sequence.SEQUENCE_KIND + "=" + GSQL_LITERAL_QUOTE + "default" + GSQL_LITERAL_QUOTE);
}

// Add a buffer to accommodate writes that may happen after import
// is run. Note that this is not 100% failproof, since more writes may
// happen and they will make the sequence advances past the buffer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,9 @@ private static Type parseSpannerType(String spannerType, Dialect dialect) {
if ("SPANNER.COMMIT_TIMESTAMP".equals(spannerType)) {
return Type.timestamp();
}
if ("SPANNER.TOKENLIST".equals(spannerType)) {
return Type.bytes();
}
throw new IllegalArgumentException("Unknown spanner type " + spannerType);
default:
throw new IllegalArgumentException("Unrecognized dialect: " + dialect.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,14 @@ public void pgSimple() {
+ " \"type\" : [ \"null\", \"string\" ],"
+ " \"sqlType\" : \"spanner.commit_timestamp\""
+ " }, {"
+ " \"name\" : \"tokens\", "
+ " \"type\" : [\"null\"], "
+ " \"sqlType\" : \"spanner.tokenlist\","
+ " \"notNull\" : \"false\","
+ " \"generationExpression\" : \"spanner.tokenize_fulltext(first_name)\","
+ " \"stored\": \"true\","
+ " \"hidden\": \"true\""
+ " }, {"
+ " \"name\" : \"date\","
+ " \"type\" : [ \"null\", \"string\" ],"
+ " \"sqlType\" : \"date\""
Expand Down Expand Up @@ -477,6 +485,7 @@ public void pgSimple() {
+ " \"text\" text,"
+ " \"timestamptz\" timestamp with time zone,"
+ " \"commit_time\" spanner.commit_timestamp,"
+ " \"tokens\" spanner.tokenlist GENERATED ALWAYS AS (spanner.tokenize_fulltext(first_name)) STORED HIDDEN,"
+ " \"date\" date,"
+ " \"varcharArr1\" character varying[],"
+ " \"varcharArr2\" character varying[],"
Expand Down Expand Up @@ -988,7 +997,7 @@ public void sequences() {
+ " \"namespace\" : \"spannertest\","
+ " \"googleStorage\" : \"CloudSpanner\","
+ " \"googleFormatVersion\" : \"booleans\","
+ " \"sequenceOption_0\" : \"sequence_kind=default\""
+ " \"sequenceOption_0\" : \"sequence_kind=\\\"default\\\"\""
+ "}";
Collection<Schema> schemas = new ArrayList<>();
Schema.Parser parser = new Schema.Parser();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,11 @@ public void pgSimple() {
.skipRangeMin(2000L)
.skipRangeMax(3000L)
.endColumn()
.column("tokens")
.pgSpannerTokenlist()
.generatedAs("(spanner.tokenize_fulltext(full_name))")
.isHidden(true)
.endColumn()
.primaryKey()
.asc("id")
.asc("gen_id")
Expand Down Expand Up @@ -407,7 +412,7 @@ public void pgSimple() {

List<Schema.Field> fields = avroSchema.getFields();

assertThat(fields, hasSize(7));
assertThat(fields, hasSize(8));

assertThat(fields.get(0).name(), equalTo("id"));
// Not null
Expand Down Expand Up @@ -472,6 +477,15 @@ public void pgSimple() {
assertThat(fields.get(6).getProp(SPANNER_SEQUENCE_SKIP_RANGE_MIN), equalTo("2000"));
assertThat(fields.get(6).getProp(SPANNER_SEQUENCE_SKIP_RANGE_MAX), equalTo("3000"));

assertThat(fields.get(7).name(), equalTo("tokens"));
assertThat(fields.get(7).schema(), equalTo(Schema.create(Schema.Type.NULL)));
assertThat(fields.get(7).getProp(SQL_TYPE), equalTo("spanner.tokenlist"));
assertThat(fields.get(7).getProp(NOT_NULL), equalTo("false"));
assertThat(
fields.get(7).getProp(GENERATION_EXPRESSION),
equalTo("(spanner.tokenize_fulltext(full_name))"));
assertThat(fields.get(7).getProp(HIDDEN), equalTo("true"));
assertThat(fields.get(7).getProp(DEFAULT_EXPRESSION), equalTo(null));
// spanner pk
assertThat(avroSchema.getProp(SPANNER_PRIMARY_KEY + "_0"), equalTo("\"id\" ASC"));
assertThat(avroSchema.getProp(SPANNER_PRIMARY_KEY + "_1"), equalTo("\"gen_id\" ASC"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ private void testPostgresSpannerToGCSAvroBase(
+ " \"FirstName\" character varying(256),\n"
+ " \"LastName\" character varying(256),\n"
+ " \"Rating\" real,\n"
+ " \"NameTokens\" spanner.tokenlist generated always as (spanner.tokenize_fulltext(\"FirstName\")) stored hidden,\n"
+ "PRIMARY KEY(\"Id\"))",
testName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ private void testPostgresImportPipelineBase(
+ " \"Id\" bigint,\n"
+ " \"FirstName\" character varying(256),\n"
+ " \"LastName\" character varying(256),\n"
+ " \"NameTokens\" spanner.tokenlist generated always as (spanner.tokenize_fulltext(\"FirstName\")) stored hidden,\n"
+ "PRIMARY KEY(\"Id\"))";
spannerResourceManager.executeDdlStatement(createSingersTableStatement);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
*/
package com.google.cloud.teleport.spanner.ddl;

import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_SEQUENCE_COUNTER_START;
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_SEQUENCE_KIND;
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_SEQUENCE_SKIP_RANGE_MAX;
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_SEQUENCE_SKIP_RANGE_MIN;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.hasSize;
Expand All @@ -31,6 +35,7 @@
import com.google.cloud.spanner.BatchReadOnlyTransaction;
import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.teleport.spanner.DdlToAvroSchemaConverter;
import com.google.cloud.teleport.spanner.IntegrationTest;
import com.google.cloud.teleport.spanner.SpannerServerResource;
import com.google.cloud.teleport.spanner.common.Type;
Expand All @@ -39,8 +44,11 @@
import com.google.protobuf.DescriptorProtos.FileDescriptorSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.avro.Schema;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
Expand Down Expand Up @@ -1112,6 +1120,8 @@ public void pgChangeStreams() throws Exception {

@Test
public void sequences() throws Exception {
DdlToAvroSchemaConverter converter =
new DdlToAvroSchemaConverter("spannertest", "booleans", true);
List<String> statements =
Arrays.asList(
"ALTER DATABASE `"
Expand Down Expand Up @@ -1159,6 +1169,46 @@ public void sequences() throws Exception {
+ "\n\t`balanceId` INT64 NOT NULL,"
+ "\n) PRIMARY KEY (`id` ASC)\n\n";
assertThat(ddl.prettyPrint(), equalToCompressingWhiteSpace(expectedDdl));

Collection<Schema> result = converter.convert(ddl);
assertThat(result, hasSize(6));
Iterator<Schema> it = result.iterator();
Schema avroSchema1 = it.next();
assertThat(avroSchema1.getName(), equalTo("MySequence"));
assertThat(
avroSchema1.getProp("sequenceOption_0"),
equalTo("sequence_kind=\"bit_reversed_positive\""));
assertThat(avroSchema1.getProp("sequenceOption_1"), equalTo(null));

Schema avroSchema2 = it.next();
assertThat(avroSchema2.getName(), equalTo("MySequence2"));
assertThat(
avroSchema2.getProp("sequenceOption_0"),
equalTo("sequence_kind=\"bit_reversed_positive\""));
assertThat(avroSchema2.getProp("sequenceOption_1"), equalTo("skip_range_max=1000"));
assertThat(avroSchema2.getProp("sequenceOption_2"), equalTo("skip_range_min=1"));
assertThat(avroSchema2.getProp("sequenceOption_3"), equalTo("start_with_counter=100"));
assertThat(avroSchema2.getProp("sequenceOption_4"), equalTo(null));

Schema avroSchema3 = it.next();
assertThat(avroSchema3.getName(), equalTo("MySequence3"));
assertThat(avroSchema3.getProp("sequenceOption_0"), equalTo("skip_range_max=1000"));
assertThat(avroSchema3.getProp("sequenceOption_1"), equalTo("skip_range_min=1"));
assertThat(avroSchema3.getProp("sequenceOption_2"), equalTo("start_with_counter=100"));
assertThat(avroSchema3.getProp("sequenceOption_3"), equalTo("sequence_kind=\"default\""));
assertThat(avroSchema3.getProp("sequenceOption_4"), equalTo(null));

Schema avroSchema4 = it.next();
assertThat(avroSchema4.getName(), equalTo("MySequence4"));
assertThat(avroSchema4.getProp("sequenceOption_0"), equalTo("sequence_kind=\"default\""));
assertThat(avroSchema4.getProp("sequenceOption_1"), equalTo(null));

Schema avroSchema5 = it.next();
assertThat(avroSchema5.getProp(SPANNER_SEQUENCE_KIND), equalTo("bit_reversed_positive"));
assertThat(avroSchema5.getProp(SPANNER_SEQUENCE_SKIP_RANGE_MIN), equalTo("1"));
assertThat(avroSchema5.getProp(SPANNER_SEQUENCE_SKIP_RANGE_MAX), equalTo("1000"));
assertThat(avroSchema5.getProp(SPANNER_SEQUENCE_COUNTER_START), equalTo("100"));
assertThat(avroSchema5.getProp("sequenceOption_0"), equalTo(null));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void testListColumnsSQL() {
+ " SELECT c.table_schema, c.table_name, c.column_name, c.ordinal_position, c.spanner_type, c.is_nullable,"
+ " c.is_generated, c.generation_expression, c.is_stored, c.column_default,"
+ " c.is_identity, c.identity_kind, c.identity_start_with_counter,"
+ " c.identity_skip_range_min, c.identity_skip_range_max,"
+ " c.identity_skip_range_min, c.identity_skip_range_max, c.is_hidden,"
+ " pkc.constraint_name IS NOT NULL AS is_placement_key"
+ " FROM information_schema.columns as c"
+ " LEFT JOIN placementkeycolumns AS pkc"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,11 @@ public void testSinglePgTable() throws Exception {
.addColumn("test", "jsonbCol", "jsonb")
.addColumn("test", "arrayCol", "DOUBLE PRECISION[]")
.addColumn("test", "embeddingVectorCol", "DOUBLE PRECISION[] VECTOR LENGTH 16")
.addColumn("test", "tokens", "spanner.tokenlist")
.build();

assertEquals(1, schema.getTables().size());
assertEquals(7, schema.getColumns("test").size());
assertEquals(8, schema.getColumns("test").size());
assertEquals(1, schema.getKeyParts("test").size());
assertEquals(Type.timestamp(), schema.getColumns("test").get(3).getType());
assertEquals(Type.array(Type.float64()), schema.getColumns("test").get(5).getType());
Expand Down
Loading
Loading