Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
0a5b551
Dml integration (#53)
taherkl Jan 8, 2025
4929b58
Added extensive UT
pawankashyapollion Jan 9, 2025
37def7a
Cassandra pr bug fixes (#57)
pawankashyapollion Jan 13, 2025
4605682
Cassandra Consolidate Unit Test case and Regression testing fixes (#58)
pawankashyapollion Jan 14, 2025
cd30ad0
Merge pull request #60 from GoogleCloudPlatform/main
taherkl Jan 15, 2025
1651a89
Cassandra pr bug fixes (#64)
pawankashyapollion Jan 16, 2025
331c085
Handle TypeHandler Parsing issue fixes (#65)
akashthawaitcc Jan 16, 2025
373239d
Added Safe handle (#68)
pawankashyapollion Jan 16, 2025
9a0b272
Handle LocalTime For Time Data Type In Cassandra (#69)
pawankashyapollion Jan 17, 2025
38933e0
Cassandra pr bug fixes (#70)
pawankashyapollion Jan 17, 2025
f06ae90
Handle Timestamp Fixes (#72)
pawankashyapollion Jan 17, 2025
3d3fb66
Cassandra pr bug fixes (#75)
pawankashyapollion Jan 20, 2025
b2abbf0
Added PR Review Comments
pawankashyapollion Jan 20, 2025
913909e
Remove NamesCol Dependecy as spannerTableName is same as In Given Map…
pawankashyapollion Jan 20, 2025
7b13e94
Added spannerTableId for fetching Mapping
pawankashyapollion Jan 20, 2025
9eec623
Removed SpannerToID and also Updated Session file with proper structure
pawankashyapollion Jan 20, 2025
a457df2
Timestamp in milisecond
pawankashyapollion Jan 20, 2025
168d3ca
removed assertNotNull from UT wherever possible
pawankashyapollion Jan 21, 2025
edbd5fd
Added Fixes
pawankashyapollion Jan 21, 2025
7e58d4c
Added Note Instead of Question
pawankashyapollion Jan 21, 2025
1fe9ae5
-- review fixes (#78)
pawankashyapollion Jan 22, 2025
1673fc6
Added Bytes to hex to blob conversion
pawankashyapollion Jan 22, 2025
2df3dc9
Handling Bytes as Binary encoded As of now
pawankashyapollion Jan 22, 2025
e589d6c
Passing Null Value to Primary Key as well for cassandra
pawankashyapollion Jan 22, 2025
1cc5916
Added UT fixes
pawankashyapollion Jan 22, 2025
7cfc3a2
Added UT refectoring
pawankashyapollion Jan 22, 2025
b55568e
Reverse merge confict fixes
pawankashyapollion Jan 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public Schema(
this.srcSchema = srcSchema;
this.toSpanner = toSpanner;
this.toSource = toSource;
this.srcToID = toSource;
this.spannerToID = toSpanner;
this.empty = (spSchema == null || srcSchema == null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ public class Constants {
public static final String DEFAULT_SHARD_ID = "single_shard";

public static final String SOURCE_MYSQL = "mysql";

public static final String SOURCE_CASSANDRA = "cassandra";

// Message written to the file for filtered records
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.google.cloud.teleport.v2.templates.dbutils.connection.IConnectionHelper;
import com.google.cloud.teleport.v2.templates.dbutils.dml.CassandraTypeHandler;
import com.google.cloud.teleport.v2.templates.exceptions.ConnectionException;
import com.google.cloud.teleport.v2.templates.models.DMLGeneratorResponse;
import com.google.cloud.teleport.v2.templates.models.PreparedStatementGeneratedResponse;
import com.google.cloud.teleport.v2.templates.models.PreparedStatementValueObject;

public class CassandraDao implements IDao<DMLGeneratorResponse> {
private final String cassandraUrl;
Expand All @@ -38,22 +38,25 @@ public CassandraDao(

@Override
public void write(DMLGeneratorResponse dmlGeneratorResponse) throws Exception {
try (CqlSession session =
(CqlSession)
connectionHelper.getConnection(this.cassandraUrl)) { // Ensure connection is obtained
if (session == null) {
throw new ConnectionException("Connection is null");
}
PreparedStatementGeneratedResponse preparedStatementGeneratedResponse =
(PreparedStatementGeneratedResponse) dmlGeneratorResponse;
String dmlStatement = preparedStatementGeneratedResponse.getDmlStatement();
PreparedStatement preparedStatement = session.prepare(dmlStatement);
BoundStatement boundStatement =
preparedStatement.bind(
preparedStatementGeneratedResponse.getValues().stream()
.map(PreparedStatementValueObject::value)
.toArray());
session.execute(boundStatement);
CqlSession session = (CqlSession) connectionHelper.getConnection(this.cassandraUrl);
if (session == null) {
throw new ConnectionException("Connection is null");
}
PreparedStatementGeneratedResponse preparedStatementGeneratedResponse =
(PreparedStatementGeneratedResponse) dmlGeneratorResponse;
String dmlStatement = preparedStatementGeneratedResponse.getDmlStatement();
PreparedStatement preparedStatement = session.prepare(dmlStatement);
BoundStatement boundStatement =
preparedStatement.bind(
preparedStatementGeneratedResponse.getValues().stream()
.map(
v -> {
if (v.value() == CassandraTypeHandler.NullClass.INSTANCE) {
return null;
}
return CassandraTypeHandler.castToExpectedType(v.dataType(), v.value());
})
.toArray());
session.execute(boundStatement);
}
}
Loading
Loading