
Commit e05ce3f

Merge branch 'master' into FLINK-36282-2
2 parents 2228766 + e81df94 commit e05ce3f

20 files changed (+671, -29 lines)

.github/workflows/flink_cdc_base.yml

Lines changed: 1 addition & 1 deletion
@@ -148,7 +148,7 @@ jobs:
          maven-version: 3.8.6

      - name: Compile and test
-        timeout-minutes: 60
+        timeout-minutes: 90
        run: |
          set -o pipefail

docs/content/docs/connectors/flink-sources/mysql-cdc.md

Lines changed: 13 additions & 1 deletion
@@ -401,7 +401,19 @@ During a snapshot operation, the connector will query each included table to pro
          Schema change events are applied to a "shadow" table and then swapped with the original table later.
          <br>
          This is an experimental feature, and subject to change in the future.
-        </td>
+        </td>
+    </tr>
+    <tr>
+      <td>use.legacy.json.format</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">true</td>
+      <td>Boolean</td>
+      <td>Whether to use legacy JSON format to cast JSON type data in binlog. <br>
+          It determines whether to use the legacy JSON format when retrieving JSON type data in binlog.
+          If the user configures 'use.legacy.json.format' = 'true', whitespace before values and after commas in the JSON type data is removed. For example,
+          JSON type data {"key1": "value1", "key2": "value2"} in binlog would be converted to {"key1":"value1","key2":"value2"}.
+          When 'use.legacy.json.format' = 'false', the data would be converted to {"key1": "value1", "key2": "value2"}, with whitespace before values and after commas preserved.
+      </td>
    </tr>
  </tbody>
</table>
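
The two shapes documented in the new row differ only in the whitespace after ':' and ',': both parse to the same JSON value, so only consumers that compare the raw string representation are affected. A minimal, hypothetical sketch (plain Java with Jackson on the classpath, not connector code) that makes the difference visible:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class LegacyJsonFormatShapes {
    public static void main(String[] args) throws Exception {
        // Shape produced with 'use.legacy.json.format' = 'true' (the default):
        // whitespace before values and after commas is removed.
        String legacy = "{\"key1\":\"value1\",\"key2\":\"value2\"}";
        // Shape produced with 'use.legacy.json.format' = 'false': whitespace is preserved.
        String preserved = "{\"key1\": \"value1\", \"key2\": \"value2\"}";

        // The strings differ byte-for-byte, but parse to the same JSON tree.
        ObjectMapper mapper = new ObjectMapper();
        JsonNode a = mapper.readTree(legacy);
        JsonNode b = mapper.readTree(preserved);
        System.out.println("raw strings equal : " + legacy.equals(preserved)); // false
        System.out.println("parsed trees equal: " + a.equals(b));              // true
    }
}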

docs/content/docs/connectors/pipeline-connectors/mysql.md

Lines changed: 12 additions & 0 deletions
@@ -320,6 +320,18 @@ pipeline:
        <td>Boolean</td>
        <td>Whether treat TINYINT(1) as boolean, by default is true.</td>
    </tr>
+    <tr>
+      <td>use.legacy.json.format</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">true</td>
+      <td>Boolean</td>
+      <td>Whether to use legacy JSON format to cast JSON type data in binlog. <br>
+          It determines whether to use the legacy JSON format when retrieving JSON type data in binlog.
+          If the user configures 'use.legacy.json.format' = 'true', whitespace before values and after commas in the JSON type data is removed. For example,
+          JSON type data {"key1": "value1", "key2": "value2"} in binlog would be converted to {"key1":"value1","key2":"value2"}.
+          When 'use.legacy.json.format' = 'false', the data would be converted to {"key1": "value1", "key2": "value2"}, with whitespace before values and after commas preserved.
+      </td>
+    </tr>
  </tbody>
</table>
</div>

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java

Lines changed: 5 additions & 1 deletion
@@ -94,6 +94,7 @@
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE;
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TREAT_TINYINT1_AS_BOOLEAN_ENABLED;
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME;
+import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USE_LEGACY_JSON_FORMAT;
 import static org.apache.flink.cdc.connectors.mysql.source.utils.ObjectUtils.doubleCompare;
 import static org.apache.flink.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
 import static org.apache.flink.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
@@ -148,6 +149,7 @@ public DataSource createDataSource(Context context) {
        boolean scanBinlogNewlyAddedTableEnabled =
                config.get(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
        boolean isParsingOnLineSchemaChanges = config.get(PARSE_ONLINE_SCHEMA_CHANGES);
+        boolean useLegacyJsonFormat = config.get(USE_LEGACY_JSON_FORMAT);

        validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
        validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -198,7 +200,8 @@ public DataSource createDataSource(Context context) {
                        .jdbcProperties(getJdbcProperties(configMap))
                        .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
                        .parseOnLineSchemaChanges(isParsingOnLineSchemaChanges)
-                        .treatTinyInt1AsBoolean(treatTinyInt1AsBoolean);
+                        .treatTinyInt1AsBoolean(treatTinyInt1AsBoolean)
+                        .useLegacyJsonFormat(useLegacyJsonFormat);

        List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);

@@ -332,6 +335,7 @@ public Set<ConfigOption<?>> optionalOptions() {
        options.add(METADATA_LIST);
        options.add(INCLUDE_COMMENTS_ENABLED);
        options.add(TREAT_TINYINT1_AS_BOOLEAN_ENABLED);
+        options.add(USE_LEGACY_JSON_FORMAT);
        return options;
    }

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java

Lines changed: 8 additions & 0 deletions
@@ -305,4 +305,12 @@ public class MySqlDataSourceOptions {
                    .booleanType()
                    .defaultValue(true)
                    .withDescription("Whether treat TINYINT(1) as boolean, by default is true. ");
+
+    @Experimental
+    public static final ConfigOption<Boolean> USE_LEGACY_JSON_FORMAT =
+            ConfigOptions.key("use.legacy.json.format")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether to use legacy json format. The default value is true, which means there is no whitespace before value and after comma in json format.");
 }
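
Read together with the MySqlDataSourceFactory change above, the new option can also be set when the pipeline MySQL source is assembled programmatically. A minimal sketch based only on the builder calls visible in this commit; hostname, credentials, database and table names are placeholders, and the import paths are assumed from the connector's package layout:

import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSource;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;

public class LegacyJsonFormatSourceSketch {
    public static void main(String[] args) {
        // Placeholder connection settings; only useLegacyJsonFormat(...) and the
        // MySqlDataSource constructor are taken from this commit's diff.
        MySqlSourceConfigFactory configFactory =
                new MySqlSourceConfigFactory()
                        .hostname("localhost")
                        .port(3306)
                        .username("flinkuser")
                        .password("flinkpw")
                        .databaseList("app_db")
                        .tableList("app_db.json_types")
                        .serverTimeZone("UTC")
                        // Keep whitespace in JSON values read from the binlog,
                        // i.e. 'use.legacy.json.format' = 'false'.
                        .useLegacyJsonFormat(false);

        // The pipeline runtime obtains the actual source from this provider.
        new MySqlDataSource(configFactory).getEventSourceProvider();
    }
}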

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java

Lines changed: 117 additions & 6 deletions
@@ -106,11 +106,39 @@ public void testMysql57CommonDataTypes() throws Throwable {
        testCommonDataTypes(fullTypesMySql57Database);
    }

+    @Test
+    public void testMysql57JsonDataTypes() throws Throwable {
+        // Set `useLegacyJsonFormat` to false, so the json string keeps whitespace before
+        // values and after commas instead of being formatted with the legacy format.
+        testJsonDataType(fullTypesMySql57Database, false);
+    }
+
+    @Test
+    public void testMysql57JsonDataTypesWithUseLegacyJsonFormat() throws Throwable {
+        // Set `useLegacyJsonFormat` to true, so whitespace before values and after commas
+        // is removed, i.e. the json string is formatted with the legacy format.
+        testJsonDataType(fullTypesMySql57Database, true);
+    }
+
    @Test
    public void testMySql8CommonDataTypes() throws Throwable {
        testCommonDataTypes(fullTypesMySql8Database);
    }

+    @Test
+    public void testMySql8JsonDataTypes() throws Throwable {
+        // Set `useLegacyJsonFormat` to false, so the json string keeps whitespace before
+        // values and after commas instead of being formatted with the legacy format.
+        testJsonDataType(fullTypesMySql8Database, false);
+    }
+
+    @Test
+    public void testMySql8JsonDataTypesWithUseLegacyJsonFormat() throws Throwable {
+        // Set `useLegacyJsonFormat` to true, so whitespace before values and after commas
+        // is removed, i.e. the json string is formatted with the legacy format.
+        testJsonDataType(fullTypesMySql8Database, true);
+    }
+
    @Test
    public void testMysql57TimeDataTypes() throws Throwable {
        RowType recordType =
@@ -321,9 +349,13 @@ public void testMysqlPrecisionTypes(UniqueDatabase database) throws Throwable {
                };

        database.createAndInitialize();
+        Boolean useLegacyJsonFormat = true;
        CloseableIterator<Event> iterator =
                env.fromSource(
-                                getFlinkSourceProvider(new String[] {"precision_types"}, database)
+                                getFlinkSourceProvider(
+                                                new String[] {"precision_types"},
+                                                database,
+                                                useLegacyJsonFormat)
                                        .getSource(),
                                WatermarkStrategy.noWatermarks(),
                                "Event-Source")
@@ -351,9 +383,15 @@ public void testMysqlPrecisionTypes(UniqueDatabase database) throws Throwable {

    private void testCommonDataTypes(UniqueDatabase database) throws Exception {
        database.createAndInitialize();
+        // Set useLegacyJsonFormat option as true, so the json string will have no whitespace
+        // before values and after commas, i.e. it is formatted with the legacy format.
+        Boolean useLegacyJsonFormat = true;
        CloseableIterator<Event> iterator =
                env.fromSource(
-                                getFlinkSourceProvider(new String[] {"common_types"}, database)
+                                getFlinkSourceProvider(
+                                                new String[] {"common_types"},
+                                                database,
+                                                useLegacyJsonFormat)
                                        .getSource(),
                                WatermarkStrategy.noWatermarks(),
                                "Event-Source")
@@ -446,7 +484,7 @@ private void testCommonDataTypes(UniqueDatabase database) throws Exception {
        }

        expectedSnapshot[30] = null;
-        // The json string from binlog will remove useless space
+        // Legacy format removes useless space in json string from binlog
        expectedSnapshot[44] = BinaryStringData.fromString("{\"key1\":\"value1\"}");
        Object[] expectedStreamRecord = expectedSnapshot;

@@ -457,6 +495,66 @@ private void testCommonDataTypes(UniqueDatabase database) throws Exception {
                .isEqualTo(expectedStreamRecord);
    }

+    private void testJsonDataType(UniqueDatabase database, Boolean useLegacyJsonFormat)
+            throws Exception {
+        database.createAndInitialize();
+        CloseableIterator<Event> iterator =
+                env.fromSource(
+                                getFlinkSourceProvider(
+                                                new String[] {"json_types"},
+                                                database,
+                                                useLegacyJsonFormat)
+                                        .getSource(),
+                                WatermarkStrategy.noWatermarks(),
+                                "Event-Source")
+                        .executeAndCollect();
+
+        Object[] expectedSnapshot =
+                new Object[] {
+                    DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
+                    BinaryStringData.fromString("{\"key1\": \"value1\"}"),
+                    BinaryStringData.fromString("{\"key1\": \"value1\", \"key2\": \"value2\"}"),
+                    BinaryStringData.fromString(
+                            "[{\"key1\": \"value1\", \"key2\": {\"key2_1\": \"value2_1\", \"key2_2\": \"value2_2\"}, \"key3\": [\"value3\"], \"key4\": [\"value4_1\", \"value4_2\"]}, {\"key5\": \"value5\"}]"),
+                    1
+                };
+
+        // skip CreateTableEvent
+        List<Event> snapshotResults =
+                MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0;
+        RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();
+        Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord, JSON_TYPES))
+                .isEqualTo(expectedSnapshot);
+
+        try (Connection connection = database.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute("UPDATE json_types SET int_c = null WHERE id = 1;");
+        }
+
+        Object[] expectedStreamRecord = expectedSnapshot;
+        List<Event> streamResults =
+                MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0;
+        RecordData streamRecord = ((DataChangeEvent) streamResults.get(0)).after();
+
+        expectedSnapshot[4] = null;
+
+        if (useLegacyJsonFormat) {
+            // Legacy format: whitespace before values and after commas is removed
+            // from the json string values read from the binlog.
+            Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, JSON_TYPES))
+                    .containsExactly(
+                            DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
+                            BinaryStringData.fromString("{\"key1\":\"value1\"}"),
+                            BinaryStringData.fromString(
+                                    "{\"key1\":\"value1\",\"key2\":\"value2\"}"),
+                            BinaryStringData.fromString(
+                                    "[{\"key1\":\"value1\",\"key2\":{\"key2_1\":\"value2_1\",\"key2_2\":\"value2_2\"},\"key3\":[\"value3\"],\"key4\":[\"value4_1\",\"value4_2\"]},{\"key5\":\"value5\"}]"),
+                            null);
+        } else {
+            Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, JSON_TYPES))
+                    .containsExactly(expectedStreamRecord);
+        }
+    }
+
    private Instant toInstant(String ts) {
        return Timestamp.valueOf(ts).toLocalDateTime().atZone(ZoneId.of("UTC")).toInstant();
    }
@@ -468,9 +566,13 @@ private void testTimeDataTypes(
            Object[] expectedStreamRecord)
            throws Exception {
        database.createAndInitialize();
+        Boolean useLegacyJsonFormat = true;
        CloseableIterator<Event> iterator =
                env.fromSource(
-                                getFlinkSourceProvider(new String[] {"time_types"}, database)
+                                getFlinkSourceProvider(
+                                                new String[] {"time_types"},
+                                                database,
+                                                useLegacyJsonFormat)
                                        .getSource(),
                                WatermarkStrategy.noWatermarks(),
                                "Event-Source")
@@ -498,7 +600,7 @@ private void testTimeDataTypes(
    }

    private FlinkSourceProvider getFlinkSourceProvider(
-            String[] captureTables, UniqueDatabase database) {
+            String[] captureTables, UniqueDatabase database, Boolean useLegacyJsonFormat) {
        String[] captureTableIds =
                Arrays.stream(captureTables)
                        .map(tableName -> database.getDatabaseName() + "." + tableName)
@@ -517,7 +619,8 @@ private FlinkSourceProvider getFlinkSourceProvider(
                        .username(database.getUsername())
                        .password(database.getPassword())
                        .serverTimeZone(ZoneId.of("UTC").toString())
-                        .serverId(MySqSourceTestUtils.getServerId(env.getParallelism()));
+                        .serverId(MySqSourceTestUtils.getServerId(env.getParallelism()))
+                        .useLegacyJsonFormat(useLegacyJsonFormat);
        return (FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider();
    }

@@ -577,4 +680,12 @@ private FlinkSourceProvider getFlinkSourceProvider(
                    DataTypes.STRING(),
                    DataTypes.STRING(),
                    DataTypes.STRING());
+
+    private static final RowType JSON_TYPES =
+            RowType.of(
+                    DataTypes.DECIMAL(20, 0).notNull(),
+                    DataTypes.STRING(),
+                    DataTypes.STRING(),
+                    DataTypes.STRING(),
+                    DataTypes.INT());
 }

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java

Lines changed: 65 additions & 1 deletion
@@ -222,13 +222,15 @@ public void testMysql57PrecisionTypesSchema() {
        fullTypesMySql57Database.createAndInitialize();

        String[] tables = new String[] {"precision_types"};
+
        MySqlMetadataAccessor metadataAccessor =
                getMetadataAccessor(tables, fullTypesMySql57Database, true);

        Schema actualSchema =
                metadataAccessor.getTableSchema(
                        TableId.tableId(
                                fullTypesMySql57Database.getDatabaseName(), "precision_types"));
+
        Schema expectedSchema =
                Schema.newBuilder()
                        .primaryKey("id")
@@ -304,6 +306,7 @@ public void testMysql8PrecisionTypesSchema() {
                metadataAccessor.getTableSchema(
                        TableId.tableId(
                                fullTypesMySql8Database.getDatabaseName(), "precision_types"));
+
        Schema expectedSchema =
                Schema.newBuilder()
                        .primaryKey("id")
@@ -370,7 +373,8 @@ public void testMysql8PrecisionTypesSchema() {
    private void testAccessDatabaseAndTable(UniqueDatabase database) {
        database.createAndInitialize();

-        String[] tables = new String[] {"common_types", "time_types", "precision_types"};
+        String[] tables =
+                new String[] {"common_types", "time_types", "precision_types", "json_types"};
        MySqlMetadataAccessor metadataAccessor = getMetadataAccessor(tables, database, true);

        assertThatThrownBy(metadataAccessor::listNamespaces)
@@ -528,6 +532,66 @@ private MySqlMetadataAccessor getMetadataAccessor(
        return new MySqlMetadataAccessor(sourceConfig);
    }

+    @Test
+    public void testMysql57AccessJsonTypesSchema() {
+        fullTypesMySql57Database.createAndInitialize();
+
+        String[] tables = new String[] {"json_types"};
+        MySqlMetadataAccessor metadataAccessor =
+                getMetadataAccessor(tables, fullTypesMySql57Database, true);
+
+        Schema actualSchema =
+                metadataAccessor.getTableSchema(
+                        TableId.tableId(fullTypesMySql57Database.getDatabaseName(), "json_types"));
+        Schema expectedSchema =
+                Schema.newBuilder()
+                        .primaryKey("id")
+                        .fromRowDataType(
+                                RowType.of(
+                                        new DataType[] {
+                                            DataTypes.DECIMAL(20, 0).notNull(),
+                                            DataTypes.STRING(),
+                                            DataTypes.STRING(),
+                                            DataTypes.STRING(),
+                                            DataTypes.INT()
+                                        },
+                                        new String[] {
+                                            "id", "json_c0", "json_c1", "json_c2", "int_c",
+                                        }))
+                        .build();
+        assertThat(actualSchema).isEqualTo(expectedSchema);
+    }
+
+    @Test
+    public void testMysql8AccessJsonTypesSchema() {
+        fullTypesMySql8Database.createAndInitialize();
+
+        String[] tables = new String[] {"json_types"};
+        MySqlMetadataAccessor metadataAccessor =
+                getMetadataAccessor(tables, fullTypesMySql8Database, true);
+
+        Schema actualSchema =
+                metadataAccessor.getTableSchema(
+                        TableId.tableId(fullTypesMySql8Database.getDatabaseName(), "json_types"));
+        Schema expectedSchema =
+                Schema.newBuilder()
+                        .primaryKey("id")
+                        .fromRowDataType(
+                                RowType.of(
+                                        new DataType[] {
+                                            DataTypes.DECIMAL(20, 0).notNull(),
+                                            DataTypes.STRING(),
+                                            DataTypes.STRING(),
+                                            DataTypes.STRING(),
+                                            DataTypes.INT()
+                                        },
+                                        new String[] {
+                                            "id", "json_c0", "json_c1", "json_c2", "int_c",
+                                        }))
+                        .build();
+        assertThat(actualSchema).isEqualTo(expectedSchema);
+    }
+
    private MySqlSourceConfig getConfig(
            String[] captureTables, UniqueDatabase database, boolean tinyint1IsBit) {
        String[] captureTableIds =