Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,6 @@
import org.apache.flink.cdc.common.data.StringData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.ZonedTimestampData;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypeRoot;
import org.apache.flink.cdc.common.types.DecimalType;
import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
import org.apache.flink.cdc.common.types.TimestampType;
import org.apache.flink.cdc.common.types.ZonedTimestampType;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
Expand Down Expand Up @@ -72,45 +66,6 @@ public static int calculateBitSetWidthInBytes(int arity) {
return ((arity + 63 + HEADER_SIZE_IN_BITS) / 64) * 8;
}

public static int calculateFixPartSizeInBytes(int arity) {
return calculateBitSetWidthInBytes(arity) + 8 * arity;
}

/**
* If it is a fixed-length field, we can call this BinaryRecordData's setXX method for in-place
* updates. If it is variable-length field, can't use this method, because the underlying data
* is stored continuously.
*/
public static boolean isInFixedLengthPart(DataType type) {
switch (type.getTypeRoot()) {
case BOOLEAN:
case TINYINT:
case SMALLINT:
case INTEGER:
case DATE:
case TIME_WITHOUT_TIME_ZONE:
case BIGINT:
case FLOAT:
case DOUBLE:
return true;
case DECIMAL:
return DecimalData.isCompact(((DecimalType) type).getPrecision());
case TIMESTAMP_WITHOUT_TIME_ZONE:
return TimestampData.isCompact(((TimestampType) type).getPrecision());
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return LocalZonedTimestampData.isCompact(
((LocalZonedTimestampType) type).getPrecision());
case TIMESTAMP_WITH_TIME_ZONE:
return ZonedTimestampData.isCompact(((ZonedTimestampType) type).getPrecision());
default:
return false;
}
}

public static boolean isMutable(DataType type) {
return isInFixedLengthPart(type) || type.getTypeRoot() == DataTypeRoot.DECIMAL;
}

private final int arity;
private final int nullBitsSizeInBytes;

Expand Down Expand Up @@ -213,10 +168,6 @@ public DecimalData getDecimal(int pos, int precision, int scale) {
public TimestampData getTimestamp(int pos, int precision) {
assertIndexIsValid(pos);

if (TimestampData.isCompact(precision)) {
return TimestampData.fromMillis(segments[0].getLong(getFieldOffset(pos)));
}

int fieldOffset = getFieldOffset(pos);
final long offsetAndNanoOfMilli = segments[0].getLong(fieldOffset);
return BinarySegmentUtils.readTimestampData(segments, offset, offsetAndNanoOfMilli);
Expand All @@ -233,11 +184,6 @@ public ZonedTimestampData getZonedTimestamp(int pos, int precision) {
public LocalZonedTimestampData getLocalZonedTimestampData(int pos, int precision) {
assertIndexIsValid(pos);

if (LocalZonedTimestampData.isCompact(precision)) {
return LocalZonedTimestampData.fromEpochMillis(
segments[0].getLong(getFieldOffset(pos)));
}

int fieldOffset = getFieldOffset(pos);
final long offsetAndNanoOfMilli = segments[0].getLong(fieldOffset);
return BinarySegmentUtils.readLocalZonedTimestampData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,95 @@ public void testMysql8TimeDataTypes() throws Throwable {
fullTypesMySql8Database, recordType, expectedSnapshot, expectedStreamRecord);
}

@Test
public void testMysql57PrecisionTypes() throws Throwable {
testMysqlPrecisionTypes(fullTypesMySql57Database);
}

@Test
public void testMysql8PrecisionTypes() throws Throwable {
testMysqlPrecisionTypes(fullTypesMySql8Database);
}

public void testMysqlPrecisionTypes(UniqueDatabase database) throws Throwable {
RowType recordType =
RowType.of(
DataTypes.DECIMAL(20, 0).notNull(),
DataTypes.DECIMAL(6, 2),
DataTypes.DECIMAL(9, 4),
DataTypes.DECIMAL(20, 4),
DataTypes.TIME(0),
DataTypes.TIME(3),
DataTypes.TIME(6),
DataTypes.TIMESTAMP(0),
DataTypes.TIMESTAMP(3),
DataTypes.TIMESTAMP(6),
DataTypes.TIMESTAMP_LTZ(0),
DataTypes.TIMESTAMP_LTZ(3),
DataTypes.TIMESTAMP_LTZ(6),
DataTypes.TIMESTAMP_LTZ(0));

Object[] expectedSnapshot =
new Object[] {
DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
DecimalData.fromBigDecimal(new BigDecimal("123.4"), 6, 2),
DecimalData.fromBigDecimal(new BigDecimal("1234.5"), 9, 4),
DecimalData.fromBigDecimal(new BigDecimal("1234.56"), 20, 4),
64800000,
64822100,
64822100,
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:00")),
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:00")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22"))
};

Object[] expectedStreamRecord =
new Object[] {
DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
DecimalData.fromBigDecimal(new BigDecimal("123.4"), 6, 2),
DecimalData.fromBigDecimal(new BigDecimal("1234.5"), 9, 4),
DecimalData.fromBigDecimal(new BigDecimal("1234.56"), 20, 4),
64800000,
64822100,
null,
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:00")),
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:00")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22"))
};

database.createAndInitialize();
CloseableIterator<Event> iterator =
env.fromSource(
getFlinkSourceProvider(new String[] {"precision_types"}, database)
.getSource(),
WatermarkStrategy.noWatermarks(),
"Event-Source")
.executeAndCollect();

// skip CreateTableEvent
List<Event> snapshotResults = MySqSourceTestUtils.fetchResults(iterator, 2);
RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(1)).after();

Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord, recordType))
.isEqualTo(expectedSnapshot);

try (Connection connection = database.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute("UPDATE precision_types SET time_6_c = null WHERE id = 1;");
}

List<Event> streamResults = MySqSourceTestUtils.fetchResults(iterator, 1);
RecordData streamRecord = ((DataChangeEvent) streamResults.get(0)).after();
Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, recordType))
.isEqualTo(expectedStreamRecord);
}

private void testCommonDataTypes(UniqueDatabase database) throws Exception {
database.createAndInitialize();
CloseableIterator<Event> iterator =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ public void testMysql8AccessTimeTypesSchema() {
private void testAccessDatabaseAndTable(UniqueDatabase database) {
database.createAndInitialize();

String[] tables = new String[] {"common_types", "time_types"};
String[] tables = new String[] {"common_types", "time_types", "precision_types"};
MySqlMetadataAccessor metadataAccessor = getMetadataAccessor(tables, database);

assertThatThrownBy(metadataAccessor::listNamespaces)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,37 @@ VALUES (DEFAULT,
'2020-07-17 18:00:22.123',
'2020-07-17 18:00:22.123456',
'2020-07-17 18:00:22',
NULL);
NULL);

CREATE TABLE precision_types
(
id SERIAL,
decimal_c0 DECIMAL(6, 2),
decimal_c1 DECIMAL(9, 4),
decimal_c2 DECIMAL(20, 4),
time_c TIME(0),
time_3_c TIME(3),
time_6_c TIME(6),
datetime_c DATETIME(0),
datetime3_c DATETIME(3),
datetime6_c DATETIME(6),
timestamp_c TIMESTAMP(0) NULL,
timestamp3_c TIMESTAMP(3) NULL,
timestamp6_c TIMESTAMP(6) NULL,
PRIMARY KEY (id)
) DEFAULT CHARSET=utf8;

INSERT INTO precision_types
VALUES (DEFAULT,
123.4,
1234.5,
1234.56,
'18:00',
'18:00:22.1',
'18:00:22.1',
'2020-07-17 18:00',
'2020-07-17 18:00:22',
'2020-07-17 18:00:22',
'2020-07-17 18:00',
'2020-07-17 18:00:22',
'2020-07-17 18:00:22');
Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,37 @@ VALUES (DEFAULT,
'2020-07-17 18:00:22',
'2020-07-17 18:00:22.123',
'2020-07-17 18:00:22.123456',
NULL);
NULL);

CREATE TABLE precision_types
(
id SERIAL,
decimal_c0 DECIMAL(6, 2),
decimal_c1 DECIMAL(9, 4),
decimal_c2 DECIMAL(20, 4),
time_c TIME(0),
time_3_c TIME(3),
time_6_c TIME(6),
datetime_c DATETIME(0),
datetime3_c DATETIME(3),
datetime6_c DATETIME(6),
timestamp_c TIMESTAMP(0),
timestamp3_c TIMESTAMP(3),
timestamp6_c TIMESTAMP(6),
PRIMARY KEY (id)
) DEFAULT CHARSET=utf8;

INSERT INTO precision_types
VALUES (DEFAULT,
123.4,
1234.5,
1234.56,
'18:00',
'18:00:22.1',
'18:00:22.1',
'2020-07-17 18:00',
'2020-07-17 18:00:22',
'2020-07-17 18:00:22',
'2020-07-17 18:00',
'2020-07-17 18:00:22',
'2020-07-17 18:00:22');
Original file line number Diff line number Diff line change
Expand Up @@ -173,46 +173,38 @@ public void writeDecimal(int pos, DecimalData value, int precision) {

@Override
public void writeTimestamp(int pos, TimestampData value, int precision) {
if (TimestampData.isCompact(precision)) {
writeLong(pos, value.getMillisecond());
} else {
// store the nanoOfMillisecond in fixed-length part as offset and nanoOfMillisecond
ensureCapacity(8);
// store the nanoOfMillisecond in fixed-length part as offset and nanoOfMillisecond
ensureCapacity(8);

if (value == null) {
setNullBit(pos);
// zero-out the bytes
segment.putLong(cursor, 0L);
setOffsetAndSize(pos, cursor, 0);
} else {
segment.putLong(cursor, value.getMillisecond());
setOffsetAndSize(pos, cursor, value.getNanoOfMillisecond());
}

cursor += 8;
if (value == null) {
setNullBit(pos);
// zero-out the bytes
segment.putLong(cursor, 0L);
setOffsetAndSize(pos, cursor, 0);
} else {
segment.putLong(cursor, value.getMillisecond());
setOffsetAndSize(pos, cursor, value.getNanoOfMillisecond());
}

cursor += 8;
}

@Override
public void writeLocalZonedTimestamp(int pos, LocalZonedTimestampData value, int precision) {
if (LocalZonedTimestampData.isCompact(precision)) {
writeLong(pos, value.getEpochMillisecond());
} else {
// store the nanoOfMillisecond in fixed-length part as offset and nanoOfMillisecond
ensureCapacity(8);
// store the nanoOfMillisecond in fixed-length part as offset and nanoOfMillisecond
ensureCapacity(8);

if (value == null) {
setNullBit(pos);
// zero-out the bytes
segment.putLong(cursor, 0L);
setOffsetAndSize(pos, cursor, 0);
} else {
segment.putLong(cursor, value.getEpochMillisecond());
setOffsetAndSize(pos, cursor, value.getEpochNanoOfMillisecond());
}

cursor += 8;
if (value == null) {
setNullBit(pos);
// zero-out the bytes
segment.putLong(cursor, 0L);
setOffsetAndSize(pos, cursor, 0);
} else {
segment.putLong(cursor, value.getEpochMillisecond());
setOffsetAndSize(pos, cursor, value.getEpochNanoOfMillisecond());
}

cursor += 8;
}

@Override
Expand Down