Skip to content

Commit fc752fb

Browse files
authored
[FLINK-36184][transform] Fix transform operator swallows schema changes from tables not present in transform rules (apache#3591)
1 parent 6ef4734 commit fc752fb

File tree

3 files changed

+129
-20
lines changed

3 files changed

+129
-20
lines changed

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,94 @@ public void testUnexpectedBehavior() {
242242
() -> submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar));
243243
}
244244

245+
/**
 * End-to-end check that schema changes on a table that no transform rule matches still reach
 * the sink when a transform block is configured.
 *
 * <p>The pipeline reads {@code <db>.members} from MySQL while the only transform rule targets
 * {@code another.irrelevant}. After the snapshot is validated, the test issues a series of
 * ALTER TABLE / INSERT statements and expects the corresponding schema change and data change
 * events to be reported through {@code taskManagerConsumer}.
 *
 * @throws Exception if submitting the job, talking to MySQL, or validating the output fails
 */
@Test
246+
public void testByDefaultTransform() throws Exception {
247+
String dbName = schemaEvolveDatabase.getDatabaseName();
248+
249+
// Add a dummy transform block that matches no source table, so that
250+
// TransformOperator is present in the pipeline and we can verify that it
251+
// correctly handles such "bypass" tables when their schema changes.
252+
String pipelineJob =
253+
String.format(
254+
"source:\n"
255+
+ " type: mysql\n"
256+
+ " hostname: %s\n"
257+
+ " port: 3306\n"
258+
+ " username: %s\n"
259+
+ " password: %s\n"
260+
+ " tables: %s.members\n"
261+
+ " server-id: 5400-5404\n"
262+
+ " server-time-zone: UTC\n"
263+
+ "\n"
264+
+ "sink:\n"
265+
+ " type: values\n"
266+
+ "transform:\n"
267+
+ " - source-table: another.irrelevant\n"
268+
+ " projection: \"'irrelevant' AS tag\"\n"
269+
+ "\n"
270+
+ "pipeline:\n"
271+
+ " schema.change.behavior: evolve\n"
272+
+ " parallelism: %d",
273+
INTER_CONTAINER_MYSQL_ALIAS,
274+
MYSQL_TEST_USER,
275+
MYSQL_TEST_PASSWORD,
276+
dbName,
277+
parallelism);
278+
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
279+
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
280+
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
281+
submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
282+
waitUntilJobRunning(Duration.ofSeconds(30));
283+
LOG.info("Pipeline job is running");
284+
validateSnapshotData(dbName, "members");
285+
286+
LOG.info("Starting schema evolution");
287+
String mysqlJdbcUrl =
288+
String.format(
289+
"jdbc:mysql://%s:%s/%s", MYSQL.getHost(), MYSQL.getDatabasePort(), dbName);
290+
291+
try (Connection conn =
292+
DriverManager.getConnection(
293+
mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
294+
Statement stmt = conn.createStatement()) {
295+
296+
waitForIncrementalStage(dbName, "members", stmt);
297+
298+
// triggers AddColumnEvent
299+
stmt.execute("ALTER TABLE members ADD COLUMN gender TINYINT AFTER age;");
300+
stmt.execute("INSERT INTO members VALUES (1012, 'Eve', 17, 0);");
301+
302+
// triggers AlterColumnTypeEvent and RenameColumnEvent
303+
stmt.execute("ALTER TABLE members CHANGE COLUMN age precise_age DOUBLE;");
304+
305+
// triggers RenameColumnEvent
306+
stmt.execute("ALTER TABLE members RENAME COLUMN gender TO biological_sex;");
307+
308+
// triggers DropColumnEvent
309+
stmt.execute("ALTER TABLE members DROP COLUMN biological_sex");
310+
stmt.execute("INSERT INTO members VALUES (1013, 'Fiona', 16);");
311+
stmt.execute("INSERT INTO members VALUES (1014, 'Gem', 17);");
312+
}
313+
314+
List<String> expectedTaskManagerEvents =
315+
Arrays.asList(
316+
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}",
317+
"DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}",
318+
"AlterColumnTypeEvent{tableId=%s.members, nameMapping={age=DOUBLE}}",
319+
"RenameColumnEvent{tableId=%s.members, nameMapping={age=precise_age}}",
320+
"RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}",
321+
"DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]}",
322+
"DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, 16.0], op=INSERT, meta=()}",
323+
"DataChangeEvent{tableId=%s.members, before=[], after=[1014, Gem, 17.0], op=INSERT, meta=()}");
324+
325+
// NOTE(review): every template above contains a single %s, so the second dbName
// argument is unused (String.format ignores surplus arguments).
List<String> expectedTmEvents =
326+
expectedTaskManagerEvents.stream()
327+
.map(s -> String.format(s, dbName, dbName))
328+
.collect(Collectors.toList());
329+
330+
validateResult(expectedTmEvents, taskManagerConsumer);
331+
}
332+
245333
private void testGenericSchemaEvolution(
246334
String behavior,
247335
boolean mergeTable,

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -258,17 +258,29 @@ private Optional<SchemaChangeEvent> cacheSchema(SchemaChangeEvent event) throws
258258
.stream())
259259
.map(ProjectionColumn::getColumnName)
260260
.collect(Collectors.toSet());
261-
boolean hasAsterisk =
262-
transforms.stream()
263-
.filter(t -> t.getSelectors().isMatch(tableId))
264-
.anyMatch(
265-
t ->
266-
TransformParser.hasAsterisk(
267-
t.getProjection()
268-
.map(TransformProjection::getProjection)
269-
.orElse(null)));
270261

271-
hasAsteriskMap.put(tableId, hasAsterisk);
262+
boolean notTransformed =
263+
transforms.stream().noneMatch(t -> t.getSelectors().isMatch(tableId));
264+
265+
if (notTransformed) {
266+
// If this TableId isn't matched by any transform block, it should behave like a
267+
// "*" projection and should be regarded as asterisk-ful.
268+
hasAsteriskMap.put(tableId, true);
269+
} else {
270+
boolean hasAsterisk =
271+
transforms.stream()
272+
.filter(t -> t.getSelectors().isMatch(tableId))
273+
.anyMatch(
274+
t ->
275+
TransformParser.hasAsterisk(
276+
t.getProjection()
277+
.map(
278+
TransformProjection
279+
::getProjection)
280+
.orElse(null)));
281+
282+
hasAsteriskMap.put(tableId, hasAsterisk);
283+
}
272284
projectedColumnsMap.put(
273285
tableId,
274286
createTableEvent.getSchema().getColumnNames().stream()

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -293,17 +293,26 @@ private void cacheTransformRuleInfo(CreateTableEvent createTableEvent) {
293293
.map(Column::getName)
294294
.collect(Collectors.toSet());
295295

296-
boolean hasAsterisk =
297-
transforms.stream()
298-
.filter(t -> t.getSelectors().isMatch(tableId))
299-
.anyMatch(
300-
t ->
301-
TransformParser.hasAsterisk(
302-
t.getProjection()
303-
.map(TransformProjection::getProjection)
304-
.orElse(null)));
296+
boolean notTransformed =
297+
transforms.stream().noneMatch(t -> t.getSelectors().isMatch(tableId));
305298

306-
hasAsteriskMap.put(createTableEvent.tableId(), hasAsterisk);
299+
if (notTransformed) {
300+
// If this TableId isn't matched by any transform block, it should behave like a "*"
301+
// projection and should be regarded as asterisk-ful.
302+
hasAsteriskMap.put(tableId, true);
303+
} else {
304+
boolean hasAsterisk =
305+
transforms.stream()
306+
.filter(t -> t.getSelectors().isMatch(tableId))
307+
.anyMatch(
308+
t ->
309+
TransformParser.hasAsterisk(
310+
t.getProjection()
311+
.map(TransformProjection::getProjection)
312+
.orElse(null)));
313+
314+
hasAsteriskMap.put(createTableEvent.tableId(), hasAsterisk);
315+
}
307316
referencedColumnsMap.put(
308317
createTableEvent.tableId(),
309318
createTableEvent.getSchema().getColumnNames().stream()

0 commit comments

Comments
 (0)