Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@
<exclude>**/CustomTransformationImplFetcher.*</exclude>
<exclude>**/JarFileReader.*</exclude>
<exclude>**/CustomTransformationWithShardFor*IT.*</exclude>
<exclude>**/CustomTransformationWithCassandraForIT.*</exclude>
<exclude>**/models/*</exclude>
<exclude>**/exceptions/*</exclude>
</excludes>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright (C) 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.custom;

import com.google.cloud.teleport.v2.spanner.exceptions.InvalidTransformationException;
import com.google.cloud.teleport.v2.spanner.utils.ISpannerMigrationTransformer;
import com.google.cloud.teleport.v2.spanner.utils.MigrationTransformationRequest;
import com.google.cloud.teleport.v2.spanner.utils.MigrationTransformationResponse;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Custom transformation used by Cassandra integration tests.
 *
 * <p>Forward migration ({@link #toSpannerRow}) applies no transformation. Reverse migration
 * ({@link #toSourceRow}) derives a {@code full_name} column for the {@code customers} table by
 * concatenating {@code first_name} and {@code last_name} from the request row.
 */
public class CustomTransformationWithCassandraForIT implements ISpannerMigrationTransformer {

  // Fix: the logger was created for CustomShardIdFetcher.class (copy-paste error),
  // which mislabeled every log line emitted by this class.
  private static final Logger LOG =
      LoggerFactory.getLogger(CustomTransformationWithCassandraForIT.class);

  @Override
  public void init(String parameters) {
    LOG.info("init called with {}", parameters);
  }

  /** Forward (source-to-Spanner) direction: no transformation is applied. */
  @Override
  public MigrationTransformationResponse toSpannerRow(MigrationTransformationRequest request)
      throws InvalidTransformationException {
    return new MigrationTransformationResponse(null, false);
  }

  /**
   * Reverse (Spanner-to-source) direction: for the {@code customers} table, emits a
   * {@code full_name} value built from {@code first_name} and {@code last_name}; all other
   * tables pass through untransformed.
   */
  @Override
  public MigrationTransformationResponse toSourceRow(MigrationTransformationRequest request)
      throws InvalidTransformationException {
    if (request.getTableName().equalsIgnoreCase("customers")) {
      Map<String, Object> requestRow = request.getRequestRow();
      Map<String, Object> row = new HashMap<>();
      // String concatenation of the two name parts; null parts render as "null",
      // which is acceptable for this integration-test fixture.
      row.put("full_name", requestRow.get("first_name") + " " + requestRow.get("last_name"));
      return new MigrationTransformationResponse(row, false);
    }
    return new MigrationTransformationResponse(null, false);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ public DMLGeneratorResponse getDMLStatement(DMLGeneratorRequest dmlGeneratorRequ
sourceTable,
dmlGeneratorRequest.getNewValuesJson(),
dmlGeneratorRequest.getKeyValuesJson(),
dmlGeneratorRequest.getSourceDbTimezoneOffset());
dmlGeneratorRequest.getSourceDbTimezoneOffset(),
dmlGeneratorRequest.getCustomTransformationResponse());
if (pkColumnNameValues == null) {
LOG.warn(
"Failed to generate primary key values for table {}. Skipping the record.",
Expand Down Expand Up @@ -166,7 +167,8 @@ private static DMLGeneratorResponse generateDMLResponse(
sourceTable,
dmlGeneratorRequest.getNewValuesJson(),
dmlGeneratorRequest.getKeyValuesJson(),
dmlGeneratorRequest.getSourceDbTimezoneOffset());
dmlGeneratorRequest.getSourceDbTimezoneOffset(),
dmlGeneratorRequest.getCustomTransformationResponse());
Map<String, PreparedStatementValueObject<?>> allColumnNamesAndValues =
ImmutableMap.<String, PreparedStatementValueObject<?>>builder()
.putAll(pkColumnNameValues)
Expand Down Expand Up @@ -287,6 +289,7 @@ private static DMLGeneratorResponse getDeleteStatementCQL(
* @param newValuesJson the JSON object containing new values for columns.
* @param keyValuesJson the JSON object containing key values for columns.
* @param sourceDbTimezoneOffset the timezone offset of the source database.
* @param customTransformationResponse the column-name-to-value map produced by the user-defined custom transformation, or null if no custom transformation was applied.
* @return a map of column names to their corresponding prepared statement value objects.
* <p>This method: 1. Iterates over the non-primary key column definitions in the source table
* schema. 2. Maps each column in the source table schema to its corresponding column in the
Expand All @@ -299,24 +302,37 @@ private static Map<String, PreparedStatementValueObject<?>> getColumnValues(
SourceTable sourceTable,
JSONObject newValuesJson,
JSONObject keyValuesJson,
String sourceDbTimezoneOffset) {
String sourceDbTimezoneOffset,
Map<String, Object> customTransformationResponse) {
Map<String, PreparedStatementValueObject<?>> response = new HashMap<>();
Set<String> sourcePKs = sourceTable.getPrimaryKeySet();
Set<String> customTransformColumns = null;
if (customTransformationResponse != null) {
customTransformColumns = customTransformationResponse.keySet();
}
for (Map.Entry<String, SourceColumnDefinition> entry : sourceTable.getColDefs().entrySet()) {
SourceColumnDefinition sourceColDef = entry.getValue();

String colName = sourceColDef.getName();
if (sourcePKs.contains(colName)) {
continue; // we only need non-primary keys
}

PreparedStatementValueObject<?> columnValue;
if (customTransformColumns != null
&& customTransformColumns.contains(sourceColDef.getName())) {
String cassandraType = sourceColDef.getType().getName().toLowerCase();
columnValue =
PreparedStatementValueObject.create(
cassandraType, customTransformationResponse.get(colName));
response.put(sourceColDef.getName(), columnValue);
continue;
}
String colId = entry.getKey();
SpannerColumnDefinition spannerColDef = spannerTable.getColDefs().get(colId);
if (spannerColDef == null) {
continue;
}
String spannerColumnName = spannerColDef.getName();
PreparedStatementValueObject<?> columnValue;
if (keyValuesJson.has(spannerColumnName)) {
columnValue =
getMappedColumnValue(
Expand Down Expand Up @@ -344,6 +360,7 @@ private static Map<String, PreparedStatementValueObject<?>> getColumnValues(
* @param newValuesJson the JSON object containing new values for columns.
* @param keyValuesJson the JSON object containing key values for columns.
* @param sourceDbTimezoneOffset the timezone offset of the source database.
* @param customTransformationResponse the user defined transformation.
* @return a map of primary key column names to their corresponding prepared statement value
* objects, or null if a required column is missing.
* <p>This method: 1. Iterates over the primary key definitions in the source table schema. 2.
Expand All @@ -357,10 +374,14 @@ private static Map<String, PreparedStatementValueObject<?>> getPkColumnValues(
SourceTable sourceTable,
JSONObject newValuesJson,
JSONObject keyValuesJson,
String sourceDbTimezoneOffset) {
String sourceDbTimezoneOffset,
Map<String, Object> customTransformationResponse) {
Map<String, PreparedStatementValueObject<?>> response = new HashMap<>();
ColumnPK[] sourcePKs = sourceTable.getPrimaryKeys();

Set<String> customTransformColumns = null;
if (customTransformationResponse != null) {
customTransformColumns = customTransformationResponse.keySet();
}
for (ColumnPK currentSourcePK : sourcePKs) {
String colId = currentSourcePK.getColId();
SourceColumnDefinition sourceColDef = sourceTable.getColDefs().get(colId);
Expand All @@ -373,7 +394,14 @@ private static Map<String, PreparedStatementValueObject<?>> getPkColumnValues(
}
String spannerColumnName = spannerColDef.getName();
PreparedStatementValueObject<?> columnValue;
if (keyValuesJson.has(spannerColumnName)) {
if (customTransformColumns != null
&& customTransformColumns.contains(sourceColDef.getName())) {
String cassandraType = sourceColDef.getType().getName().toLowerCase();
String columnName = spannerColDef.getName();
columnValue =
PreparedStatementValueObject.create(
cassandraType, customTransformationResponse.get(columnName));
} else if (keyValuesJson.has(spannerColumnName)) {
columnValue =
getMappedColumnValue(
spannerColDef, sourceColDef, keyValuesJson, sourceDbTimezoneOffset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,30 +172,9 @@
return ByteBuffer.wrap((byte[]) colValue);
} else if (colValue instanceof ByteBuffer) {
return (ByteBuffer) colValue;
} else {
return ByteBuffer.wrap(java.util.Base64.getDecoder().decode((String) colValue));
}
return ByteBuffer.wrap(java.util.Base64.getDecoder().decode((String) colValue));
}

/**
 * Converts a binary-encoded (base-2) string into a byte array.
 *
 * <p>The input is consumed in chunks of up to 8 characters; each chunk is parsed with radix 2
 * ({@code Integer.parseInt(chunk, 2)}) into one byte. A trailing chunk shorter than 8 characters
 * is parsed as-is, i.e. treated as the low-order bits of the final byte.
 *
 * @param binaryEncodedStr the binary string to be converted; every character must be '0' or '1',
 *     otherwise {@link NumberFormatException} is thrown by the radix-2 parse.
 * @return a byte array of length {@code ceil(length / 8)} holding the decoded bits.
 */
private static byte[] convertBinaryEncodedStringToByteArray(String binaryEncodedStr) {
  int length = binaryEncodedStr.length();
  // One byte per 8 characters, rounding up so a short trailing chunk still gets a byte.
  int byteCount = (length + 7) / 8;
  byte[] byteArray = new byte[byteCount];

  for (int i = 0; i < byteCount; i++) {
    int startIndex = i * 8;
    // Clamp the end so the final (possibly partial) chunk stays within bounds.
    int endIndex = Math.min(startIndex + 8, length);
    String byteString = binaryEncodedStr.substring(startIndex, endIndex);
    byteArray[i] = (byte) Integer.parseInt(byteString, 2);
  }

  return byteArray;
}

/**
Expand Down Expand Up @@ -322,20 +301,17 @@
String spannerType, String columnName, JSONObject valuesJson) {
try {
if (spannerType.contains("string")) {
return valuesJson.optString(columnName);
String value = valuesJson.optString(columnName);
return value.isEmpty() ? null : value;
} else if (spannerType.contains("bytes")) {
if (valuesJson.isNull(columnName)) {
return null;
}
String hexEncodedString = valuesJson.optString(columnName);
return safeHandle(
() -> {
try {
return safeHandle(() -> convertBinaryEncodedStringToByteArray(hexEncodedString));
} catch (IllegalArgumentException e) {
return parseBlobType(hexEncodedString);
}
});
if (hexEncodedString.isEmpty()) {
return null;

Check warning on line 312 in v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dml/CassandraTypeHandler.java

View check run for this annotation

Codecov / codecov/patch

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dml/CassandraTypeHandler.java#L312

Added line #L312 was not covered by tests
}
return safeHandle(() -> parseBlobType(hexEncodedString));
} else {
return valuesJson.isNull(columnName) ? null : valuesJson.opt(columnName);
}
Expand Down
Loading
Loading