Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
2fe493c
Enhanced Retry Logic (#2196)
taherkl Feb 21, 2025
03323df
Adding support for Cassandra map (#2209)
VardhanThigle Feb 21, 2025
8c53147
changes (#2212)
asthamohta Feb 21, 2025
02f6b12
Fix inf issues in Datastream reader (#2213)
dhercher Feb 21, 2025
a558ed0
Upgrade Beam version to 2.63.0 (#2206)
jrmccluskey Feb 21, 2025
948f248
SkipShade for Spanner common module (#2194)
Abacn Feb 22, 2025
e01475b
Add load test for cross db txn (#2199)
Deep1998 Feb 24, 2025
a646e27
Report Lineage for CsvToBigQuery template (#2205)
Abacn Feb 26, 2025
46cab82
Spanner Import/Export INTERLEAVE IN (#2128)
jjfox15 Feb 26, 2025
0780ba7
Add SCRAM-SHA-512 authentication support to Kafka templates (#2181)
vgnanasekaran Feb 26, 2025
59afdb1
Update the required Java version in the base doc, then regenerate doc…
damccorm Feb 26, 2025
52e9478
Post 2.63.0 fixes (#2216)
Abacn Feb 26, 2025
d597f76
bug-fix: Use jdbc connection properties for reverse migration (#2198)
asthamohta Feb 27, 2025
f2a1556
Support partitioned reads for DateTime column type in JDBC to BigQuer…
sharan-malyala Feb 27, 2025
d4a709c
Disabling flaky test to unblock dataflow release (#2220)
shreyakhajanchi Feb 27, 2025
071aa6f
Add logic to skip runnerV2 for the ITs (#2219)
Rudra-Gujarathi Feb 27, 2025
049f42b
Add warning about caching with plugin (#2221)
derrickaw Feb 27, 2025
e0d274c
[DatastreamToSpanner] Spanner Exception handling (#2185)
darshan-sj Feb 28, 2025
bc90790
Custom transformation for Cassandra to Spanner SourceDB (#2201)
taherkl Mar 3, 2025
8d4c9f7
Fix excludedGroups input for maven-surefire plugin. (#2226)
Rudra-Gujarathi Mar 3, 2025
ec5d5d2
Updated the docs (#2225)
liferoad Mar 3, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions contributor-docs/code-contributions.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,13 @@ After authenticated, install the plugin into your local repository:
mvn clean install -pl plugins/templates-maven-plugin -am
```

WARNING: After any change to the plugin itself, the previously built plugin may remain
cached and prevent your change from taking effect. Re-run the install command:

```shell
mvn clean install -pl plugins/templates-maven-plugin -am
```

### Staging (Deploying) Templates

To stage a Template, it is necessary to upload the images to Artifact
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.cloud.bigquery.TableId;
import com.google.cloud.teleport.metadata.DirectRunnerTest;
import com.google.cloud.teleport.metadata.MultiTemplateIntegrationTest;
import com.google.cloud.teleport.metadata.SkipRunnerV2Test;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.Template.TemplateType;
import com.google.cloud.teleport.metadata.TemplateCreationParameter;
Expand Down Expand Up @@ -134,6 +135,7 @@ protected void starting(Description description) {
protected GcsResourceManager artifactClient;

private boolean usingDirectRunner;
private boolean skipRunnerV2;
protected PipelineLauncher pipelineLauncher;
protected boolean skipBaseCleanup;

Expand All @@ -153,6 +155,7 @@ public void setUpBase() throws ExecutionException {
if (category != null) {
usingDirectRunner =
Arrays.asList(category.value()).contains(DirectRunnerTest.class) || usingDirectRunner;
skipRunnerV2 = Arrays.asList(category.value()).contains(SkipRunnerV2Test.class);
}
} catch (NoSuchMethodException e) {
// ignore error
Expand Down Expand Up @@ -492,9 +495,10 @@ protected LaunchInfo launchTemplate(
// Property allows testing with Runner v2 / Unified Worker
String unifiedWorkerHarnessContainerImage =
System.getProperty("unifiedWorkerHarnessContainerImage");
if (System.getProperty("unifiedWorker") != null || unifiedWorkerHarnessContainerImage != null) {
if (!skipRunnerV2
&& (System.getProperty("unifiedWorker") != null
|| unifiedWorkerHarnessContainerImage != null)) {
appendExperiment(options, "use_runner_v2");

if (System.getProperty("sdkContainerImage") != null) {
options.addParameter("sdkContainerImage", System.getProperty("sdkContainerImage"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
Expand All @@ -55,6 +56,7 @@
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.it.common.ResourceManager;
import org.apache.beam.it.common.utils.ExceptionUtils;
Expand Down Expand Up @@ -370,6 +372,64 @@ public synchronized void write(Iterable<Mutation> tableRecords) throws IllegalSt
}
}

/**
 * Writes a collection of mutations into one or more tables inside a single ReadWriteTransaction,
 * so all mutations commit atomically. This method requires {@link
 * SpannerResourceManager#executeDdlStatement(String)} to be called beforehand.
 *
 * @param mutations A collection of mutation objects.
 * @throws IllegalStateException if method is called after resources have been cleaned up.
 */
public synchronized void writeInTransaction(Iterable<Mutation> mutations)
    throws IllegalStateException {
  checkIsUsable();
  checkHasInstanceAndDatabase();

  LOG.info("Sending {} mutations to {}.{}", Iterables.size(mutations), instanceId, databaseId);
  try {
    DatabaseClient databaseClient =
        spanner.getDatabaseClient(DatabaseId.of(projectId, instanceId, databaseId));
    databaseClient
        .readWriteTransaction()
        .run(
            (TransactionCallable<Void>)
                transaction -> {
                  // Buffer all mutations so they are applied atomically on commit.
                  transaction.buffer(mutations);
                  return null;
                });
  } catch (Exception e) {
    // Wrap for consistency with executeDMLStatements and the rest of this manager.
    throw new SpannerResourceManagerException("Failed to write mutations.", e);
  }
  LOG.info("Successfully sent mutations to {}.{}", instanceId, databaseId);
}

/**
 * Executes a list of DML statements inside a single read-write transaction, so all statements
 * apply atomically. This method requires {@link
 * SpannerResourceManager#executeDdlStatement(String)} to be called beforehand.
 *
 * @param statements The DML statements.
 * @throws IllegalStateException if method is called after resources have been cleaned up.
 */
public synchronized void executeDMLStatements(List<String> statements)
    throws IllegalStateException {
  checkIsUsable();
  checkHasInstanceAndDatabase();

  // BUG FIX: the message previously had one '{}' placeholder but two arguments, so SLF4J
  // silently dropped the database id and logged the statements in its place.
  LOG.info("Executing DML statements '{}' on database {}.", statements, databaseId);
  List<Statement> statementsList =
      statements.stream().map(Statement::of).collect(Collectors.toList());
  try {
    DatabaseClient databaseClient =
        spanner.getDatabaseClient(DatabaseId.of(projectId, instanceId, databaseId));
    databaseClient
        .readWriteTransaction()
        .run(
            (TransactionCallable<Void>)
                transaction -> {
                  // batchUpdate runs every statement in this one transaction.
                  transaction.batchUpdate(statementsList);
                  return null;
                });
    LOG.debug(
        "Successfully executed DML statements '{}' on database {}.", statements, databaseId);
  } catch (Exception e) {
    throw new SpannerResourceManagerException("Failed to execute statement.", e);
  }
}

/**
* Runs the specified query.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Struct;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
Expand Down Expand Up @@ -104,6 +105,73 @@ public void testResourceManagerE2E() {
Map.of("RowId", 2, "FirstName", "Jane", "LastName", "Doe", "Company", "Alphabet")));
}

@Test
public void testResourceManagerWriteInTransactionAndExecuteDML() {
  // Arrange: create the destination table.
  spannerResourceManager.executeDdlStatement(
      "CREATE TABLE "
          + TABLE_ID
          + " ("
          + "RowId INT64 NOT NULL,"
          + "FirstName STRING(1024),"
          + "LastName STRING(1024),"
          + "Company STRING(1024)"
          + ") PRIMARY KEY (RowId)");

  // Two rows inserted transactionally via buffered mutations.
  Mutation johnDoe =
      Mutation.newInsertBuilder(TABLE_ID)
          .set("RowId")
          .to(1)
          .set("FirstName")
          .to("John")
          .set("LastName")
          .to("Doe")
          .set("Company")
          .to("Google")
          .build();
  Mutation janeDoe =
      Mutation.newInsertBuilder(TABLE_ID)
          .set("RowId")
          .to(2)
          .set("FirstName")
          .to("Jane")
          .set("LastName")
          .to("Doe")
          .set("Company")
          .to("Alphabet")
          .build();
  List<Mutation> bufferedRows = List.of(johnDoe, janeDoe);

  // Two more rows inserted via DML.
  List<String> dmlStatements =
      Arrays.asList(
          "INSERT INTO "
              + TABLE_ID
              + " (RowId, FirstName, LastName, Company) values (3, 'Tester', 'Doe', 'Youtube')",
          "INSERT INTO "
              + TABLE_ID
              + " (RowId, FirstName, LastName, Company) values (4, 'Jacob', 'Doe', 'DeepMind')");

  // Act
  spannerResourceManager.writeInTransaction(bufferedRows);
  spannerResourceManager.executeDMLStatements(dmlStatements);
  long totalRows = spannerResourceManager.getRowCount(TABLE_ID);
  List<Struct> actualRecords =
      spannerResourceManager.readTableRecords(
          TABLE_ID, List.of("RowId", "FirstName", "LastName", "Company"));

  // Assert: all four rows landed with the expected column values.
  assertThat(totalRows).isEqualTo(4);
  assertThat(actualRecords).hasSize(4);
  assertThatStructs(actualRecords)
      .hasRecordsUnorderedCaseInsensitiveColumns(
          List.of(
              Map.of("RowId", 1, "FirstName", "John", "LastName", "Doe", "Company", "Google"),
              Map.of("RowId", 2, "FirstName", "Jane", "LastName", "Doe", "Company", "Alphabet"),
              Map.of("RowId", 3, "FirstName", "Tester", "LastName", "Doe", "Company", "Youtube"),
              Map.of(
                  "RowId", 4, "FirstName", "Jacob", "LastName", "Doe", "Company", "DeepMind")));
}

@After
public void tearDown() {
ResourceManagerUtils.cleanResources(spannerResourceManager);
Expand Down
Loading
Loading