Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
15b621d
Merge remote-tracking branch 'upstream/main'
Nov 27, 2024
d314c70
Merge pull request #9 from GoogleCloudPlatform/main
taherkl Dec 9, 2024
71a6477
Merge pull request #20 from GoogleCloudPlatform/main
taherkl Dec 16, 2024
7512825
Merge pull request #24 from GoogleCloudPlatform/main
taherkl Dec 17, 2024
5aa21dc
Merge pull request #26 from GoogleCloudPlatform/main
taherkl Dec 18, 2024
04bac39
Merge pull request #28 from GoogleCloudPlatform/main
taherkl Dec 20, 2024
f618128
Merge pull request #29 from GoogleCloudPlatform/main
akashthawaitcc Dec 23, 2024
e53e8e4
Merge pull request #32 from GoogleCloudPlatform/main
akashthawaitcc Dec 26, 2024
407a593
Merge pull request #36 from GoogleCloudPlatform/main
taherkl Dec 30, 2024
0d140e6
Merge pull request #40 from GoogleCloudPlatform/main
taherkl Jan 2, 2025
ae3037a
Merge pull request #46 from GoogleCloudPlatform/main
taherkl Jan 7, 2025
ef5ae8d
Merge pull request #48 from GoogleCloudPlatform/main
taherkl Jan 7, 2025
c6442ba
Merge pull request #52 from GoogleCloudPlatform/main
taherkl Jan 8, 2025
7c02a96
Merge pull request #55 from GoogleCloudPlatform/main
taherkl Jan 15, 2025
e3f4369
Merge pull request #79 from GoogleCloudPlatform/main
taherkl Jan 23, 2025
5d5f1a8
Merge pull request #83 from GoogleCloudPlatform/main
taherkl Jan 28, 2025
f1a4d1e
Merge pull request #84 from GoogleCloudPlatform/main
taherkl Jan 28, 2025
22e677f
Merge pull request #87 from GoogleCloudPlatform/main
taherkl Jan 30, 2025
ee9c99a
sync upstream/main (#98)
akashthawaitcc Feb 12, 2025
186075f
Merge remote-tracking branch 'upstream/main'
Feb 17, 2025
c4fff0f
Merge remote-tracking branch 'upstream/main'
Feb 17, 2025
dc5e904
Merge pull request #107 from GoogleCloudPlatform/main
taherkl Feb 18, 2025
6a55499
Merge pull request #116 from GoogleCloudPlatform/main
taherkl Feb 21, 2025
8504aeb
Merge pull request #120 from GoogleCloudPlatform/main
taherkl Feb 21, 2025
15b906b
Enhanced Retry Logic (#2196)
taherkl Feb 21, 2025
421f5f3
Adding support for Cassandra map (#2209)
VardhanThigle Feb 21, 2025
afff9c2
changes (#2212)
asthamohta Feb 21, 2025
06310d9
Fix inf issues in Datastream reader (#2213)
dhercher Feb 21, 2025
a28df77
Upgrade Beam version to 2.63.0 (#2206)
jrmccluskey Feb 21, 2025
9d5a0d2
SkipShade for Spanner common module (#2194)
Abacn Feb 22, 2025
c713ae9
Add load test for cross db txn (#2199)
Deep1998 Feb 24, 2025
b0fec28
Report Lineage for CsvToBigQuery template (#2205)
Abacn Feb 26, 2025
9316565
Spanner Import/Export INTERLEAVE IN (#2128)
jjfox15 Feb 26, 2025
673f380
Add SCRAM-SHA-512 authentication support to Kafka templates (#2181)
vgnanasekaran Feb 26, 2025
38e1604
Update the required Java version in the base doc, then regenerate doc…
damccorm Feb 26, 2025
04d9281
Post 2.63.0 fixes (#2216)
Abacn Feb 26, 2025
406b59e
bug-fix: Use jdbc connection properties for reverse migration (#2198)
asthamohta Feb 27, 2025
db5f80d
Support partitioned reads for DateTime column type in JDBC to BigQuer…
sharan-malyala Feb 27, 2025
3b3d3cc
Disabling flaky test to unblock dataflow release (#2220)
shreyakhajanchi Feb 27, 2025
586231a
Add logic to skip runnerV2 for the ITs (#2219)
Rudra-Gujarathi Feb 27, 2025
f2f2030
Add warning about caching with plugin (#2221)
derrickaw Feb 27, 2025
34e4732
[DatastreamToSpanner] Spanner Exception handling (#2185)
darshan-sj Feb 28, 2025
7d9dfcd
Merge pull request #127 from GoogleCloudPlatform/main
taherkl Mar 4, 2025
9914b25
Remove Python version from `pom.xml` (#2234)
svetakvsundhar Mar 4, 2025
7f009ba
Add SkipRunnerV2Test category to JmsToPubsubIT and PubSubCdcToBigQuer…
Rudra-Gujarathi Mar 5, 2025
1f51939
Fix a bug in CSVToBigQuery where commas in fields are not handled cor…
shunping Mar 5, 2025
d591c2c
Update Dockerfile-template-yaml (#2222)
svetakvsundhar Mar 5, 2025
7227a05
Adding All Datatypes IT for Cassandra Migration (#2230)
VardhanThigle Mar 6, 2025
3008319
Add IF NOT EXISTS clause for spanner ddls used in ITs (#2237)
Deep1998 Mar 7, 2025
1f258df
Using set of random buckets for spanner ITs (#2223)
shreyakhajanchi Mar 7, 2025
3685987
Add promote artifact method in release plugin (#2227)
Abacn Mar 10, 2025
4589cc7
fixed default DLQ path (#2241)
dedocibula Mar 10, 2025
f80c7cb
Fix stagingArtifactRegistry support raw us.gcr.io artifact registry (…
Abacn Mar 10, 2025
d574016
Print error response on wget call (#2245)
Abacn Mar 11, 2025
d0248c3
Moving local spanner io to a different namespace (#2231)
darshan-sj Mar 11, 2025
1acbfb9
Adding Cassandra Type Options to IT test (#2242)
VardhanThigle Mar 12, 2025
5871d01
Bump timeouts for tests involving FKs/interleaved dependencies (#2239)
Deep1998 Mar 12, 2025
9460304
Load Tests - Cassandra Reverse Replication (#2163)
taherkl Mar 16, 2025
6cb7a1d
Cassandra wide row it (#140)
pawankashyapollion Mar 17, 2025
ab95277
Merge branch 'main' into wide_row_rr_handling
akashthawaitcc Mar 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/*
* Copyright (C) 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.CASSANDRA_SOURCE_TYPE;
import static com.google.common.truth.Truth.assertThat;
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline;
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult;
import static org.junit.Assert.assertEquals;

import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.google.cloud.spanner.Mutation;
import com.google.pubsub.v1.SubscriptionName;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import org.apache.beam.it.cassandra.CassandraResourceManager;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.PipelineOperator;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.gcp.pubsub.PubsubResourceManager;
import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
import org.apache.beam.it.gcp.storage.GcsResourceManager;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SpannerToCassandraSourceDbMaxColumnsIT extends SpannerToSourceDbITBase {

private static final Logger LOG =
LoggerFactory.getLogger(SpannerToCassandraSourceDbMaxColumnsIT.class);

private static final String SPANNER_DDL_RESOURCE =
"SpannerToSourceDbWideRowIT/spanner-max-col-schema.sql";
private static final String CASSANDRA_SCHEMA_FILE_RESOURCE =
"SpannerToSourceDbWideRowIT/cassandra-max-col-schema.sql";
private static final String CASSANDRA_CONFIG_FILE_RESOURCE =
"SpannerToSourceDbWideRowIT/cassandra-config-template.conf";

private static final String TEST_TABLE = "TestTable";
private static final HashSet<SpannerToCassandraSourceDbMaxColumnsIT> testInstances =
new HashSet<>();
private static PipelineLauncher.LaunchInfo jobInfo;
public static SpannerResourceManager spannerResourceManager;
private static SpannerResourceManager spannerMetadataResourceManager;
public static CassandraResourceManager cassandraResourceManager;
private static GcsResourceManager gcsResourceManager;
private static PubsubResourceManager pubsubResourceManager;
private SubscriptionName subscriptionName;
private final List<Throwable> assertionErrors = new ArrayList<>();

@Before
public void setUp() throws IOException {
skipBaseCleanup = true;
synchronized (SpannerToCassandraSourceDbMaxColumnsIT.class) {
testInstances.add(this);
if (jobInfo == null) {
spannerResourceManager = createSpannerDatabase(SPANNER_DDL_RESOURCE);
spannerMetadataResourceManager = createSpannerMetadataDatabase();

cassandraResourceManager = generateKeyspaceAndBuildCassandraResource();
gcsResourceManager =
GcsResourceManager.builder(artifactBucketName, getClass().getSimpleName(), credentials)
.build();
createAndUploadCassandraConfigToGcs(
gcsResourceManager, cassandraResourceManager, CASSANDRA_CONFIG_FILE_RESOURCE);
createCassandraSchema(cassandraResourceManager, CASSANDRA_SCHEMA_FILE_RESOURCE);
pubsubResourceManager = setUpPubSubResourceManager();
subscriptionName =
createPubsubResources(
getClass().getSimpleName(),
pubsubResourceManager,
getGcsPath("dlq", gcsResourceManager).replace("gs://" + artifactBucketName, ""));
jobInfo =
launchDataflowJob(
gcsResourceManager,
spannerResourceManager,
spannerMetadataResourceManager,
subscriptionName.toString(),
null,
null,
null,
null,
null,
CASSANDRA_SOURCE_TYPE);
}
}
}

@AfterClass
public static void cleanUp() throws IOException {
for (SpannerToCassandraSourceDbMaxColumnsIT instance : testInstances) {
instance.tearDownBase();
}
ResourceManagerUtils.cleanResources(
spannerResourceManager,
cassandraResourceManager,
spannerMetadataResourceManager,
gcsResourceManager,
pubsubResourceManager);
}

/**
* Retrieves the total row count of a specified table in Cassandra.
*
* <p>This method executes a `SELECT COUNT(*)` query on the given table and returns the number of
* rows present in it.
*
* @param tableName the name of the table whose row count is to be retrieved.
* @return the total number of rows in the specified table.
* @throws RuntimeException if the query does not return a result.
*/
private long getRowCount(String tableName) {
String query = String.format("SELECT COUNT(*) FROM %s", tableName);
ResultSet resultSet = cassandraResourceManager.executeStatement(query);
Row row = resultSet.one();
if (row != null) {
return row.getLong(0);
} else {
throw new RuntimeException("Query did not return a result for table: " + tableName);
}
}

/** Writes a row with 1,024 columns in Spanner and verifies replication to Cassandra. */
@Test
public void testSpannerToCassandraWithMaxColumns() throws InterruptedException, IOException {
assertThatPipeline(jobInfo).isRunning();
writeRowWithMaxColumnsInSpanner();
assertRowWithMaxColumnsInCassandra();
}

private void writeRowWithMaxColumnsInSpanner() {
List<Mutation> mutations = new ArrayList<>();
Mutation.WriteBuilder mutationBuilder =
Mutation.newInsertOrUpdateBuilder(TEST_TABLE).set("Id").to("SampleTest");

for (int i = 1; i < 1024; i++) {
mutationBuilder.set("Col_" + i).to("TestValue_" + i);
}

mutations.add(mutationBuilder.build());
spannerResourceManager.write(mutations);
LOG.info("Inserted row with 1,024 columns into Spanner using Mutations");
}

private void assertRowWithMaxColumnsInCassandra() {

PipelineOperator.Result result =
pipelineOperator()
.waitForCondition(
createConfig(jobInfo, Duration.ofMinutes(15)), () -> getRowCount(TEST_TABLE) == 2);
assertThatResult(result).meetsConditions();

Iterable<Row> rows;
try {
rows = cassandraResourceManager.readTable(TEST_TABLE);
} catch (Exception e) {
throw new RuntimeException("Failed to read from Cassandra table: " + TEST_TABLE, e);
}

assertThat(rows).hasSize(1);
for (Row row : rows) {
LOG.info("Cassandra Row to Assert for All Data Types: {}", row.getFormattedContents());
String primaryKeyColumn = row.getString("Col_0");
assertEquals("SampleTest", primaryKeyColumn);
for (int i = 1; i < 1024; i++) {
assertEquals("TestValue_" + i, row.getString("Col_" + i));
}
}
LOG.info("Successfully validated 1,024 columns in Cassandra");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/*
* Copyright (C) 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.CASSANDRA_SOURCE_TYPE;
import static com.google.common.truth.Truth.assertThat;
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline;
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult;
import static org.junit.Assert.assertEquals;

import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.google.cloud.spanner.Mutation;
import com.google.pubsub.v1.SubscriptionName;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import org.apache.beam.it.cassandra.CassandraResourceManager;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.PipelineOperator;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.gcp.pubsub.PubsubResourceManager;
import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
import org.apache.beam.it.gcp.storage.GcsResourceManager;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration test verifying that a Spanner row with maximum-size column values (159 columns of
 * 2,621,440 characters each, plus the primary key) is replicated to a Cassandra source database by
 * the Spanner-to-SourceDb Dataflow template.
 *
 * <p>The pipeline, resource managers, and schemas are created once per class (guarded by the
 * class-level lock in {@link #setUp()}) and torn down in {@link #cleanUp()}.
 */
public class SpannerToCassandraSourceDbMaxColumnsSizeIT extends SpannerToSourceDbITBase {

  private static final Logger LOG =
      LoggerFactory.getLogger(SpannerToCassandraSourceDbMaxColumnsSizeIT.class);

  private static final String SPANNER_DDL_RESOURCE =
      "SpannerToSourceDbWideRowIT/spanner-max-col-size-schema.sql";
  private static final String CASSANDRA_SCHEMA_FILE_RESOURCE =
      "SpannerToSourceDbWideRowIT/cassandra-max-col-schema.sql";
  private static final String CASSANDRA_CONFIG_FILE_RESOURCE =
      "SpannerToSourceDbWideRowIT/cassandra-config-template.conf";

  private static final String TEST_TABLE = "TestTable";
  // Number of large non-key columns written and validated (Col_1 .. Col_159).
  private static final int LARGE_COLUMN_COUNT = 159;
  // Size in characters of each column value (2.5 MiB of 'A's).
  private static final int COLUMN_VALUE_LENGTH = 2_621_440;

  private static final HashSet<SpannerToCassandraSourceDbMaxColumnsSizeIT> testInstances =
      new HashSet<>();
  private static PipelineLauncher.LaunchInfo jobInfo;
  public static SpannerResourceManager spannerResourceManager;
  private static SpannerResourceManager spannerMetadataResourceManager;
  public static CassandraResourceManager cassandraResourceManager;
  private static GcsResourceManager gcsResourceManager;
  private static PubsubResourceManager pubsubResourceManager;
  private SubscriptionName subscriptionName;

  /**
   * Creates the shared Spanner/Cassandra/GCS/Pub/Sub resources and launches the Dataflow job
   * exactly once for the whole class; subsequent instances reuse the running job.
   *
   * @throws IOException if resource creation or job launch fails.
   */
  @Before
  public void setUp() throws IOException {
    skipBaseCleanup = true;
    synchronized (SpannerToCassandraSourceDbMaxColumnsSizeIT.class) {
      testInstances.add(this);
      if (jobInfo == null) {
        spannerResourceManager = createSpannerDatabase(SPANNER_DDL_RESOURCE);
        spannerMetadataResourceManager = createSpannerMetadataDatabase();

        cassandraResourceManager = generateKeyspaceAndBuildCassandraResource();
        gcsResourceManager =
            GcsResourceManager.builder(artifactBucketName, getClass().getSimpleName(), credentials)
                .build();
        createAndUploadCassandraConfigToGcs(
            gcsResourceManager, cassandraResourceManager, CASSANDRA_CONFIG_FILE_RESOURCE);
        createCassandraSchema(cassandraResourceManager, CASSANDRA_SCHEMA_FILE_RESOURCE);
        pubsubResourceManager = setUpPubSubResourceManager();
        subscriptionName =
            createPubsubResources(
                getClass().getSimpleName(),
                pubsubResourceManager,
                getGcsPath("dlq", gcsResourceManager).replace("gs://" + artifactBucketName, ""));
        jobInfo =
            launchDataflowJob(
                gcsResourceManager,
                spannerResourceManager,
                spannerMetadataResourceManager,
                subscriptionName.toString(),
                null,
                null,
                null,
                null,
                null,
                CASSANDRA_SOURCE_TYPE);
      }
    }
  }

  /**
   * Tears down every test instance and releases all shared resource managers.
   *
   * @throws IOException if cleanup fails.
   */
  @AfterClass
  public static void cleanUp() throws IOException {
    for (SpannerToCassandraSourceDbMaxColumnsSizeIT instance : testInstances) {
      instance.tearDownBase();
    }
    ResourceManagerUtils.cleanResources(
        spannerResourceManager,
        cassandraResourceManager,
        spannerMetadataResourceManager,
        gcsResourceManager,
        pubsubResourceManager);
  }

  /**
   * Retrieves the total row count of a specified table in Cassandra.
   *
   * <p>This method executes a {@code SELECT COUNT(*)} query on the given table and returns the
   * number of rows present in it.
   *
   * @param tableName the name of the table whose row count is to be retrieved.
   * @return the total number of rows in the specified table.
   * @throws RuntimeException if the query does not return a result.
   */
  private long getRowCount(String tableName) {
    String query = String.format("SELECT COUNT(*) FROM %s", tableName);
    ResultSet resultSet = cassandraResourceManager.executeStatement(query);
    Row row = resultSet.one();
    if (row == null) {
      throw new RuntimeException("Query did not return a result for table: " + tableName);
    }
    return row.getLong(0);
  }

  /** Writes a row with 159 maximum-size columns in Spanner and verifies replication to Cassandra. */
  @Test
  public void testSpannerToCassandraWithMaxColumns() throws InterruptedException, IOException {
    assertThatPipeline(jobInfo).isRunning();
    writeRowWithMaxColumnsInSpanner();
    assertRowWithMaxColumnsInCassandra();
  }

  /** Inserts a single row with Col_1..Col_159 each holding a 2,621,440-character value. */
  private void writeRowWithMaxColumnsInSpanner() {
    List<Mutation> mutations = new ArrayList<>();
    Mutation.WriteBuilder mutationBuilder =
        Mutation.newInsertOrUpdateBuilder(TEST_TABLE).set("Id").to("SampleTest");

    String inputData = "A".repeat(COLUMN_VALUE_LENGTH);
    for (int i = 1; i <= LARGE_COLUMN_COUNT; i++) {
      mutationBuilder.set("Col_" + i).to(inputData);
    }

    mutations.add(mutationBuilder.build());
    spannerResourceManager.write(mutations);
    LOG.info("Inserted row with {} columns into Spanner using Mutations", LARGE_COLUMN_COUNT);
  }

  /** Waits for the single inserted row to replicate and validates every large column value. */
  private void assertRowWithMaxColumnsInCassandra() {
    // Exactly one row is written in writeRowWithMaxColumnsInSpanner(), so wait for a count of 1.
    // (The previous condition waited for 2 rows, which could never be satisfied and would make
    // the wait time out before the hasSize(1) assertion below.)
    PipelineOperator.Result result =
        pipelineOperator()
            .waitForCondition(
                createConfig(jobInfo, Duration.ofMinutes(15)), () -> getRowCount(TEST_TABLE) == 1);
    assertThatResult(result).meetsConditions();

    Iterable<Row> rows;
    try {
      rows = cassandraResourceManager.readTable(TEST_TABLE);
    } catch (Exception e) {
      throw new RuntimeException("Failed to read from Cassandra table: " + TEST_TABLE, e);
    }

    assertThat(rows).hasSize(1);
    String expectedData = "A".repeat(COLUMN_VALUE_LENGTH);
    for (Row row : rows) {
      LOG.info("Cassandra Row to Assert for All Data Types: {}", row.getFormattedContents());
      // NOTE(review): the Spanner key column is named "Id" but is read back here as "Col_0";
      // this relies on the Cassandra schema mapping the key to "Col_0" — confirm against
      // cassandra-max-col-schema.sql.
      String primaryKeyColumn = row.getString("Col_0");
      assertEquals("SampleTest", primaryKeyColumn);
      for (int i = 1; i <= LARGE_COLUMN_COUNT; i++) {
        assertEquals(expectedData, row.getString("Col_" + i));
      }
    }
    // Previous message incorrectly claimed 1,024 columns; this test validates 159 large columns.
    LOG.info("Successfully validated {} max-size columns in Cassandra", LARGE_COLUMN_COUNT);
  }
}
Loading
Loading