Skip to content

Commit 07446d1

Browse files
authored
[FLINK-35715][common] Ignore the compact optimize for mysql timestamp type in BinaryRecordData (#3511)
1 parent 09f36a4 commit 07446d1

File tree

6 files changed

+182
-89
lines changed

6 files changed

+182
-89
lines changed

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryRecordData.java

Lines changed: 0 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,6 @@
2626
import org.apache.flink.cdc.common.data.StringData;
2727
import org.apache.flink.cdc.common.data.TimestampData;
2828
import org.apache.flink.cdc.common.data.ZonedTimestampData;
29-
import org.apache.flink.cdc.common.types.DataType;
30-
import org.apache.flink.cdc.common.types.DataTypeRoot;
31-
import org.apache.flink.cdc.common.types.DecimalType;
32-
import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
33-
import org.apache.flink.cdc.common.types.TimestampType;
34-
import org.apache.flink.cdc.common.types.ZonedTimestampType;
3529
import org.apache.flink.cdc.common.utils.Preconditions;
3630
import org.apache.flink.core.memory.MemorySegment;
3731
import org.apache.flink.core.memory.MemorySegmentFactory;
@@ -72,45 +66,6 @@ public static int calculateBitSetWidthInBytes(int arity) {
7266
return ((arity + 63 + HEADER_SIZE_IN_BITS) / 64) * 8;
7367
}
7468

75-
public static int calculateFixPartSizeInBytes(int arity) {
76-
return calculateBitSetWidthInBytes(arity) + 8 * arity;
77-
}
78-
79-
/**
80-
* If it is a fixed-length field, we can call this BinaryRecordData's setXX method for in-place
81-
* updates. If it is variable-length field, can't use this method, because the underlying data
82-
* is stored continuously.
83-
*/
84-
public static boolean isInFixedLengthPart(DataType type) {
85-
switch (type.getTypeRoot()) {
86-
case BOOLEAN:
87-
case TINYINT:
88-
case SMALLINT:
89-
case INTEGER:
90-
case DATE:
91-
case TIME_WITHOUT_TIME_ZONE:
92-
case BIGINT:
93-
case FLOAT:
94-
case DOUBLE:
95-
return true;
96-
case DECIMAL:
97-
return DecimalData.isCompact(((DecimalType) type).getPrecision());
98-
case TIMESTAMP_WITHOUT_TIME_ZONE:
99-
return TimestampData.isCompact(((TimestampType) type).getPrecision());
100-
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
101-
return LocalZonedTimestampData.isCompact(
102-
((LocalZonedTimestampType) type).getPrecision());
103-
case TIMESTAMP_WITH_TIME_ZONE:
104-
return ZonedTimestampData.isCompact(((ZonedTimestampType) type).getPrecision());
105-
default:
106-
return false;
107-
}
108-
}
109-
110-
public static boolean isMutable(DataType type) {
111-
return isInFixedLengthPart(type) || type.getTypeRoot() == DataTypeRoot.DECIMAL;
112-
}
113-
11469
private final int arity;
11570
private final int nullBitsSizeInBytes;
11671

@@ -213,10 +168,6 @@ public DecimalData getDecimal(int pos, int precision, int scale) {
213168
public TimestampData getTimestamp(int pos, int precision) {
214169
assertIndexIsValid(pos);
215170

216-
if (TimestampData.isCompact(precision)) {
217-
return TimestampData.fromMillis(segments[0].getLong(getFieldOffset(pos)));
218-
}
219-
220171
int fieldOffset = getFieldOffset(pos);
221172
final long offsetAndNanoOfMilli = segments[0].getLong(fieldOffset);
222173
return BinarySegmentUtils.readTimestampData(segments, offset, offsetAndNanoOfMilli);
@@ -233,11 +184,6 @@ public ZonedTimestampData getZonedTimestamp(int pos, int precision) {
233184
public LocalZonedTimestampData getLocalZonedTimestampData(int pos, int precision) {
234185
assertIndexIsValid(pos);
235186

236-
if (LocalZonedTimestampData.isCompact(precision)) {
237-
return LocalZonedTimestampData.fromEpochMillis(
238-
segments[0].getLong(getFieldOffset(pos)));
239-
}
240-
241187
int fieldOffset = getFieldOffset(pos);
242188
final long offsetAndNanoOfMilli = segments[0].getLong(fieldOffset);
243189
return BinarySegmentUtils.readLocalZonedTimestampData(

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,95 @@ public void testMysql8TimeDataTypes() throws Throwable {
223223
fullTypesMySql8Database, recordType, expectedSnapshot, expectedStreamRecord);
224224
}
225225

226+
@Test
227+
public void testMysql57PrecisionTypes() throws Throwable {
228+
testMysqlPrecisionTypes(fullTypesMySql57Database);
229+
}
230+
231+
@Test
232+
public void testMysql8PrecisionTypes() throws Throwable {
233+
testMysqlPrecisionTypes(fullTypesMySql8Database);
234+
}
235+
236+
public void testMysqlPrecisionTypes(UniqueDatabase database) throws Throwable {
237+
RowType recordType =
238+
RowType.of(
239+
DataTypes.DECIMAL(20, 0).notNull(),
240+
DataTypes.DECIMAL(6, 2),
241+
DataTypes.DECIMAL(9, 4),
242+
DataTypes.DECIMAL(20, 4),
243+
DataTypes.TIME(0),
244+
DataTypes.TIME(3),
245+
DataTypes.TIME(6),
246+
DataTypes.TIMESTAMP(0),
247+
DataTypes.TIMESTAMP(3),
248+
DataTypes.TIMESTAMP(6),
249+
DataTypes.TIMESTAMP_LTZ(0),
250+
DataTypes.TIMESTAMP_LTZ(3),
251+
DataTypes.TIMESTAMP_LTZ(6),
252+
DataTypes.TIMESTAMP_LTZ(0));
253+
254+
Object[] expectedSnapshot =
255+
new Object[] {
256+
DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
257+
DecimalData.fromBigDecimal(new BigDecimal("123.4"), 6, 2),
258+
DecimalData.fromBigDecimal(new BigDecimal("1234.5"), 9, 4),
259+
DecimalData.fromBigDecimal(new BigDecimal("1234.56"), 20, 4),
260+
64800000,
261+
64822100,
262+
64822100,
263+
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:00")),
264+
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
265+
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
266+
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:00")),
267+
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
268+
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22"))
269+
};
270+
271+
Object[] expectedStreamRecord =
272+
new Object[] {
273+
DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
274+
DecimalData.fromBigDecimal(new BigDecimal("123.4"), 6, 2),
275+
DecimalData.fromBigDecimal(new BigDecimal("1234.5"), 9, 4),
276+
DecimalData.fromBigDecimal(new BigDecimal("1234.56"), 20, 4),
277+
64800000,
278+
64822100,
279+
null,
280+
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:00")),
281+
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
282+
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
283+
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:00")),
284+
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
285+
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22"))
286+
};
287+
288+
database.createAndInitialize();
289+
CloseableIterator<Event> iterator =
290+
env.fromSource(
291+
getFlinkSourceProvider(new String[] {"precision_types"}, database)
292+
.getSource(),
293+
WatermarkStrategy.noWatermarks(),
294+
"Event-Source")
295+
.executeAndCollect();
296+
297+
// skip CreateTableEvent
298+
List<Event> snapshotResults = MySqSourceTestUtils.fetchResults(iterator, 2);
299+
RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(1)).after();
300+
301+
Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord, recordType))
302+
.isEqualTo(expectedSnapshot);
303+
304+
try (Connection connection = database.getJdbcConnection();
305+
Statement statement = connection.createStatement()) {
306+
statement.execute("UPDATE precision_types SET time_6_c = null WHERE id = 1;");
307+
}
308+
309+
List<Event> streamResults = MySqSourceTestUtils.fetchResults(iterator, 1);
310+
RecordData streamRecord = ((DataChangeEvent) streamResults.get(0)).after();
311+
Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, recordType))
312+
.isEqualTo(expectedStreamRecord);
313+
}
314+
226315
private void testCommonDataTypes(UniqueDatabase database) throws Exception {
227316
database.createAndInitialize();
228317
CloseableIterator<Event> iterator =

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ public void testMysql8AccessTimeTypesSchema() {
210210
private void testAccessDatabaseAndTable(UniqueDatabase database) {
211211
database.createAndInitialize();
212212

213-
String[] tables = new String[] {"common_types", "time_types"};
213+
String[] tables = new String[] {"common_types", "time_types", "precision_types"};
214214
MySqlMetadataAccessor metadataAccessor = getMetadataAccessor(tables, database);
215215

216216
assertThatThrownBy(metadataAccessor::listNamespaces)

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test.sql

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,4 +123,37 @@ VALUES (DEFAULT,
123123
'2020-07-17 18:00:22.123',
124124
'2020-07-17 18:00:22.123456',
125125
'2020-07-17 18:00:22',
126-
NULL);
126+
NULL);
127+
128+
CREATE TABLE precision_types
129+
(
130+
id SERIAL,
131+
decimal_c0 DECIMAL(6, 2),
132+
decimal_c1 DECIMAL(9, 4),
133+
decimal_c2 DECIMAL(20, 4),
134+
time_c TIME(0),
135+
time_3_c TIME(3),
136+
time_6_c TIME(6),
137+
datetime_c DATETIME(0),
138+
datetime3_c DATETIME(3),
139+
datetime6_c DATETIME(6),
140+
timestamp_c TIMESTAMP(0) NULL,
141+
timestamp3_c TIMESTAMP(3) NULL,
142+
timestamp6_c TIMESTAMP(6) NULL,
143+
PRIMARY KEY (id)
144+
) DEFAULT CHARSET=utf8;
145+
146+
INSERT INTO precision_types
147+
VALUES (DEFAULT,
148+
123.4,
149+
1234.5,
150+
1234.56,
151+
'18:00',
152+
'18:00:22.1',
153+
'18:00:22.1',
154+
'2020-07-17 18:00',
155+
'2020-07-17 18:00:22',
156+
'2020-07-17 18:00:22',
157+
'2020-07-17 18:00',
158+
'2020-07-17 18:00:22',
159+
'2020-07-17 18:00:22');

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test_mysql8.sql

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,4 +127,37 @@ VALUES (DEFAULT,
127127
'2020-07-17 18:00:22',
128128
'2020-07-17 18:00:22.123',
129129
'2020-07-17 18:00:22.123456',
130-
NULL);
130+
NULL);
131+
132+
CREATE TABLE precision_types
133+
(
134+
id SERIAL,
135+
decimal_c0 DECIMAL(6, 2),
136+
decimal_c1 DECIMAL(9, 4),
137+
decimal_c2 DECIMAL(20, 4),
138+
time_c TIME(0),
139+
time_3_c TIME(3),
140+
time_6_c TIME(6),
141+
datetime_c DATETIME(0),
142+
datetime3_c DATETIME(3),
143+
datetime6_c DATETIME(6),
144+
timestamp_c TIMESTAMP(0),
145+
timestamp3_c TIMESTAMP(3),
146+
timestamp6_c TIMESTAMP(6),
147+
PRIMARY KEY (id)
148+
) DEFAULT CHARSET=utf8;
149+
150+
INSERT INTO precision_types
151+
VALUES (DEFAULT,
152+
123.4,
153+
1234.5,
154+
1234.56,
155+
'18:00',
156+
'18:00:22.1',
157+
'18:00:22.1',
158+
'2020-07-17 18:00',
159+
'2020-07-17 18:00:22',
160+
'2020-07-17 18:00:22',
161+
'2020-07-17 18:00',
162+
'2020-07-17 18:00:22',
163+
'2020-07-17 18:00:22');

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java

Lines changed: 24 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -173,46 +173,38 @@ public void writeDecimal(int pos, DecimalData value, int precision) {
173173

174174
@Override
175175
public void writeTimestamp(int pos, TimestampData value, int precision) {
176-
if (TimestampData.isCompact(precision)) {
177-
writeLong(pos, value.getMillisecond());
178-
} else {
179-
// store the nanoOfMillisecond in fixed-length part as offset and nanoOfMillisecond
180-
ensureCapacity(8);
176+
// store the nanoOfMillisecond in fixed-length part as offset and nanoOfMillisecond
177+
ensureCapacity(8);
181178

182-
if (value == null) {
183-
setNullBit(pos);
184-
// zero-out the bytes
185-
segment.putLong(cursor, 0L);
186-
setOffsetAndSize(pos, cursor, 0);
187-
} else {
188-
segment.putLong(cursor, value.getMillisecond());
189-
setOffsetAndSize(pos, cursor, value.getNanoOfMillisecond());
190-
}
191-
192-
cursor += 8;
179+
if (value == null) {
180+
setNullBit(pos);
181+
// zero-out the bytes
182+
segment.putLong(cursor, 0L);
183+
setOffsetAndSize(pos, cursor, 0);
184+
} else {
185+
segment.putLong(cursor, value.getMillisecond());
186+
setOffsetAndSize(pos, cursor, value.getNanoOfMillisecond());
193187
}
188+
189+
cursor += 8;
194190
}
195191

196192
@Override
197193
public void writeLocalZonedTimestamp(int pos, LocalZonedTimestampData value, int precision) {
198-
if (LocalZonedTimestampData.isCompact(precision)) {
199-
writeLong(pos, value.getEpochMillisecond());
200-
} else {
201-
// store the nanoOfMillisecond in fixed-length part as offset and nanoOfMillisecond
202-
ensureCapacity(8);
194+
// store the nanoOfMillisecond in fixed-length part as offset and nanoOfMillisecond
195+
ensureCapacity(8);
203196

204-
if (value == null) {
205-
setNullBit(pos);
206-
// zero-out the bytes
207-
segment.putLong(cursor, 0L);
208-
setOffsetAndSize(pos, cursor, 0);
209-
} else {
210-
segment.putLong(cursor, value.getEpochMillisecond());
211-
setOffsetAndSize(pos, cursor, value.getEpochNanoOfMillisecond());
212-
}
213-
214-
cursor += 8;
197+
if (value == null) {
198+
setNullBit(pos);
199+
// zero-out the bytes
200+
segment.putLong(cursor, 0L);
201+
setOffsetAndSize(pos, cursor, 0);
202+
} else {
203+
segment.putLong(cursor, value.getEpochMillisecond());
204+
setOffsetAndSize(pos, cursor, value.getEpochNanoOfMillisecond());
215205
}
206+
207+
cursor += 8;
216208
}
217209

218210
@Override

0 commit comments

Comments
 (0)