Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/actions/setup-java-env/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ runs:
echo "YESTERDAY=$KEY" >> $GITHUB_ENV
fi
- name: Setup Cache
uses: actions/cache@ab5e6d0c87105b4c9c2047343972218f562e4319 # v4.0.1
uses: actions/cache@v4
id: setup-cache
with:
path: |
Expand Down
5 changes: 5 additions & 0 deletions v1/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,11 @@
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<manifestEntries>
<Multi-Release>true</Multi-Release>
</manifestEntries>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,16 @@ public Collection<Schema> convert(Ddl ddl) {
if (cm.sequenceKind() != null) {
fieldBuilder.prop(SPANNER_SEQUENCE_KIND, cm.sequenceKind());
}
fieldBuilder.prop(
SPANNER_SEQUENCE_COUNTER_START, String.valueOf(cm.counterStartValue()));
fieldBuilder.prop(SPANNER_SEQUENCE_SKIP_RANGE_MIN, String.valueOf(cm.skipRangeMin()));
fieldBuilder.prop(SPANNER_SEQUENCE_SKIP_RANGE_MAX, String.valueOf(cm.skipRangeMax()));
if (cm.counterStartValue() != null) {
fieldBuilder.prop(
SPANNER_SEQUENCE_COUNTER_START, String.valueOf(cm.counterStartValue()));
}
if (cm.skipRangeMin() != null) {
fieldBuilder.prop(SPANNER_SEQUENCE_SKIP_RANGE_MIN, String.valueOf(cm.skipRangeMin()));
}
if (cm.skipRangeMax() != null) {
fieldBuilder.prop(SPANNER_SEQUENCE_SKIP_RANGE_MAX, String.valueOf(cm.skipRangeMax()));
}
} else if (cm.defaultExpression() != null) {
fieldBuilder.prop(DEFAULT_EXPRESSION, cm.defaultExpression());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,10 @@ private void listColumns(Ddl.Builder builder) {
String defaultExpression = resultSet.isNull(9) ? null : resultSet.getString(9);
boolean isIdentity = resultSet.getString(10).equalsIgnoreCase("YES");
String identityKind = resultSet.isNull(11) ? null : resultSet.getString(11);
String sequenceKind = null;
if (identityKind != null && identityKind.equals("BIT_REVERSED_POSITIVE_SEQUENCE")) {
sequenceKind = "bit_reversed_positive";
}
// The start_with_counter value is the initial value and cannot represent the actual state of
// the counter. We need to apply the current counter to the DDL builder, instead of the one
// retrieved from Information Schema.
Expand Down Expand Up @@ -375,7 +379,7 @@ private void listColumns(Ddl.Builder builder) {
.isStored(isStored)
.defaultExpression(defaultExpression)
.isIdentityColumn(isIdentity)
.sequenceKind(identityKind)
.sequenceKind(sequenceKind)
.counterStartValue(identityStartWithCounter)
.skipRangeMin(identitySkipRangeMin)
.skipRangeMax(identitySkipRangeMax)
Expand Down Expand Up @@ -1626,15 +1630,19 @@ private void listSequences(Ddl.Builder builder, Map<String, Long> currentCounter
ResultSet resultSet = context.executeQuery(queryStatement);
while (resultSet.next()) {
String sequenceName = getQualifiedName(resultSet.getString(0), resultSet.getString(1));
builder.createSequence(sequenceName).endSequence();

Statement sequenceCounterStatement;
switch (dialect) {
case GOOGLE_STANDARD_SQL:
ImmutableList.Builder<String> options = ImmutableList.builder();
options.add(
Sequence.SEQUENCE_KIND + "=" + GSQL_LITERAL_QUOTE + "default" + GSQL_LITERAL_QUOTE);
builder.createSequence(sequenceName).options(options.build()).endSequence();
sequenceCounterStatement =
Statement.of("SELECT GET_INTERNAL_SEQUENCE_STATE(SEQUENCE " + sequenceName + ")");
break;
case POSTGRESQL:
builder.createSequence(sequenceName).endSequence();
sequenceCounterStatement =
Statement.of(
"SELECT spanner.GET_INTERNAL_SEQUENCE_STATE('"
Expand Down Expand Up @@ -1666,7 +1674,6 @@ private void listSequenceOptionsGoogleSQL(
+ " ORDER BY t.name, t.option_name"));

Map<String, ImmutableList.Builder<String>> allOptions = Maps.newHashMap();
Set<String> hasSequenceKind = new HashSet<>();
while (resultSet.next()) {
String sequenceName = getQualifiedName(resultSet.getString(0), resultSet.getString(1));
String optionName = resultSet.getString(2);
Expand All @@ -1679,9 +1686,6 @@ private void listSequenceOptionsGoogleSQL(
// the DDL builder, instead of the one retrieved from Information Schema.
continue;
}
if (optionName.equals(Sequence.SEQUENCE_KIND)) {
hasSequenceKind.add(sequenceName);
}
ImmutableList.Builder<String> options =
allOptions.computeIfAbsent(sequenceName, k -> ImmutableList.builder());
if (optionType.equalsIgnoreCase("STRING")) {
Expand All @@ -1702,12 +1706,6 @@ private void listSequenceOptionsGoogleSQL(
ImmutableList.Builder<String> options =
allOptions.computeIfAbsent(sequenceName, k -> ImmutableList.builder());

if (!hasSequenceKind.contains(sequenceName)) {
// If the sequence kind is not specified, assign it to 'default'.
options.add(
Sequence.SEQUENCE_KIND + "=" + GSQL_LITERAL_QUOTE + "default" + GSQL_LITERAL_QUOTE);
}

// Add a buffer to accommodate writes that may happen after import
// is run. Note that this is not 100% failproof, since more writes may
// happen and they will make the sequence advances past the buffer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ public void simple() {
.column("uuid_column")
.type(Type.uuid())
.endColumn()
.column("identity_column_no_params")
.type(Type.int64())
.isIdentityColumn(true)
.endColumn()
.primaryKey()
.asc("id")
.asc("gen_id")
Expand Down Expand Up @@ -202,7 +206,7 @@ public void simple() {

List<Schema.Field> fields = avroSchema.getFields();

assertThat(fields, hasSize(11));
assertThat(fields, hasSize(12));

assertThat(fields.get(0).name(), equalTo("id"));
// Not null
Expand Down Expand Up @@ -300,6 +304,16 @@ public void simple() {
assertThat(field10.getProp(STORED), equalTo(null));
assertThat(field10.getProp(DEFAULT_EXPRESSION), equalTo(null));

assertThat(fields.get(11).name(), equalTo("identity_column_no_params"));
assertThat(fields.get(11).schema(), equalTo(nullableUnion(Schema.Type.LONG)));
assertThat(fields.get(11).getProp(SQL_TYPE), equalTo("INT64"));
assertThat(fields.get(11).getProp(NOT_NULL), equalTo(null));
assertThat(fields.get(11).getProp(IDENTITY_COLUMN), equalTo("true"));
assertThat(fields.get(11).getProp(SPANNER_SEQUENCE_KIND), equalTo(null));
assertThat(fields.get(11).getProp(SPANNER_SEQUENCE_COUNTER_START), equalTo(null));
assertThat(fields.get(11).getProp(SPANNER_SEQUENCE_SKIP_RANGE_MIN), equalTo(null));
assertThat(fields.get(11).getProp(SPANNER_SEQUENCE_SKIP_RANGE_MAX), equalTo(null));

// spanner pk
assertThat(avroSchema.getProp(SPANNER_PRIMARY_KEY + "_0"), equalTo("`id` ASC"));
assertThat(avroSchema.getProp(SPANNER_PRIMARY_KEY + "_1"), equalTo("`gen_id` ASC"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,20 @@ public class ExportPipelineIT extends SpannerTemplateITBase {
+ " ]\n"
+ "}");

private static final Schema IDENTITY_SCHEMA =
new Schema.Parser()
.parse(
"{\n"
+ " \"type\": \"record\",\n"
+ " \"name\": \"Identity\",\n"
+ " \"namespace\": \"com.google.cloud.teleport.spanner\",\n"
+ " \"fields\": [\n"
+ " { \"name\": \"Id\", \"type\": \"long\", \"sqlType\": \"INT64\", \"sequenceKind\":\"bit_reversed_positive\", \"identityColumn\":\"true\" },\n"
+ " { \"name\": \"NonKeyIdCol1\", \"type\": \"long\", \"sqlType\": \"INT64\", \"identityColumn\":\"true\" },\n"
+ " { \"name\": \"NonKeyIdCol2\", \"type\": \"long\", \"sqlType\": \"INT64\", \"skipRangeMin\":\"1000\",\"skipRangeMax\":\"2000\", \"identityColumn\":\"true\" }\n"
+ " ]\n"
+ "}");

private static final Schema MODEL_STRUCT_SCHEMA =
new Schema.Parser()
.parse(
Expand Down Expand Up @@ -189,6 +203,24 @@ private void testSpannerToGCSAvroBase(
String.format(
"CREATE TABLE `%s_EmptyTable` (\n" + " id INT64 NOT NULL,\n" + ") PRIMARY KEY(id)",
testName);
String setDefaultSequenceKindStatement =
"ALTER DATABASE db SET OPTIONS (\n"
+ " default_sequence_kind = 'bit_reversed_positive')";
String createIdentityTableStatement =
String.format(
"CREATE TABLE `%s_Identity` (\n"
+ " Id INT64 NOT NULL GENERATED BY DEFAULT AS IDENTITY (BIT_REVERSED_POSITIVE),\n"
+ " NonKeyIdCol1 INT64 NOT NULL GENERATED BY DEFAULT AS IDENTITY,\n"
+ " NonKeyIdCol2 INT64 NOT NULL GENERATED BY DEFAULT AS IDENTITY (SKIP RANGE 1000, 2000),\n"
+ ") PRIMARY KEY(Id)",
testName);

String createSequenceStatement =
String.format(
"CREATE SEQUENCE `%s_Sequence1` BIT_REVERSED_POSITIVE SKIP RANGE 99, 999", testName);
String createSequenceNoSpecifiedKindStatement =
String.format("CREATE SEQUENCE `%s_Sequence2`", testName);

String createSingersTableStatement =
String.format(
"CREATE TABLE `%s_Singers` (\n"
Expand All @@ -215,9 +247,13 @@ private void testSpannerToGCSAvroBase(
testName, testName);

spannerResourceManager.executeDdlStatement(createEmptyTableStatement);
spannerResourceManager.executeDdlStatement(setDefaultSequenceKindStatement);
spannerResourceManager.executeDdlStatement(createIdentityTableStatement);
spannerResourceManager.executeDdlStatement(createSingersTableStatement);
spannerResourceManager.executeDdlStatement(createModelStructStatement);
spannerResourceManager.executeDdlStatement(createSearchIndexStatement);
spannerResourceManager.executeDdlStatement(createSequenceStatement);
spannerResourceManager.executeDdlStatement(createSequenceNoSpecifiedKindStatement);
List<Mutation> expectedData = generateTableRows(String.format("%s_Singers", testName));
spannerResourceManager.write(expectedData);
PipelineLauncher.LaunchConfig.Builder options =
Expand Down Expand Up @@ -250,19 +286,35 @@ private void testSpannerToGCSAvroBase(
gcsClient.listArtifacts(
"output/",
Pattern.compile(String.format(".*/%s_%s.*\\.avro.*", testName, "SearchIndex")));
List<Artifact> identityArtifacts =
gcsClient.listArtifacts(
"output/", Pattern.compile(String.format(".*/%s_%s.*\\.avro.*", testName, "Identity")));
List<Artifact> sequenceArtifacts =
gcsClient.listArtifacts(
"output/",
Pattern.compile(String.format(".*/%s_%s.*\\.avro.*", testName, "Sequence1")));
List<Artifact> sequenceNoKindArtifacts =
gcsClient.listArtifacts(
"output/",
Pattern.compile(String.format(".*/%s_%s.*\\.avro.*", testName, "Sequence2")));
assertThat(singersArtifacts).isNotEmpty();
assertThat(emptyArtifacts).isNotEmpty();
assertThat(modelStructArtifacts).isNotEmpty();
assertThat(identityArtifacts).isNotEmpty();
assertThat(sequenceArtifacts).isNotEmpty();
assertThat(sequenceNoKindArtifacts).isNotEmpty();

List<GenericRecord> singersRecords = extractArtifacts(singersArtifacts, SINGERS_SCHEMA);
List<GenericRecord> emptyRecords = extractArtifacts(emptyArtifacts, EMPTY_SCHEMA);
List<GenericRecord> modelStructRecords =
extractArtifacts(modelStructArtifacts, MODEL_STRUCT_SCHEMA);
List<GenericRecord> identityRecords = extractArtifacts(identityArtifacts, IDENTITY_SCHEMA);

assertThatGenericRecords(singersRecords)
.hasRecordsUnorderedCaseInsensitiveColumns(mutationsToRecords(expectedData));
assertThatGenericRecords(emptyRecords).hasRows(0);
assertThatGenericRecords(modelStructRecords).hasRows(0);
assertThatGenericRecords(identityRecords).hasRows(0);
}

@Test
Expand All @@ -286,6 +338,22 @@ private void testPostgresSpannerToGCSAvroBase(
String.format(
"CREATE TABLE \"%s_EmptyTable\" (\n" + " id bigint NOT NULL,\nPRIMARY KEY(id)\n" + ")",
testName);
String setDefaultSequenceKindStatement =
"ALTER DATABASE db SET spanner.default_sequence_kind = 'bit_reversed_positive'";
String createIdentityTableStatement =
String.format(
"CREATE TABLE \"%s_Identity\" (\n"
+ " Id bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY (BIT_REVERSED_POSITIVE) PRIMARY KEY,\n"
+ " NonKeyIdCol1 bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY,\n"
+ " NonKeyIdCol2 bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY (SKIP RANGE 1000 2000)\n"
+ ")",
testName);

String createSequenceStatement =
String.format(
"CREATE SEQUENCE \"%s_Sequence1\" BIT_REVERSED_POSITIVE SKIP RANGE 99 999", testName);
String createSequenceNoSpecifiedKindStatement =
String.format("CREATE SEQUENCE \"%s_Sequence2\"", testName);
String createSingersTableStatement =
String.format(
"CREATE TABLE \"%s_Singers\" (\n"
Expand All @@ -306,6 +374,10 @@ private void testPostgresSpannerToGCSAvroBase(
spannerResourceManager.executeDdlStatement(createEmptyTableStatement);
spannerResourceManager.executeDdlStatement(createSingersTableStatement);
spannerResourceManager.executeDdlStatement(createSearchIndexStatement);
spannerResourceManager.executeDdlStatement(setDefaultSequenceKindStatement);
spannerResourceManager.executeDdlStatement(createIdentityTableStatement);
spannerResourceManager.executeDdlStatement(createSequenceStatement);
spannerResourceManager.executeDdlStatement(createSequenceNoSpecifiedKindStatement);
List<Mutation> expectedData = generateTableRows(String.format("%s_Singers", testName));
spannerResourceManager.write(expectedData);
PipelineLauncher.LaunchConfig.Builder options =
Expand Down Expand Up @@ -334,15 +406,32 @@ private void testPostgresSpannerToGCSAvroBase(
gcsClient.listArtifacts(
"output/",
Pattern.compile(String.format(".*/%s_%s.*\\.avro.*", testName, "SearchIndex")));
List<Artifact> identityArtifacts =
gcsClient.listArtifacts(
"output/", Pattern.compile(String.format(".*/%s_%s.*\\.avro.*", testName, "Identity")));
List<Artifact> sequenceArtifacts =
gcsClient.listArtifacts(
"output/",
Pattern.compile(String.format(".*/%s_%s.*\\.avro.*", testName, "Sequence1")));
List<Artifact> sequenceNoKindArtifacts =
gcsClient.listArtifacts(
"output/",
Pattern.compile(String.format(".*/%s_%s.*\\.avro.*", testName, "Sequence2")));
assertThat(singersArtifacts).isNotEmpty();
assertThat(emptyArtifacts).isNotEmpty();
assertThat(identityArtifacts).isNotEmpty();
assertThat(sequenceArtifacts).isNotEmpty();
assertThat(sequenceNoKindArtifacts).isNotEmpty();

List<GenericRecord> singersRecords = extractArtifacts(singersArtifacts, SINGERS_SCHEMA);
List<GenericRecord> emptyRecords = extractArtifacts(emptyArtifacts, EMPTY_SCHEMA);

assertThatGenericRecords(singersRecords)
.hasRecordsUnorderedCaseInsensitiveColumns(mutationsToRecords(expectedData));
assertThatGenericRecords(emptyRecords).hasRows(0);

List<GenericRecord> identityRecords = extractArtifacts(identityArtifacts, IDENTITY_SCHEMA);
assertThatGenericRecords(identityRecords).hasRows(0);
}

// TODO(b/395532087): Consolidate this with other tests after UUID launch.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,27 @@ private void uploadImportPipelineArtifacts(String subdirectory) throws IOExcepti
"input/UuidTable-manifest.json",
Resources.getResource("ImportPipelineIT/" + subdirectory + "/UuidTable-manifest.json")
.getPath());
gcsClient.uploadArtifact(
"input/Identity.avro-00000-of-00001",
Resources.getResource("ImportPipelineIT/" + subdirectory + "/Identity.avro").getPath());
gcsClient.uploadArtifact(
"input/Identity-manifest.json",
Resources.getResource("ImportPipelineIT/" + subdirectory + "/Identity-manifest.json")
.getPath());
gcsClient.uploadArtifact(
"input/Sequence1.avro-00000-of-00001",
Resources.getResource("ImportPipelineIT/" + subdirectory + "/Sequence1.avro").getPath());
gcsClient.uploadArtifact(
"input/Sequence1-manifest.json",
Resources.getResource("ImportPipelineIT/" + subdirectory + "/Sequence1-manifest.json")
.getPath());
gcsClient.uploadArtifact(
"input/Sequence2.avro-00000-of-00001",
Resources.getResource("ImportPipelineIT/" + subdirectory + "/Sequence2.avro").getPath());
gcsClient.uploadArtifact(
"input/Sequence2-manifest.json",
Resources.getResource("ImportPipelineIT/" + subdirectory + "/Sequence2-manifest.json")
.getPath());

if (Objects.equals(subdirectory, "googlesql")) {
gcsClient.uploadArtifact(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"files": [{
"name": "Identity.avro-00000-of-00001",
"md5": "kACNneTTdQ8Zo9fw710G5w\u003d\u003d"
}]
}
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"files": [{
"name": "Sequence1.avro-00000-of-00001",
"md5": "Sp/gFsJR5whR3dnd/iCJxw\u003d\u003d"
}]
}
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"files": [{
"name": "Sequence2.avro-00000-of-00001",
"md5": "ossgsYlkqF2RuX1ahbfTrw\u003d\u003d"
}]
}
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,23 @@
}, {
"name": "Float32Table",
"manifestFile": "Float32Table-manifest.json"
}, {
"name": "Identity",
"manifestFile": "Identity-manifest.json"
}, {
"name": "ModelStruct",
"manifestFile": "ModelStruct-manifest.json"
}],
"databaseOptions": [{
"optionName": "default_sequence_kind",
"optionType": "STRING",
"optionValue": "bit_reversed_positive"
}],
"sequences": [{
"name": "Sequence1",
"manifestFile": "Sequence1-manifest.json"
}, {
"name": "Sequence2",
"manifestFile": "Sequence2-manifest.json"
}]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"files": [{
"name": "Identity.avro-00000-of-00001",
"md5": "UYaA6WoJBsd78Koo28H/Yg\u003d\u003d"
}]
}
Binary file not shown.
Loading
Loading