Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/uw-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
id: variables
run: |
echo "unifiedWorkerHarnessContainerImage=${HARNESS_IMAGE}" >> $GITHUB_OUTPUT
echo "releaseTag=$(curl -s https://api.github.com/repos/GoogleCLoudPlatform/DataflowTemplates/releases/latest | jq '.tag_name' | sed 's/\"//g')" >> $GITHUB_OUTPUT
echo "releaseTag=$(curl -s https://api.github.com/repos/GoogleCloudPlatform/DataflowTemplates/releases/latest | jq '.tag_name' | sed 's/\"//g')" >> $GITHUB_OUTPUT
env:
HARNESS_IMAGE: ${{ inputs.unifiedWorkerHarnessContainerImage }}
- name: Checkout code
Expand Down
6 changes: 6 additions & 0 deletions v2/sourcedb-to-spanner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-cassandra</artifactId>
</dependency>
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
<version>${mssql-jdbc.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.re2j.Pattern;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
Expand All @@ -59,9 +58,6 @@
public final class MysqlDialectAdapter implements DialectAdapter {

public static final String PAD_SPACE = "PAD SPACE";
public static final String NO_PAD = "NO PAD";
public static final String BINARY_CHARACTER_SET = "binary";
public static final String BINARY_COLLATION = "binary";
private final MySqlVersion mySqlVersion;

private static final Logger logger = LoggerFactory.getLogger(MysqlDialectAdapter.class);
Expand Down Expand Up @@ -370,16 +366,14 @@ private ImmutableMap<String, SourceColumnType> getTableCols(
// String types: Ref https://dev.mysql.com/doc/refman/8.4/en/string-type-syntax.html
.put("CHAR", IndexType.STRING)
.put("VARCHAR", IndexType.STRING)
.put("BINARY", IndexType.STRING)
.put("VARBINARY", IndexType.STRING)
.put("BLOB", IndexType.STRING)
.put("TEXT", IndexType.STRING)
.put("ENUM", IndexType.STRING)
.put("SET", IndexType.STRING)
// BINARY, VARBINARY and TINYBLOB columns are indexed as IndexType.BINARY.
// Ref https://dev.mysql.com/doc/refman/8.4/en/charset-binary-collations.html
.put("BINARY", IndexType.BINARY)
.put("VARBINARY", IndexType.BINARY)
.put("TINYBLOB", IndexType.BINARY)
.put("TINYTEXT", IndexType.STRING)
.build();

private ImmutableSet<String> binaryColumnTypes = ImmutableSet.of("BINARY", "VARBINARY", "BLOB");

/**
* Get the PadSpace attribute from {@link ResultSet} for index discovery query {@link
* #getIndexDiscoveryQuery(JdbcSchemaReference)}. This method takes care of the fact that older
Expand Down Expand Up @@ -440,28 +434,17 @@ private ImmutableList<SourceColumnIndexInfo> getTableIndexes(
// Column.
String columType = normalizeColumnType(rs.getString(InformationSchemaStatsCols.TYPE_COL));
IndexType indexType = INDEX_TYPE_MAPPING.getOrDefault(columType, IndexType.OTHER);

CollationReference collationReference = null;
// Binary (and similar columns like VarBinary, Blob etc) columns have a fixed character-set
// and collation called "binary".
// Ref https://dev.mysql.com/doc/refman/8.4/en/charset-binary-collations.html
// In information_schema.columns query, these column types show null as character set.
// Ref: https://www.db-fiddle.com/f/kRVPA5jDwZYNj2rsdtif4K/3
// Also for both mySQL 5.7 and 8.0 binary columns have a NO-PAD comparison.
// Ref: https://www.db-fiddle.com/f/kRVPA5jDwZYNj2rsdtif4K/0.
if (binaryColumnTypes.contains(columType) && characterSet == null) {
characterSet = BINARY_CHARACTER_SET;
collation = BINARY_COLLATION;
padSpace = NO_PAD;
}
if (characterSet != null) {
if (indexType.equals(IndexType.STRING)) {
collationReference =
CollationReference.builder()
.setDbCharacterSet(characterSet)
.setDbCollation(collation)
.setDbCharacterSet(escapeMySql(characterSet))
.setDbCollation(escapeMySql(collation))
.setPadSpace(
(padSpace == null) ? false : padSpace.trim().toUpperCase().equals(PAD_SPACE))
.build();
} else {
stringMaxLength = null;
}

indexesBuilder.add(
Expand All @@ -487,6 +470,15 @@ private ImmutableList<SourceColumnIndexInfo> getTableIndexes(
return indexesBuilder.build();
}

/**
 * Wraps an identifier in backticks unless it already begins with one.
 *
 * <p>Only the leading character is inspected: an input that starts with a backtick is returned
 * unchanged, whatever follows it. Backticks inside the input are not escaped.
 *
 * @param input identifier to quote; must not be {@code null}.
 * @return {@code input} itself if it starts with a backtick, otherwise {@code input} surrounded
 *     by a backtick on each side.
 */
@VisibleForTesting
protected static String escapeMySql(String input) {
  final String backtick = "`";
  return input.startsWith(backtick) ? input : backtick + input + backtick;
}

private SourceColumnType resultSetToSourceColumnType(ResultSet rs) throws SQLException {
String colType = normalizeColumnType(rs.getString(InformationSchemaCols.TYPE_COL));
long charMaxLength = rs.getLong(InformationSchemaCols.CHAR_MAX_LENGTH_COL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,8 @@ private static TableConfig getTableConfig(
.forEach(tableConfigBuilder::withPartitionColum);
} else {
ImmutableSet<IndexType> supportedIndexTypes =
ImmutableSet.of(IndexType.NUMERIC, IndexType.STRING, IndexType.BIG_INT_UNSIGNED);
ImmutableSet.of(
IndexType.NUMERIC, IndexType.STRING, IndexType.BIG_INT_UNSIGNED, IndexType.BINARY);
// As of now only Primary key index with Numeric type is supported.
// TODO:
// 1. support non-primary unique indexes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.google.common.collect.ImmutableMap;
import java.io.Serializable;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
Expand All @@ -27,21 +26,27 @@
/** Factory to construct {@link BoundaryExtractor} for supported {@link Class}. */
public class BoundaryExtractorFactory {

public static final Class BYTE_ARRAY_CLASS = (new byte[] {}).getClass();
private static final ImmutableMap<Class, BoundaryExtractor<?>> extractorMap =
ImmutableMap.of(
Integer.class,
(BoundaryExtractor<Integer>)
(partitionColumn, resultSet, boundaryTypeMapper) ->
fromIntegers(partitionColumn, resultSet, boundaryTypeMapper),
(BoundaryExtractor<Integer>)
(partitionColumn, resultSet, boundaryTypeMapper) ->
fromIntegers(partitionColumn, resultSet, boundaryTypeMapper),
Long.class,
(BoundaryExtractor<Long>)
(partitionColumn, resultSet, boundaryTypeMapper) ->
fromLongs(partitionColumn, resultSet, boundaryTypeMapper),
String.class, (BoundaryExtractor<String>) BoundaryExtractorFactory::fromStrings,
BigInteger.class,
(BoundaryExtractor<BigInteger>)
(partitionColumn, resultSet, boundaryTypeMapper) ->
fromBigIntegers(partitionColumn, resultSet, boundaryTypeMapper));
(BoundaryExtractor<Long>)
(partitionColumn, resultSet, boundaryTypeMapper) ->
fromLongs(partitionColumn, resultSet, boundaryTypeMapper),
String.class,
(BoundaryExtractor<String>) BoundaryExtractorFactory::fromStrings,
BigDecimal.class,
(BoundaryExtractor<BigDecimal>)
(partitionColumn, resultSet, boundaryTypeMapper) ->
fromBigDecimals(partitionColumn, resultSet, boundaryTypeMapper),
BYTE_ARRAY_CLASS,
(BoundaryExtractor<byte[]>)
(partitionColumn, resultSet, boundaryTypeMapper) ->
fromBinary(partitionColumn, resultSet, boundaryTypeMapper));

/**
* Create a {@link BoundaryExtractor} for the required class.
Expand Down Expand Up @@ -90,20 +95,38 @@ private static Boundary<Long> fromLongs(
.build();
}

private static Boundary<java.math.BigInteger> fromBigIntegers(
private static Boundary<BigDecimal> fromBigDecimals(
PartitionColumn partitionColumn,
ResultSet resultSet,
@Nullable BoundaryTypeMapper boundaryTypeMapper)
throws SQLException {
Preconditions.checkArgument(partitionColumn.columnClass().equals(BigInteger.class));
Preconditions.checkArgument(partitionColumn.columnClass().equals(BigDecimal.class));
resultSet.next();
BigDecimal start = resultSet.getBigDecimal(1);
BigDecimal end = resultSet.getBigDecimal(2);
return Boundary.<java.math.BigInteger>builder()
return Boundary.<BigDecimal>builder()
.setPartitionColumn(partitionColumn)
.setStart(start == null ? null : start.toBigInteger())
.setEnd(end == null ? null : end.toBigInteger())
.setBoundarySplitter(BoundarySplitterFactory.create(BigInteger.class))
.setStart(start)
.setEnd(end)
.setBoundarySplitter(BoundarySplitterFactory.create(BigDecimal.class))
.setBoundaryTypeMapper(boundaryTypeMapper)
.build();
}

/**
 * Builds a {@code byte[]} {@link Boundary} from the current result of a boundary query.
 *
 * <p>The cursor is advanced once, then column 1 is read as the start of the boundary and column 2
 * as its end. The result of {@link ResultSet#next()} is not checked, so the caller is expected to
 * supply a result set that has at least one row.
 *
 * @param partitionColumn column being split; its column class must be {@code byte[]}.
 * @param resultSet result set positioned before the row holding the two boundary values.
 * @param boundaryTypeMapper optional mapper that is passed through to the built boundary.
 * @return boundary carrying the two raw byte values and the {@code byte[]} splitter.
 * @throws SQLException if advancing the cursor or reading either column fails.
 * @throws IllegalArgumentException if {@code partitionColumn} is not a {@code byte[]} column.
 */
private static Boundary<byte[]> fromBinary(
    PartitionColumn partitionColumn,
    ResultSet resultSet,
    @Nullable BoundaryTypeMapper boundaryTypeMapper)
    throws SQLException {
  Preconditions.checkArgument(partitionColumn.columnClass().equals(BYTE_ARRAY_CLASS));

  // Move onto the row that carries the boundary values.
  resultSet.next();
  final byte[] lowerBytes = resultSet.getBytes(1);
  final byte[] upperBytes = resultSet.getBytes(2);

  return Boundary.<byte[]>builder()
      .setPartitionColumn(partitionColumn)
      .setStart(lowerBytes)
      .setEnd(upperBytes)
      .setBoundarySplitter(BoundarySplitterFactory.create(BYTE_ARRAY_CLASS))
      .setBoundaryTypeMapper(boundaryTypeMapper)
      .build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
*/
package com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.range;

import static com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.range.BoundaryExtractorFactory.BYTE_ARRAY_CLASS;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.io.Serializable;
import java.math.BigDecimal;
import java.math.BigInteger;
import org.apache.beam.sdk.transforms.DoFn;

Expand All @@ -26,18 +29,27 @@ public class BoundarySplitterFactory {
private static final ImmutableMap<Class, BoundarySplitter<?>> splittermap =
ImmutableMap.of(
Integer.class,
(BoundarySplitter<Integer>)
(start, end, partitionColumn, boundaryTypeMapper, processContext) ->
splitIntegers(start, end),
(BoundarySplitter<Integer>)
(start, end, partitionColumn, boundaryTypeMapper, processContext) ->
splitIntegers(start, end),
Long.class,
(BoundarySplitter<Long>)
(start, end, partitionColumn, boundaryTypeMapper, processContext) ->
splitLongs(start, end),
(BoundarySplitter<Long>)
(start, end, partitionColumn, boundaryTypeMapper, processContext) ->
splitLongs(start, end),
BigInteger.class,
(BoundarySplitter<BigInteger>)
(start, end, partitionColumn, boundaryTypeMapper, processContext) ->
splitBigIntegers(start, end),
String.class, (BoundarySplitter<String>) BoundarySplitterFactory::splitStrings);
(BoundarySplitter<BigInteger>)
(start, end, partitionColumn, boundaryTypeMapper, processContext) ->
splitBigIntegers(start, end),
BigDecimal.class,
(BoundarySplitter<BigDecimal>)
(start, end, partitionColumn, boundaryTypeMapper, processContext) ->
splitBigDecimal(start, end),
String.class,
(BoundarySplitter<String>) BoundarySplitterFactory::splitStrings,
BYTE_ARRAY_CLASS,
(BoundarySplitter<byte[]>)
(start, end, partitionColumn, boundaryTypeMapper, processContext) ->
splitBytes(start, end));

/**
* Creates {@link BoundarySplitter BoundarySplitter&lt;T&gt;} for the passed class {@code c} such that
Expand Down Expand Up @@ -132,6 +144,26 @@ private static Long splitLongs(Long start, Long end) {
return (start & end) + ((start ^ end) >> 1);
}

/**
 * Splits a {@link BigDecimal} range by delegating to the {@link BigInteger} splitter.
 *
 * <p>Both ends are converted with {@link BigDecimal#toBigInteger()}, which discards any fractional
 * part, so the split point is always a whole number.
 *
 * @param start lower end of the range, may be {@code null}.
 * @param end upper end of the range, may be {@code null}.
 * @return the split point as a {@link BigDecimal}, or {@code null} when the {@link BigInteger}
 *     splitter returns {@code null}.
 */
private static BigDecimal splitBigDecimal(BigDecimal start, BigDecimal end) {
  BigInteger lower = null;
  if (start != null) {
    lower = start.toBigInteger();
  }
  BigInteger upper = null;
  if (end != null) {
    upper = end.toBigInteger();
  }

  BigInteger midpoint = splitBigIntegers(lower, upper);
  return (midpoint != null) ? new BigDecimal(midpoint) : null;
}

/**
 * Splits a binary range by treating both ends as unsigned big-endian integers and delegating to
 * the {@link BigInteger} splitter.
 *
 * <p>The bytes are read as an unsigned magnitude. The single-argument {@code BigInteger(byte[])}
 * constructor would read them as signed two's complement, turning any value whose first byte is
 * {@code 0x80} or higher into a negative number and letting the split point fall outside the
 * range. It also throws {@link NumberFormatException} for an empty array, whereas the
 * sign-magnitude constructor maps an empty array to zero.
 *
 * @param start lower end of the range, may be {@code null}.
 * @param end upper end of the range, may be {@code null}.
 * @return the split point as a big-endian two's-complement byte array, or {@code null} when the
 *     {@link BigInteger} splitter returns {@code null}.
 */
private static byte[] splitBytes(byte[] start, byte[] end) {
  // signum = 1 selects the unsigned interpretation of the raw bytes.
  BigInteger startBigInt = (start == null) ? null : new BigInteger(1, start);
  BigInteger endBigInt = (end == null) ? null : new BigInteger(1, end);
  BigInteger split = splitBigIntegers(startBigInt, endBigInt);
  if (split == null) {
    return null;
  }
  // NOTE(review): toByteArray() yields the minimal two's-complement form, so the result may carry
  // a leading 0x00 sign byte or be shorter than the inputs. Confirm that consumers of the split
  // point tolerate a width different from the column width.
  return split.toByteArray();
}

private static String splitStrings(
String start,
String end,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
package com.google.cloud.teleport.v2.source.reader.io.schema;

import com.google.auto.value.AutoValue;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.range.BoundaryExtractorFactory;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper.CollationReference;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.math.BigInteger;
import java.math.BigDecimal;
import javax.annotation.Nullable;

@AutoValue
Expand Down Expand Up @@ -138,6 +139,7 @@ public SourceColumnIndexInfo build() {
public enum IndexType {
NUMERIC,
BIG_INT_UNSIGNED,
BINARY,
STRING,
DATE_TIME,
OTHER
Expand All @@ -148,5 +150,6 @@ public enum IndexType {
ImmutableMap.of(
IndexType.NUMERIC, Long.class,
IndexType.STRING, String.class,
IndexType.BIG_INT_UNSIGNED, BigInteger.class);
IndexType.BIG_INT_UNSIGNED, BigDecimal.class,
IndexType.BINARY, BoundaryExtractorFactory.BYTE_ARRAY_CLASS);
}
Loading
Loading