Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ public Sequence toSequence(String sequenceName, Schema schema) {
ImmutableList.Builder<String> sequenceOptions = ImmutableList.builder();
for (int i = 0; schema.getProp(SPANNER_SEQUENCE_OPTION + i) != null; i++) {
String prop = schema.getProp(SPANNER_SEQUENCE_OPTION + i);
if (prop.equals("sequence_kind=default")) {
if (prop.equals("sequence_kind=\"default\"")) {
// Specify no sequence kind by using the default_sequence_kind database option.
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,7 @@ private void listColumns(Ddl.Builder builder) {
resultSet.isNull(12) ? null : Long.valueOf(resultSet.getString(12));
if (isIdentity) {
identityStartWithCounter =
updateCounterForIdentityColumn(
identityStartWithCounter, tableSchema + "." + columnName);
updateCounterForIdentityColumn(identityStartWithCounter, tableName + "." + columnName);
}
Long identitySkipRangeMin =
resultSet.isNull(13) ? null : Long.valueOf(resultSet.getString(13));
Expand Down Expand Up @@ -1661,6 +1660,7 @@ private void listSequenceOptionsGoogleSQL(
+ " ORDER BY t.name, t.option_name"));

Map<String, ImmutableList.Builder<String>> allOptions = Maps.newHashMap();
Set<String> hasSequenceKind = new HashSet<>();
while (resultSet.next()) {
String sequenceName = getQualifiedName(resultSet.getString(0), resultSet.getString(1));
String optionName = resultSet.getString(2);
Expand All @@ -1673,6 +1673,9 @@ private void listSequenceOptionsGoogleSQL(
// the DDL builder, instead of the one retrieved from Information Schema.
continue;
}
if (optionName.equals(Sequence.SEQUENCE_KIND)) {
hasSequenceKind.add(sequenceName);
}
ImmutableList.Builder<String> options =
allOptions.computeIfAbsent(sequenceName, k -> ImmutableList.builder());
if (optionType.equalsIgnoreCase("STRING")) {
Expand All @@ -1686,20 +1689,19 @@ private void listSequenceOptionsGoogleSQL(
options.add(optionName + "=" + optionValue);
}
}
// If the sequence kind is not specified, assign it to 'default'.
for (var entry : allOptions.entrySet()) {
if (!entry.getValue().toString().contains(Sequence.SEQUENCE_KIND)) {
entry
.getValue()
.add(
Sequence.SEQUENCE_KIND + "=" + GSQL_LITERAL_QUOTE + "default" + GSQL_LITERAL_QUOTE);
}
}

// Inject the current counter value to sequences that are in use.
for (Map.Entry<String, Long> entry : currentCounters.entrySet()) {
String sequenceName = entry.getKey();
ImmutableList.Builder<String> options =
allOptions.computeIfAbsent(entry.getKey(), k -> ImmutableList.builder());
allOptions.computeIfAbsent(sequenceName, k -> ImmutableList.builder());

if (!hasSequenceKind.contains(sequenceName)) {
// If the sequence kind is not specified, assign it to 'default'.
options.add(
Sequence.SEQUENCE_KIND + "=" + GSQL_LITERAL_QUOTE + "default" + GSQL_LITERAL_QUOTE);
}

// Add a buffer to accommodate writes that may happen after import
// is run. Note that this is not 100% failproof, since more writes may
// happen and they will make the sequence advances past the buffer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,7 @@ public void sequences() {
+ " \"namespace\" : \"spannertest\","
+ " \"googleStorage\" : \"CloudSpanner\","
+ " \"googleFormatVersion\" : \"booleans\","
+ " \"sequenceOption_0\" : \"sequence_kind=default\""
+ " \"sequenceOption_0\" : \"sequence_kind=\\\"default\\\"\""
+ "}";
Collection<Schema> schemas = new ArrayList<>();
Schema.Parser parser = new Schema.Parser();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
*/
package com.google.cloud.teleport.spanner.ddl;

import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_SEQUENCE_COUNTER_START;
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_SEQUENCE_KIND;
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_SEQUENCE_SKIP_RANGE_MAX;
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_SEQUENCE_SKIP_RANGE_MIN;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.hasSize;
Expand All @@ -31,6 +35,7 @@
import com.google.cloud.spanner.BatchReadOnlyTransaction;
import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.teleport.spanner.DdlToAvroSchemaConverter;
import com.google.cloud.teleport.spanner.IntegrationTest;
import com.google.cloud.teleport.spanner.SpannerServerResource;
import com.google.cloud.teleport.spanner.common.Type;
Expand All @@ -39,8 +44,11 @@
import com.google.protobuf.DescriptorProtos.FileDescriptorSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.avro.Schema;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
Expand Down Expand Up @@ -1112,6 +1120,8 @@ public void pgChangeStreams() throws Exception {

@Test
public void sequences() throws Exception {
DdlToAvroSchemaConverter converter =
new DdlToAvroSchemaConverter("spannertest", "booleans", true);
List<String> statements =
Arrays.asList(
"ALTER DATABASE `"
Expand Down Expand Up @@ -1159,6 +1169,46 @@ public void sequences() throws Exception {
+ "\n\t`balanceId` INT64 NOT NULL,"
+ "\n) PRIMARY KEY (`id` ASC)\n\n";
assertThat(ddl.prettyPrint(), equalToCompressingWhiteSpace(expectedDdl));

Collection<Schema> result = converter.convert(ddl);
assertThat(result, hasSize(6));
Iterator<Schema> it = result.iterator();
Schema avroSchema1 = it.next();
assertThat(avroSchema1.getName(), equalTo("MySequence"));
assertThat(
avroSchema1.getProp("sequenceOption_0"),
equalTo("sequence_kind=\"bit_reversed_positive\""));
assertThat(avroSchema1.getProp("sequenceOption_1"), equalTo(null));

Schema avroSchema2 = it.next();
assertThat(avroSchema2.getName(), equalTo("MySequence2"));
assertThat(
avroSchema2.getProp("sequenceOption_0"),
equalTo("sequence_kind=\"bit_reversed_positive\""));
assertThat(avroSchema2.getProp("sequenceOption_1"), equalTo("skip_range_max=1000"));
assertThat(avroSchema2.getProp("sequenceOption_2"), equalTo("skip_range_min=1"));
assertThat(avroSchema2.getProp("sequenceOption_3"), equalTo("start_with_counter=100"));
assertThat(avroSchema2.getProp("sequenceOption_4"), equalTo(null));

Schema avroSchema3 = it.next();
assertThat(avroSchema3.getName(), equalTo("MySequence3"));
assertThat(avroSchema3.getProp("sequenceOption_0"), equalTo("skip_range_max=1000"));
assertThat(avroSchema3.getProp("sequenceOption_1"), equalTo("skip_range_min=1"));
assertThat(avroSchema3.getProp("sequenceOption_2"), equalTo("start_with_counter=100"));
assertThat(avroSchema3.getProp("sequenceOption_3"), equalTo("sequence_kind=\"default\""));
assertThat(avroSchema3.getProp("sequenceOption_4"), equalTo(null));

Schema avroSchema4 = it.next();
assertThat(avroSchema4.getName(), equalTo("MySequence4"));
assertThat(avroSchema4.getProp("sequenceOption_0"), equalTo("sequence_kind=\"default\""));
assertThat(avroSchema4.getProp("sequenceOption_1"), equalTo(null));

Schema avroSchema5 = it.next();
assertThat(avroSchema5.getProp(SPANNER_SEQUENCE_KIND), equalTo("bit_reversed_positive"));
assertThat(avroSchema5.getProp(SPANNER_SEQUENCE_SKIP_RANGE_MIN), equalTo("1"));
assertThat(avroSchema5.getProp(SPANNER_SEQUENCE_SKIP_RANGE_MAX), equalTo("1000"));
assertThat(avroSchema5.getProp(SPANNER_SEQUENCE_COUNTER_START), equalTo("100"));
assertThat(avroSchema5.getProp("sequenceOption_0"), equalTo(null));
}

@Test
Expand Down
Loading