Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,30 @@
String getSchemaOverridesFilePath();

void setSchemaOverridesFilePath(String value);

@TemplateParameter.Text(
order = 33,
optional = true,
groupName = "Target",
description = "Cloud Spanner Shadow Table Instance Id.",
helpText =
"Optional separate instance for shadow tables. If not specified, shadow tables will be created in the main instance.")
@Default.String("")
String getShadowTableSpannerInstanceId();

void setShadowTableSpannerInstanceId(String value);

@TemplateParameter.Text(
order = 33,
optional = true,
groupName = "Target",
description = "Cloud Spanner Shadow Table Database Id.",
helpText =
"Optional separate database for shadow tables. If not specified, shadow tables will be created in the main database.")
@Default.String("")
String getShadowTableSpannerDatabaseId();

void setShadowTableSpannerDatabaseId(String value);
}

private static void validateSourceType(Options options) {
Expand Down Expand Up @@ -643,20 +667,31 @@
.setMaxRpcTimeout(org.threeten.bp.Duration.ofMinutes(4))
.setMaxAttempts(1)
.build());
SpannerConfig shadowTableSpannerConfig = getShadowTableSpannerConfig(options);

Check warning on line 670 in v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java

View check run for this annotation

Codecov / codecov/patch

v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java#L670

Added line #L670 was not covered by tests
/* Process information schema
* 1) Read information schema from destination Cloud Spanner database
* 2) Check if shadow tables are present and create if necessary
* 3) Return new information schema
*/
PCollection<Ddl> ddl =
PCollectionTuple ddlTuple =

Check warning on line 676 in v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java

View check run for this annotation

Codecov / codecov/patch

v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java#L676

Added line #L676 was not covered by tests
pipeline.apply(
"Process Information Schema",
new ProcessInformationSchema(
spannerConfig,
shadowTableSpannerConfig,
options.getShouldCreateShadowTables(),
options.getShadowTablePrefix(),
options.getDatastreamSourceType()));
PCollectionView<Ddl> ddlView = ddl.apply("Cloud Spanner DDL as view", View.asSingleton());
PCollectionView<Ddl> ddlView =

Check warning on line 685 in v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java

View check run for this annotation

Codecov / codecov/patch

v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java#L685

Added line #L685 was not covered by tests
ddlTuple
.get(ProcessInformationSchema.MAIN_DDL_TAG)
.apply("Cloud Spanner Main DDL as view", View.asSingleton());

Check warning on line 688 in v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java

View check run for this annotation

Codecov / codecov/patch

v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java#L687-L688

Added lines #L687 - L688 were not covered by tests

PCollectionView<Ddl> shadowTableDdlView =

Check warning on line 690 in v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java

View check run for this annotation

Codecov / codecov/patch

v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java#L690

Added line #L690 was not covered by tests
ddlTuple
.get(ProcessInformationSchema.SHADOW_TABLE_DDL_TAG)
.apply("Cloud Spanner shadow tables DDL as view", View.asSingleton());

Check warning on line 693 in v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java

View check run for this annotation

Codecov / codecov/patch

v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java#L692-L693

Added lines #L692 - L693 were not covered by tests

PCollection<FailsafeElement<String, String>> jsonRecords = null;
// Elements sent to the Dead Letter Queue are to be reconsumed.
// A DLQManager is to be created using PipelineOptions, and it is in charge
Expand Down Expand Up @@ -781,15 +816,16 @@
/*
* Stage 4: Write transformed records to Cloud Spanner
*/

SpannerTransactionWriter.Result spannerWriteResults =
transformedRecords
.get(DatastreamToSpannerConstants.TRANSFORMED_EVENT_TAG)
.apply(
"Write events to Cloud Spanner",
new SpannerTransactionWriter(
spannerConfig,
shadowTableSpannerConfig,
ddlView,
shadowTableDdlView,
options.getShadowTablePrefix(),
options.getDatastreamSourceType(),
isRegularMode));
Expand Down Expand Up @@ -842,6 +878,52 @@
return pipeline.run();
}

static SpannerConfig getShadowTableSpannerConfig(Options options) {
// Validate shadow table Spanner config - both instance and database must be specified together
String shadowTableSpannerInstanceId = options.getShadowTableSpannerInstanceId();
String shadowTableSpannerDatabaseId = options.getShadowTableSpannerDatabaseId();
LOG.info(
"Input Shadow table db - instance {} and database {}",
shadowTableSpannerInstanceId,
shadowTableSpannerDatabaseId);

if ((Strings.isNullOrEmpty(shadowTableSpannerInstanceId)
&& !Strings.isNullOrEmpty(shadowTableSpannerDatabaseId))
|| (!Strings.isNullOrEmpty(shadowTableSpannerInstanceId)
&& Strings.isNullOrEmpty(shadowTableSpannerDatabaseId))) {
throw new IllegalArgumentException(
"Both shadowTableSpannerInstanceId and shadowTableSpannerDatabaseId must be specified together");
}
// If not specified, use main instance and database values. The shadow table database stores the
// shadow tables and by default, is the same as he main database for backwards compatibility.
if (Strings.isNullOrEmpty(shadowTableSpannerInstanceId)
&& Strings.isNullOrEmpty(shadowTableSpannerDatabaseId)) {
shadowTableSpannerInstanceId = options.getInstanceId();
shadowTableSpannerDatabaseId = options.getDatabaseId();
LOG.info(
"Overwrote shadow table instance - {} and db- {}",
shadowTableSpannerInstanceId,
shadowTableSpannerDatabaseId);
}
return SpannerConfig.create()
.withProjectId(ValueProvider.StaticValueProvider.of(options.getProjectId()))
.withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost()))
.withInstanceId(ValueProvider.StaticValueProvider.of(shadowTableSpannerInstanceId))
.withDatabaseId(ValueProvider.StaticValueProvider.of(shadowTableSpannerDatabaseId))
.withRpcPriority(ValueProvider.StaticValueProvider.of(options.getSpannerPriority()))
.withCommitRetrySettings(
RetrySettings.newBuilder()
.setTotalTimeout(org.threeten.bp.Duration.ofMinutes(4))
.setInitialRetryDelay(org.threeten.bp.Duration.ofMinutes(0))
.setRetryDelayMultiplier(1)
.setMaxRetryDelay(org.threeten.bp.Duration.ofMinutes(0))
.setInitialRpcTimeout(org.threeten.bp.Duration.ofMinutes(4))
.setRpcTimeoutMultiplier(1)
.setMaxRpcTimeout(org.threeten.bp.Duration.ofMinutes(4))
.setMaxAttempts(1)
.build());
}

private static DeadLetterQueueManager buildDlqManager(Options options) {
String tempLocation =
options.as(DataflowPipelineOptions.class).getTempLocation().endsWith("/")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,15 @@
/* The spanner config specifying the destination Cloud Spanner database to connect to */
private final SpannerConfig spannerConfig;

/* The spanner config specifying the shadow table Cloud Spanner database to connect to */
private final SpannerConfig shadowTableSpannerConfig;

/* The information schema of the Cloud Spanner database */
private final PCollectionView<Ddl> ddlView;

/* The information schema of the shadow table Cloud Spanner database */
private final PCollectionView<Ddl> shadowTableDdlView;

/* The prefix for shadow tables */
private final String shadowTablePrefix;

Expand All @@ -68,13 +74,17 @@

public SpannerTransactionWriter(
SpannerConfig spannerConfig,
SpannerConfig shadowTableSpannerConfig,
PCollectionView<Ddl> ddlView,
PCollectionView<Ddl> shadowTableDdlView,
String shadowTablePrefix,
String sourceType,
Boolean isRegularRunMode) {
Preconditions.checkNotNull(spannerConfig);
this.spannerConfig = spannerConfig;
this.shadowTableSpannerConfig = shadowTableSpannerConfig;

Check warning on line 85 in v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriter.java

View check run for this annotation

Codecov / codecov/patch

v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriter.java#L85

Added line #L85 was not covered by tests
this.ddlView = ddlView;
this.shadowTableDdlView = shadowTableDdlView;

Check warning on line 87 in v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriter.java

View check run for this annotation

Codecov / codecov/patch

v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriter.java#L87

Added line #L87 was not covered by tests
this.shadowTablePrefix = shadowTablePrefix;
this.sourceType = sourceType;
this.isRegularRunMode = isRegularRunMode;
Expand All @@ -88,8 +98,14 @@
"Write Mutations",
ParDo.of(
new SpannerTransactionWriterDoFn(
spannerConfig, ddlView, shadowTablePrefix, sourceType, isRegularRunMode))
.withSideInputs(ddlView)
spannerConfig,
shadowTableSpannerConfig,
ddlView,
shadowTableDdlView,
shadowTablePrefix,
sourceType,
isRegularRunMode))
.withSideInputs(ddlView, shadowTableDdlView)

Check warning on line 108 in v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriter.java

View check run for this annotation

Codecov / codecov/patch

v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriter.java#L108

Added line #L108 was not covered by tests
.withOutputTags(
DatastreamToSpannerConstants.SUCCESSFUL_EVENT_TAG,
TupleTagList.of(
Expand Down
Loading
Loading