Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
15b621d
Merge remote-tracking branch 'upstream/main'
Nov 27, 2024
d314c70
Merge pull request #9 from GoogleCloudPlatform/main
taherkl Dec 9, 2024
71a6477
Merge pull request #20 from GoogleCloudPlatform/main
taherkl Dec 16, 2024
7512825
Merge pull request #24 from GoogleCloudPlatform/main
taherkl Dec 17, 2024
5aa21dc
Merge pull request #26 from GoogleCloudPlatform/main
taherkl Dec 18, 2024
04bac39
Merge pull request #28 from GoogleCloudPlatform/main
taherkl Dec 20, 2024
f618128
Merge pull request #29 from GoogleCloudPlatform/main
akashthawaitcc Dec 23, 2024
e53e8e4
Merge pull request #32 from GoogleCloudPlatform/main
akashthawaitcc Dec 26, 2024
407a593
Merge pull request #36 from GoogleCloudPlatform/main
taherkl Dec 30, 2024
0d140e6
Merge pull request #40 from GoogleCloudPlatform/main
taherkl Jan 2, 2025
ae3037a
Merge pull request #46 from GoogleCloudPlatform/main
taherkl Jan 7, 2025
ef5ae8d
Merge pull request #48 from GoogleCloudPlatform/main
taherkl Jan 7, 2025
c6442ba
Merge pull request #52 from GoogleCloudPlatform/main
taherkl Jan 8, 2025
7c02a96
Merge pull request #55 from GoogleCloudPlatform/main
taherkl Jan 15, 2025
e3f4369
Merge pull request #79 from GoogleCloudPlatform/main
taherkl Jan 23, 2025
5d5f1a8
Merge pull request #83 from GoogleCloudPlatform/main
taherkl Jan 28, 2025
f1a4d1e
Merge pull request #84 from GoogleCloudPlatform/main
taherkl Jan 28, 2025
22e677f
Merge pull request #87 from GoogleCloudPlatform/main
taherkl Jan 30, 2025
43b6699
* Addition of Load Tests in SpannerToSourceDB For Cassandra (#89)
pawankashyapollion Jan 31, 2025
f4c9811
Address Merge conflict
pawankashyapollion Feb 5, 2025
30f1c7c
Added LT Refectored (#92)
pawankashyapollion Feb 5, 2025
f9b1e3e
Added POM Dependecies
pawankashyapollion Feb 5, 2025
2528917
Merge branch 'main' into cassandra_rr_load_tests
pawankashyapollion Feb 12, 2025
ee9c99a
sync upstream/main (#98)
akashthawaitcc Feb 12, 2025
c319cbe
RR LOAD TEST FIXES (#101)
pawankashyapollion Feb 13, 2025
72d4807
Merge branch 'main' into cassandra_rr_load_tests
pawankashyapollion Feb 13, 2025
186075f
Merge remote-tracking branch 'upstream/main'
Feb 17, 2025
c4fff0f
Merge remote-tracking branch 'upstream/main'
Feb 17, 2025
dc5e904
Merge pull request #107 from GoogleCloudPlatform/main
taherkl Feb 18, 2025
60c33a1
Merge pull request #113 from GoogleCloudPlatform/main
taherkl Feb 19, 2025
a5a73c2
Resolved PR comments (#115)
pawankashyapollion Feb 20, 2025
6a55499
Merge pull request #116 from GoogleCloudPlatform/main
taherkl Feb 21, 2025
eeec9e2
Added Module Dependency Fixes
pawankashyapollion Feb 21, 2025
e676e5c
Added Copyrigh
pawankashyapollion Feb 21, 2025
f466336
Merge pull request #119 from GoogleCloudPlatform/main
taherkl Feb 21, 2025
8504aeb
Merge pull request #120 from GoogleCloudPlatform/main
taherkl Feb 21, 2025
96868f3
Added missing commit
pawankashyapollion Feb 24, 2025
15b906b
Enhanced Retry Logic (#2196)
taherkl Feb 21, 2025
421f5f3
Adding support for Cassandra map (#2209)
VardhanThigle Feb 21, 2025
afff9c2
changes (#2212)
asthamohta Feb 21, 2025
06310d9
Fix inf issues in Datastream reader (#2213)
dhercher Feb 21, 2025
a28df77
Upgrade Beam version to 2.63.0 (#2206)
jrmccluskey Feb 21, 2025
9d5a0d2
SkipShade for Spanner common module (#2194)
Abacn Feb 22, 2025
c713ae9
Add load test for cross db txn (#2199)
Deep1998 Feb 24, 2025
b0fec28
Report Lineage for CsvToBigQuery template (#2205)
Abacn Feb 26, 2025
9316565
Spanner Import/Export INTERLEAVE IN (#2128)
jjfox15 Feb 26, 2025
673f380
Add SCRAM-SHA-512 authentication support to Kafka templates (#2181)
vgnanasekaran Feb 26, 2025
38e1604
Update the required Java version in the base doc, then regenerate doc…
damccorm Feb 26, 2025
04d9281
Post 2.63.0 fixes (#2216)
Abacn Feb 26, 2025
406b59e
bug-fix: Use jdbc connection properties for reverse migration (#2198)
asthamohta Feb 27, 2025
db5f80d
Support partitioned reads for DateTime column type in JDBC to BigQuer…
sharan-malyala Feb 27, 2025
3b3d3cc
Disabling flaky test to unblock dataflow release (#2220)
shreyakhajanchi Feb 27, 2025
586231a
Add logic to skip runnerV2 for the ITs (#2219)
Rudra-Gujarathi Feb 27, 2025
f2f2030
Add warning about caching with plugin (#2221)
derrickaw Feb 27, 2025
34e4732
[DatastreamToSpanner] Spanner Exception handling (#2185)
darshan-sj Feb 28, 2025
7d9dfcd
Merge pull request #127 from GoogleCloudPlatform/main
taherkl Mar 4, 2025
7c64de6
Merge branch 'main' into cassandra_rr_load_tests
pawankashyapollion Mar 5, 2025
b1c481e
Added Cassandra Resource Manager Refectoring and removed Generics
pawankashyapollion Mar 5, 2025
6e95a93
Added Keyspace Voilation fixes
pawankashyapollion Mar 10, 2025
4f7d5b1
minor changes
Mar 11, 2025
b877cf1
Create session for row check
Mar 11, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions it/cassandra/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
~ Copyright (C) 2025 Google Inc.
~
~ Licensed under the Apache License, Version 2.0 (the "License"); you may not
~ use this file except in compliance with the License. You may obtain a copy of
~ the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
~ License for the specific language governing permissions and limitations under
~ the License.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~-->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.google.cloud.teleport</groupId>
<artifactId>integration-testing-lib</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>it-cassandra</artifactId>

<properties>
<parquet-avro.version>1.12.0</parquet-avro.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-it-conditions</artifactId>
</dependency>

<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value</artifactId>
<version>${autovalue.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value-annotations</artifactId>
<version>${autovalue.version}</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-it-cassandra</artifactId>
<exclusions>
<exclusion>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>4.17.0</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.it.cassandra.conditions;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.google.auto.value.AutoValue;
import java.net.InetSocketAddress;
import java.time.Duration;
import javax.annotation.Nullable;
import org.apache.beam.it.cassandra.CassandraResourceManager;
import org.apache.beam.it.conditions.ConditionCheck;

/**
* Condition check for verifying the number of rows in a Cassandra table. This class is generic,
* allowing any type of Cassandra resource manager to be used at runtime.
*/
@AutoValue
public abstract class CassandraRowsCheck extends ConditionCheck {

@Nullable
abstract CassandraResourceManager resourceManager();

abstract String tableName();

abstract Integer minRows();

@Nullable
abstract Integer maxRows();

@Override
public String getDescription() {
if (maxRows() != null) {
return String.format(
"Cassandra table check if table %s has between %d and %d rows",
tableName(), minRows(), maxRows());
}
return String.format(
"Cassandra table check if table %s has at least %d rows", tableName(), minRows());
}

/**
* Gets the row count for the specified table using the given CassandraResourceManager.
*
* @param resourceManager The CassandraResourceManager to use for the query.
* @param tableName The name of the table to count rows from.
* @return The number of rows in the table.
*/
private long getRowCount(CassandraResourceManager resourceManager, String tableName) {
if (resourceManager == null) {
throw new IllegalArgumentException("CassandraResourceManager must not be null.");
}
try (CqlSession session =
CqlSession.builder()
.addContactPoint(
new InetSocketAddress(resourceManager.getHost(), resourceManager.getPort()))
.withLocalDatacenter("datacenter1")
.build()) {

String query =
String.format("SELECT COUNT(*) FROM %s.%s", resourceManager.getKeyspaceName(), tableName);
SimpleStatement statement =
SimpleStatement.builder(query).setTimeout(Duration.ofSeconds(20)).build();
ResultSet resultSet = session.execute(statement);
Row row = resultSet.one();
if (row != null) {
return row.getLong(0);
} else {
throw new RuntimeException("Query did not return a result for table: " + tableName);
}
} catch (Exception e) {
throw new RuntimeException("Failed to execute query on CassandraResourceManager", e);
}
}

@Override
public CheckResult check() {
long totalRows = getRowCount(resourceManager(), tableName());
if (totalRows < minRows()) {
return new CheckResult(
false,
String.format("Expected at least %d rows but found only %d", minRows(), totalRows));
}
if (maxRows() != null && totalRows > maxRows()) {
return new CheckResult(
false, String.format("Expected up to %d rows but found %d", maxRows(), totalRows));
}

if (maxRows() != null) {
return new CheckResult(
true,
String.format(
"Expected between %d and %d rows and found %d", minRows(), maxRows(), totalRows));
}

return new CheckResult(
true, String.format("Expected at least %d rows and found %d", minRows(), totalRows));
}

/**
* Builder for {@link CassandraRowsCheck}. Now allows setting the CassandraResourceManager at
* runtime.
*/
public static Builder builder(String tableName) {
return new AutoValue_CassandraRowsCheck.Builder().setTableName(tableName);
}

@AutoValue.Builder
public abstract static class Builder {

public abstract Builder setResourceManager(CassandraResourceManager resourceManager);

public abstract Builder setTableName(String tableName);

public abstract Builder setMinRows(Integer minRows);

public abstract Builder setMaxRows(Integer maxRows);

abstract CassandraRowsCheck autoBuild();

public CassandraRowsCheck build() {
return autoBuild();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/** Package that contains reusable Cassandra conditions. */
package org.apache.beam.it.cassandra.conditions;
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -831,5 +831,6 @@
<module>plaintext-logging</module>
<module>python</module>
<module>yaml</module>
<module>it/cassandra</module>
</modules>
</project>
6 changes: 6 additions & 0 deletions v2/spanner-to-sourcedb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.cloud.teleport</groupId>
<artifactId>it-cassandra</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-it-cassandra</artifactId>
Expand Down
Loading
Loading