Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,13 @@ public void write(DMLGeneratorResponse dmlGeneratorResponse) throws Exception {
BoundStatement boundStatement =
preparedStatement.bind(
preparedStatementGeneratedResponse.getValues().stream()
.map(v -> CassandraTypeHandler.castToExpectedType(v.dataType(), v.value()))
.map(
v -> {
if (v.value() == CassandraTypeHandler.NullClass.INSTANCE) {
return null;
}
return CassandraTypeHandler.castToExpectedType(v.dataType(), v.value());
})
.toArray());
session.execute(boundStatement);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.google.cloud.teleport.v2.templates.models.DMLGeneratorResponse;
import com.google.cloud.teleport.v2.templates.models.PreparedStatementGeneratedResponse;
import com.google.cloud.teleport.v2.templates.models.PreparedStatementValueObject;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -91,24 +90,19 @@ public DMLGeneratorResponse getDMLStatement(DMLGeneratorRequest dmlGeneratorRequ
}

NameAndCols tableMapping = schema.getSpannerToID().get(spannerTableName);
if (tableMapping == null) {
LOG.warn(
"Spanner table {} not found in session file. Dropping the record.", spannerTableName);
return new DMLGeneratorResponse("");
}

String spannerTableId = tableMapping.getName();
SpannerTable spannerTable = schema.getSpSchema().get(spannerTableId);
SpannerTable spannerTable = schema.getSpSchema().get(tableMapping.getName());
if (spannerTable == null) {
LOG.warn(
"Spanner table {} not found in session file. Dropping the record.", spannerTableName);
return new DMLGeneratorResponse("");
}

SourceTable sourceTable = schema.getSrcSchema().get(spannerTableId);
SourceTable sourceTable = schema.getSrcSchema().get(tableMapping.getName());
if (sourceTable == null) {
LOG.warn(
"Source table {} not found for Spanner table ID: {}", spannerTableName, spannerTableId);
"Source table {} not found for Spanner table Name: {}",
spannerTableName,
tableMapping.getName());
return new DMLGeneratorResponse("");
}

Expand All @@ -132,69 +126,73 @@ public DMLGeneratorResponse getDMLStatement(DMLGeneratorRequest dmlGeneratorRequ
sourceTable.getName());
return new DMLGeneratorResponse("");
}

java.sql.Timestamp timestamp = dmlGeneratorRequest.getCommitTimestamp().toSqlTimestamp();
String modType = dmlGeneratorRequest.getModType();
switch (modType) {
case "INSERT":
case "UPDATE":
return generateUpsertStatement(
spannerTable, sourceTable, dmlGeneratorRequest, pkColumnNameValues);
case "DELETE":
long timestamp = Instant.now().toEpochMilli() * 1000;
return getDeleteStatementCQL(sourceTable.getName(), pkColumnNameValues, timestamp);
default:
LOG.error("Unsupported modType: {} for table {}", modType, spannerTableName);
return new DMLGeneratorResponse("");
}
return generatorDMLResponse(
spannerTable, sourceTable, dmlGeneratorRequest, pkColumnNameValues, timestamp, modType);
}

/**
* Generates an upsert (insert or update) DML statement for a given Spanner table based on the
* provided source table, request parameters, and primary key column values.
* Generates a DML response based on the given modification type (INSERT, UPDATE, or DELETE).
*
* <p>This method processes the data from SpannerTable, SourceTable, and DMLGeneratorRequest to
* construct a corresponding CQL statement (INSERT, UPDATE, or DELETE) for Cassandra. The
* statement is generated based on the modification type and includes the appropriate primary key
* and column values, along with an optional timestamp.
*
* @param spannerTable the Spanner table metadata containing column definitions and constraints.
* @param sourceTable the source table metadata containing the table name and structure.
* @param dmlGeneratorRequest the request containing new values, key values, and timezone offset
* for generating the DML.
* @param pkColumnNameValues a map of primary key column names and their corresponding prepared
* statement value objects.
* @return a {@link DMLGeneratorResponse} containing the generated upsert statement and associated
* data.
* <p>This method: 1. Extracts column values from the provided request using the
* `getColumnValues` method. 2. Combines the column values with the primary key column values.
* 3. Constructs the upsert statement using the `getUpsertStatementCQL` method.
* <p>The upsert statement ensures that the record is inserted or updated in the Spanner table
* based on the primary key.
* @param spannerTable the SpannerTable object containing schema information of the Spanner table
* @param sourceTable the SourceTable object containing details of the source table (e.g., name)
* @param dmlGeneratorRequest the request object containing new and key value data in JSON format
* @param pkColumnNameValues a map of primary key column names and their corresponding value
* objects
* @param timestamp the optional timestamp to be included in the Cassandra statement (can be null)
* @param modType the type of modification to perform, either "INSERT", "UPDATE", or "DELETE"
* @return DMLGeneratorResponse the response containing the generated CQL statement and bound
* values
* @throws IllegalArgumentException if the modType is unsupported or if any required data is
* invalid
* @implNote The method uses the following logic: - Combines primary key values and column values
* into a single list of entries. - Depending on the modType: - For "INSERT" or "UPDATE",
* calls {@link #getUpsertStatementCQL}. - For "DELETE", calls {@link #getDeleteStatementCQL}.
* - For unsupported modType values, logs an error and returns an empty response.
*/
private static DMLGeneratorResponse generateUpsertStatement(
private static DMLGeneratorResponse generatorDMLResponse(
SpannerTable spannerTable,
SourceTable sourceTable,
DMLGeneratorRequest dmlGeneratorRequest,
Map<String, PreparedStatementValueObject<?>> pkColumnNameValues) {
Map<String, PreparedStatementValueObject<?>> pkColumnNameValues,
java.sql.Timestamp timestamp,
String modType) {
Map<String, PreparedStatementValueObject<?>> columnNameValues =
getColumnValues(
spannerTable,
sourceTable,
dmlGeneratorRequest.getNewValuesJson(),
dmlGeneratorRequest.getKeyValuesJson(),
dmlGeneratorRequest.getSourceDbTimezoneOffset());
return getUpsertStatementCQL(
sourceTable.getName(),
Instant.now().toEpochMilli() * 1000,
columnNameValues,
pkColumnNameValues);
List<Map.Entry<String, PreparedStatementValueObject<?>>> allEntries =
Stream.concat(pkColumnNameValues.entrySet().stream(), columnNameValues.entrySet().stream())
.collect(Collectors.toList());
switch (modType) {
case "INSERT":
case "UPDATE":
return getUpsertStatementCQL(sourceTable.getName(), timestamp, allEntries);
case "DELETE":
return getDeleteStatementCQL(sourceTable.getName(), timestamp, allEntries);
default:
LOG.error("Unsupported modType: {} for table {}", modType, spannerTable.getName());
return new DMLGeneratorResponse("");
}
}

/**
* Constructs an upsert (insert or update) CQL statement for a Cassandra or similar database using
* the provided table name, timestamp, column values, and primary key values.
*
* @param tableName the name of the table to which the upsert statement applies.
* @param timestamp the timestamp (in microseconds) to use for the operation.
* @param columnNameValues a map of column names and their corresponding prepared statement value
* @param timestamp the timestamp (in java.sql.Timestamp) to use for the operation.
* @param allEntries a map of column names and their corresponding prepared statement value
* objects for non-primary key columns.
* @param pkColumnNameValues a map of primary key column names and their corresponding prepared
* statement value objects.
* @return a {@link DMLGeneratorResponse} containing the generated CQL statement and a list of
* values to be used with the prepared statement.
* <p>This method: 1. Iterates through the primary key and column values, appending column
Expand All @@ -207,19 +205,10 @@ private static DMLGeneratorResponse generateUpsertStatement(
*/
private static DMLGeneratorResponse getUpsertStatementCQL(
String tableName,
long timestamp,
Map<String, PreparedStatementValueObject<?>> columnNameValues,
Map<String, PreparedStatementValueObject<?>> pkColumnNameValues) {
java.sql.Timestamp timestamp,
List<Map.Entry<String, PreparedStatementValueObject<?>>> allEntries) {

String escapedTableName = "\"" + tableName.replace("\"", "\"\"") + "\"";
List<Map.Entry<String, PreparedStatementValueObject<?>>> allEntries =
Stream.concat(pkColumnNameValues.entrySet().stream(), columnNameValues.entrySet().stream())
.filter(
entry ->
entry.getValue().value() != null
&& entry.getValue().value() != CassandraTypeHandler.NullClass.INSTANCE)
.collect(Collectors.toList());

String allColumns =
allEntries.stream()
.map(entry -> "\"" + entry.getKey().replace("\"", "\"\"") + "\"")
Expand All @@ -229,7 +218,7 @@ private static DMLGeneratorResponse getUpsertStatementCQL(
List<PreparedStatementValueObject<?>> values =
allEntries.stream().map(Map.Entry::getValue).collect(Collectors.toList());

PreparedStatementValueObject<Long> timestampObj =
PreparedStatementValueObject<java.sql.Timestamp> timestampObj =
PreparedStatementValueObject.create("USING_TIMESTAMP", timestamp);
values.add(timestampObj);

Expand All @@ -248,7 +237,7 @@ private static DMLGeneratorResponse getUpsertStatementCQL(
* @param tableName the name of the table from which records will be deleted.
* @param pkColumnNameValues a map containing the primary key column names and their corresponding
* prepared statement value objects.
* @param timestamp the timestamp (in microseconds) to use for the delete operation.
* @param timestamp the timestamp (in java.sql.Timestamp) to use for the delete operation.
* @return a {@link DMLGeneratorResponse} containing the generated CQL delete statement and a list
* of values to bind to the prepared statement.
* <p>This method: 1. Iterates through the provided primary key column values, appending
Expand All @@ -261,25 +250,28 @@ private static DMLGeneratorResponse getUpsertStatementCQL(
*/
private static DMLGeneratorResponse getDeleteStatementCQL(
String tableName,
Map<String, PreparedStatementValueObject<?>> pkColumnNameValues,
long timestamp) {
java.sql.Timestamp timestamp,
List<Map.Entry<String, PreparedStatementValueObject<?>>> allEntries) {

String escapedTableName = "\"" + tableName.replace("\"", "\"\"") + "\"";

String deleteConditions =
pkColumnNameValues.entrySet().stream()
.filter(entry -> entry.getValue().value() != CassandraTypeHandler.NullClass.INSTANCE)
allEntries.stream()
.map(entry -> "\"" + entry.getKey().replace("\"", "\"\"") + "\" = ?")
.collect(Collectors.joining(" AND "));

List<PreparedStatementValueObject<?>> values =
pkColumnNameValues.entrySet().stream()
.filter(entry -> entry.getValue().value() != CassandraTypeHandler.NullClass.INSTANCE)
.map(Map.Entry::getValue)
.collect(Collectors.toList());
allEntries.stream().map(Map.Entry::getValue).collect(Collectors.toList());

String preparedStatement =
String.format("DELETE FROM %s WHERE %s", escapedTableName, deleteConditions);
String.format(
"DELETE FROM %s USING TIMESTAMP ? WHERE %s", escapedTableName, deleteConditions);

if (timestamp != null) {
PreparedStatementValueObject<java.sql.Timestamp> timestampObj =
PreparedStatementValueObject.create("USING_TIMESTAMP", timestamp);
values.add(0, timestampObj);
}

return new PreparedStatementGeneratedResponse(preparedStatement, values);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public static boolean processRecord(
modType, tableName, newValuesJson, keysJson, sourceDbTimezoneOffset)
.setSchema(schema)
.setCustomTransformationResponse(customTransformationResponse)
.setCommitTimestamp(spannerRecord.getCommitTimestamp())
.build();

DMLGeneratorResponse dmlGeneratorResponse = dmlGenerator.getDMLStatement(dmlGeneratorRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.teleport.v2.templates.models;

import com.google.cloud.Timestamp;
import com.google.cloud.teleport.v2.spanner.migrations.schema.Schema;
import java.util.Map;
import org.json.JSONObject;
Expand Down Expand Up @@ -53,6 +54,7 @@ public class DMLGeneratorRequest {
private final String sourceDbTimezoneOffset;

private Map<String, Object> customTransformationResponse;
private final Timestamp commitTimestamp;

public DMLGeneratorRequest(Builder builder) {
this.modType = builder.modType;
Expand All @@ -62,6 +64,11 @@ public DMLGeneratorRequest(Builder builder) {
this.keyValuesJson = builder.keyValuesJson;
this.sourceDbTimezoneOffset = builder.sourceDbTimezoneOffset;
this.customTransformationResponse = builder.customTransformationResponse;
this.commitTimestamp = builder.commitTimestamp;
}

public Timestamp getCommitTimestamp() {
return this.commitTimestamp;
}

public String getModType() {
Expand Down Expand Up @@ -100,6 +107,7 @@ public static class Builder {
private final String sourceDbTimezoneOffset;
private Schema schema;
private Map<String, Object> customTransformationResponse;
private Timestamp commitTimestamp;

public Builder(
String modType,
Expand All @@ -119,6 +127,11 @@ public Builder setSchema(Schema schema) {
return this;
}

public Builder setCommitTimestamp(Timestamp commitTimestamp) {
this.commitTimestamp = commitTimestamp;
return this;
}

public Builder setCustomTransformationResponse(
Map<String, Object> customTransformationResponse) {
this.customTransformationResponse = customTransformationResponse;
Expand Down
Loading
Loading