Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,6 @@ public Schema() {
this.empty = true;
}

public Schema(Map<String, SpannerTable> spSchema, Map<String, SourceTable> srcSchema) {
this.spSchema = spSchema;
this.srcSchema = srcSchema;
this.syntheticPKeys = new HashMap<String, SyntheticPKey>();
this.empty = (spSchema == null || srcSchema == null);
}

public Schema(
Map<String, SpannerTable> spSchema,
Map<String, SyntheticPKey> syntheticPKeys,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.google.cloud.teleport.v2.templates.dbutils.connection.IConnectionHelper;
import com.google.cloud.teleport.v2.templates.dbutils.dml.CassandraTypeHandler;
import com.google.cloud.teleport.v2.templates.exceptions.ConnectionException;
import com.google.cloud.teleport.v2.templates.models.DMLGeneratorResponse;
import com.google.cloud.teleport.v2.templates.models.PreparedStatementGeneratedResponse;
import com.google.cloud.teleport.v2.templates.models.PreparedStatementValueObject;

public class CassandraDao implements IDao<DMLGeneratorResponse> {
private final String cassandraUrl;
Expand Down Expand Up @@ -51,7 +51,7 @@ public void write(DMLGeneratorResponse dmlGeneratorResponse) throws Exception {
BoundStatement boundStatement =
preparedStatement.bind(
preparedStatementGeneratedResponse.getValues().stream()
.map(PreparedStatementValueObject::value)
.map(v -> CassandraTypeHandler.castToExpectedType(v.dataType(), v.value()))
.toArray());
session.execute(boundStatement);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,59 +85,6 @@ public String toString() {
}
}

/**
* Functional interface for parsing an object value to a specific type.
*
* <p>This interface provides a contract to implement type conversion logic where an input object
* is parsed and transformed into the desired target type.
*
* <p>Example usage:
*
* <pre>{@code
* TypeParser<Integer> intParser = value -> Integer.parseInt(value.toString());
* Integer parsedValue = intParser.parse("123");
* }</pre>
*
* @param <T> The target type to which the value will be parsed.
*/
@FunctionalInterface
public interface TypeParser<T> {

/**
* Parses the given value and converts it into the target type {@code T}.
*
* @param value The input value to be parsed.
* @return The parsed value of type {@code T}.
*/
T parse(Object value);
}

/**
* Functional interface for supplying a value with exception handling.
*
* <p>This interface provides a mechanism to execute logic that may throw a checked exception,
* making it useful for methods where exception handling is required.
*
* <p>Example usage:
*
* <pre>{@code
* HandlerSupplier<String> supplier = () -> {
* if (someCondition) {
* throw new IOException("Error occurred");
* }
* return "Success";
* };
*
* try {
* String result = supplier.get();
* System.out.println(result);
* } catch (Exception e) {
* e.printStackTrace();
* }
* }</pre>
*
* @param <T> The type of value supplied by the supplier.
*/
@FunctionalInterface
private interface HandlerSupplier<T> {

Expand Down Expand Up @@ -702,7 +649,7 @@ private static Object handleSpannerColumnType(

default:
LOG.warn("Unsupported Spanner column type: {}", spannerType);
return null;
throw new IllegalArgumentException("Unsupported Spanner column type: " + spannerType);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ public void processElement(ProcessContext c) throws Exception {
String qualifiedShard = "";
String tableName = record.getTableName();
String keysJsonStr = record.getMod().getKeysJson();
long finalKey;

try {
if (shardingMode.equals(Constants.SHARDING_MODE_SINGLE_SHARD)) {
Expand Down Expand Up @@ -232,7 +231,7 @@ public void processElement(ProcessContext c) throws Exception {

record.setShard(qualifiedShard);
String finalKeyString = tableName + "_" + keysJsonStr + "_" + qualifiedShard;
finalKey = finalKeyString.hashCode() % maxConnectionsAcrossAllShards;
Long finalKey = finalKeyString.hashCode() % maxConnectionsAcrossAllShards;
c.output(KV.of(finalKey, record));

} catch (Exception e) {
Expand All @@ -241,7 +240,7 @@ public void processElement(ProcessContext c) throws Exception {
LOG.error("Error fetching shard Id column: " + e.getMessage() + ": " + errors.toString());
// The record has no shard hence will be sent to DLQ in subsequent steps
String finalKeyString = record.getTableName() + "_" + keysJsonStr + "_" + skipDirName;
finalKey = finalKeyString.hashCode() % maxConnectionsAcrossAllShards;
Long finalKey = finalKeyString.hashCode() % maxConnectionsAcrossAllShards;
c.output(KV.of(finalKey, record));
}
}
Expand Down
Loading