@@ -252,4 +252,15 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions {
String getNamespace();

void setNamespace(String value);

@TemplateParameter.Text(
order = 21,
optional = true,
description = "Use Inserts instead of Upserts for spanner mutations.",
helpText =
"By default the pipeline uses Upserts to write rows to spanner. Which means existing rows would get overwritten. If InsertOnly mode is enabled, inserts would be used instead of upserts and existing rows won't be overwritten.")
@Default.Boolean(false)
Boolean getInsertOnlyModeForSpannerMutations();

void setInsertOnlyModeForSpannerMutations(Boolean value);
}
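For reference, here is a minimal sketch (not part of this diff) of how the new flag could be read once the pipeline options are parsed. The flag name comes from the interface above; the argument values and surrounding setup are illustrative only.

// Hypothetical snippet: parse pipeline arguments and read the new flag.
// Uses org.apache.beam.sdk.options.PipelineOptionsFactory; the template's required
// parameters are omitted for brevity.
SourceDbToSpannerOptions options =
    PipelineOptionsFactory.fromArgs("--insertOnlyModeForSpannerMutations=true")
        .as(SourceDbToSpannerOptions.class);
// Defaults to false, so existing launches keep upsert (insert-or-update) semantics.
boolean insertOnly = options.getInsertOnlyModeForSpannerMutations();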
@@ -89,7 +89,8 @@

// Transform source data to Spanner Compatible Data
SourceRowToMutationDoFn transformDoFn =
SourceRowToMutationDoFn.create(schemaMapper, customTransformation);
SourceRowToMutationDoFn.create(
schemaMapper, customTransformation, options.getInsertOnlyModeForSpannerMutations());

Codecov (codecov/patch) warning on v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/MigrateTableTransform.java: added lines 92-93 were not covered by tests.
PCollectionTuple transformationResult =
sourceRows.apply(
"Transform",
@@ -65,9 +65,20 @@ public void setSourceDbToSpannerTransformer(
@Nullable
public abstract CustomTransformation customTransformation();

public abstract boolean insertOnly();

/**
* Creates {@link SourceRowToMutationDoFn}.
*
* @param iSchemaMapper schema mapper.
* @param customTransformation custom transformation.
* @param insertOnly set to true to generate insert mutations, or false to generate
*     insert-or-update (upsert) mutations.
* @return SourceRowToMutationDoFn.
*/
public static SourceRowToMutationDoFn create(
ISchemaMapper iSchemaMapper, CustomTransformation customTransformation) {
return new AutoValue_SourceRowToMutationDoFn(iSchemaMapper, customTransformation);
ISchemaMapper iSchemaMapper, CustomTransformation customTransformation, boolean insertOnly) {
return new AutoValue_SourceRowToMutationDoFn(iSchemaMapper, customTransformation, insertOnly);
}
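A brief usage sketch of the updated factory method, assuming a schemaMapper and parsed pipeline options are already in scope (this mirrors the call site in MigrateTableTransform above and is not itself part of the diff):

// Hypothetical call site; schemaMapper and options are assumed to exist.
SourceRowToMutationDoFn transformDoFn =
    SourceRowToMutationDoFn.create(
        schemaMapper,
        /* customTransformation= */ null,
        /* insertOnly= */ options.getInsertOnlyModeForSpannerMutations());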

/** Setup function to load custom transformation jars. */
@@ -101,7 +112,7 @@ public void processElement(ProcessContext c, MultiOutputReceiver output) {

String spannerTableName = iSchemaMapper().getSpannerTableName("", srcTableName);
// TODO: Move the mutation generation to writer. Create generic record here instead
Mutation mutation = mutationFromMap(spannerTableName, values);
Mutation mutation = mutationFromMap(spannerTableName, values, insertOnly());
output
.get(SourceDbToSpannerConstants.ROW_TRANSFORMATION_SUCCESS)
.output(RowContext.builder().setRow(sourceRow).setMutation(mutation).build());
@@ -114,8 +125,12 @@ public void processElement(ProcessContext c, MultiOutputReceiver output) {
}
}

private Mutation mutationFromMap(String spannerTableName, Map<String, Value> values) {
Mutation.WriteBuilder builder = Mutation.newInsertOrUpdateBuilder(spannerTableName);
private Mutation mutationFromMap(
String spannerTableName, Map<String, Value> values, boolean insertOnly) {
Mutation.WriteBuilder builder =
(insertOnly)
? Mutation.newInsertBuilder(spannerTableName)
: Mutation.newInsertOrUpdateBuilder(spannerTableName);
for (String spannerColName : values.keySet()) {
Value value = values.get(spannerColName);
if (value != null) {
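To illustrate the behavioral difference introduced in mutationFromMap above, here is a hedged sketch of the two Spanner mutation builders; the table and column names are hypothetical:

// Upsert: commits whether or not a row with the same key exists; an existing row is overwritten.
Mutation upsert =
    Mutation.newInsertOrUpdateBuilder("Users").set("id").to(1L).set("name").to("new").build();

// Insert-only: the commit fails with ALREADY_EXISTS if a row with the same key is present,
// so pre-existing rows are never overwritten.
Mutation insertOnly =
    Mutation.newInsertBuilder("Users").set("id").to(1L).set("name").to("new").build();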
@@ -15,6 +15,7 @@
*/
package com.google.cloud.teleport.v2.templates;

import static com.google.common.truth.Truth.assertThat;
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult;

import com.google.cloud.teleport.metadata.SkipDirectRunnerTest;
@@ -41,6 +42,7 @@
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;

/**
* An integration test for {@link SourceDbToSpanner} Flex template which tests a basic migration on
@@ -123,5 +125,47 @@ public void simpleTest() throws IOException {
.hasRecordsUnorderedCaseInsensitiveColumns(mySQLData);
SpannerAsserts.assertThatStructs(spannerResourceManager.readTableRecords(TABLE2, ID, NAME))
.hasRecordsUnorderedCaseInsensitiveColumns(mySQLData);

/* Test insert-only mode */
mySQLResourceManager.runSQLUpdate("TRUNCATE TABLE " + TABLE2);
List<Map<String, Object>> updatedMySQLData = getMySQLData();
/* Every call generates new random data, which is the central assumption here. */
assertThat(updatedMySQLData).isNotEqualTo(mySQLData);
mySQLResourceManager.write(TABLE2, updatedMySQLData);

/* Check that upserts have not happened and the records still match the old data. */
jobInfo =
launchDataflowJob(
getClass().getSimpleName(),
null,
null,
mySQLResourceManager,
spannerResourceManager,
ImmutableMap.of("insertOnlyModeForSpannerMutations", "true"),
null);
PipelineOperator.Result resultInsertsOnly =
pipelineOperator().waitUntilDone(createConfig(jobInfo));
assertThatResult(resultInsertsOnly).isLaunchFinished();
SpannerAsserts.assertThatStructs(spannerResourceManager.readTableRecords(TABLE1, ID, NAME))
.hasRecordsUnorderedCaseInsensitiveColumns(mySQLData);
SpannerAsserts.assertThatStructs(spannerResourceManager.readTableRecords(TABLE2, ID, NAME))
.hasRecordsUnorderedCaseInsensitiveColumns(mySQLData);

/* Run again in upsert mode and check that the records get updated. */
jobInfo =
launchDataflowJob(
getClass().getSimpleName(),
null,
null,
mySQLResourceManager,
spannerResourceManager,
ImmutableMap.of("insertOnlyModeForSpannerMutations", "false"),
null);
PipelineOperator.Result resultUpserts = pipelineOperator().waitUntilDone(createConfig(jobInfo));
assertThatResult(resultUpserts).isLaunchFinished();
SpannerAsserts.assertThatStructs(spannerResourceManager.readTableRecords(TABLE1, ID, NAME))
.hasRecordsUnorderedCaseInsensitiveColumns(mySQLData);
SpannerAsserts.assertThatStructs(spannerResourceManager.readTableRecords(TABLE2, ID, NAME))
.hasRecordsUnorderedCaseInsensitiveColumns(updatedMySQLData);
}
}
@@ -91,7 +91,7 @@ public void testSourceRowToMutationDoFn() {
.thenReturn(true);

PCollection<Mutation> mutations =
transform(sourceRows, SourceRowToMutationDoFn.create(mockIschemaMapper, null));
transform(sourceRows, SourceRowToMutationDoFn.create(mockIschemaMapper, null, false));

PAssert.that(mutations)
.containsInAnyOrder(
@@ -104,6 +104,51 @@ public void testSourceRowToMutationDoFn() {
pipeline.run();
}

@Test
public void testSourceRowToMutationDoFnWithInsertOnly() {
final String testTable = "srcTable";
var schemaRef = SchemaTestUtils.generateSchemaReference("public", "mydb");
var schema = SchemaTestUtils.generateTestTableSchema(testTable);
SourceRow sourceRow =
SourceRow.builder(schemaRef, schema, null, 12412435345L)
.setField("firstName", "abc")
.setField("lastName", "def")
.build();
PCollection<SourceRow> sourceRows = pipeline.apply(Create.of(sourceRow));
ISchemaMapper mockIschemaMapper =
mock(ISchemaMapper.class, Mockito.withSettings().serializable());
when(mockIschemaMapper.getDialect()).thenReturn(Dialect.GOOGLE_STANDARD_SQL);
when(mockIschemaMapper.getSpannerTableName(anyString(), anyString()))
.thenReturn("spannerTable");
when(mockIschemaMapper.getSpannerColumnName(anyString(), anyString(), eq("firstName")))
.thenReturn("spFirstName");
when(mockIschemaMapper.getSpannerColumnName(anyString(), anyString(), eq("lastName")))
.thenReturn("spLastName");
when(mockIschemaMapper.getSourceColumnName(anyString(), anyString(), eq("spFirstName")))
.thenReturn("firstName");
when(mockIschemaMapper.getSourceColumnName(anyString(), anyString(), eq("spLastName")))
.thenReturn("lastName");
when(mockIschemaMapper.getSpannerColumnType(anyString(), anyString(), anyString()))
.thenReturn(Type.string());
when(mockIschemaMapper.getSpannerColumns(anyString(), anyString()))
.thenReturn(List.of("spFirstName", "spLastName"));
when(mockIschemaMapper.colExistsAtSource(anyString(), anyString(), anyString()))
.thenReturn(true);

PCollection<Mutation> mutations =
transform(sourceRows, SourceRowToMutationDoFn.create(mockIschemaMapper, null, true));

PAssert.that(mutations)
.containsInAnyOrder(
Mutation.newInsertBuilder("spannerTable")
.set("spFirstName")
.to("abc")
.set("spLastName")
.to("def")
.build());
pipeline.run();
}

@Test
public void testCustomTranformation() throws InvalidTransformationException {
final String testTable = "srcTable";
@@ -165,7 +210,7 @@ public void testCustomTranformation() throws InvalidTransformationException {
.thenReturn(mockOutputReceiverForTag); // Return the mock OutputReceiver

SourceRowToMutationDoFn sourceRowToMutationDoFn =
SourceRowToMutationDoFn.create(mockIschemaMapper, null);
SourceRowToMutationDoFn.create(mockIschemaMapper, null, false);
sourceRowToMutationDoFn.setSourceDbToSpannerTransformer(spannerMigrationTransformer);
sourceRowToMutationDoFn.processElement(processContextMock, outputReceiverMock);

@@ -245,7 +290,7 @@ public void testCustomTransformationFilteredEvents() throws InvalidTransformatio
.thenReturn(mockOutputReceiverForTag); // Return the mock OutputReceiver

SourceRowToMutationDoFn sourceRowToMutationDoFn =
SourceRowToMutationDoFn.create(mockIschemaMapper, null);
SourceRowToMutationDoFn.create(mockIschemaMapper, null, false);
sourceRowToMutationDoFn.setSourceDbToSpannerTransformer(spannerMigrationTransformer);
sourceRowToMutationDoFn.processElement(processContextMock, outputReceiverMock);

Expand All @@ -266,7 +311,7 @@ public void testSourceRowToMutationDoFn_invalidTableUUID() {
ISchemaMapper mockIschemaMapper =
mock(ISchemaMapper.class, Mockito.withSettings().serializable());
PCollection<Mutation> mutations =
transform(sourceRows, SourceRowToMutationDoFn.create(mockIschemaMapper, null));
transform(sourceRows, SourceRowToMutationDoFn.create(mockIschemaMapper, null, false));

PAssert.that(mutations).empty();
pipeline.run();
@@ -289,7 +334,7 @@ public void testSourceRowToMutationDoFn_transformException() {
.thenThrow(NoSuchElementException.class);

PCollection<Mutation> mutations =
transform(sourceRows, SourceRowToMutationDoFn.create(mockIschemaMapper, null));
transform(sourceRows, SourceRowToMutationDoFn.create(mockIschemaMapper, null, false));

PAssert.that(mutations).empty();
pipeline.run();