Skip to content

Commit 51c679a

Browse files
authored
[FLINK-36596][transform] Fix unable to schema evolve with project-less transform rules
This closes #3665.
1 parent 5db16ba commit 51c679a

File tree

2 files changed

+96
-1
lines changed

2 files changed

+96
-1
lines changed

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1326,6 +1326,100 @@ void testPostAsteriskWithSchemaEvolution() throws Exception {
13261326
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[15 -> Oops, 12th, 15, Oops], after=[], op=DELETE, meta=()}");
13271327
}
13281328

1329+
@Test
1330+
void testTransformWithFilterButNoProjection() throws Exception {
1331+
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
1332+
1333+
// Setup value source
1334+
Configuration sourceConfig = new Configuration();
1335+
1336+
sourceConfig.set(
1337+
ValuesDataSourceOptions.EVENT_SET_ID,
1338+
ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
1339+
1340+
TableId tableId = TableId.tableId("default_namespace", "default_schema", "mytable1");
1341+
List<Event> events = generateSchemaEvolutionEvents(tableId);
1342+
1343+
ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
1344+
1345+
SourceDef sourceDef =
1346+
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
1347+
1348+
// Setup value sink
1349+
Configuration sinkConfig = new Configuration();
1350+
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
1351+
1352+
// Setup pipeline
1353+
Configuration pipelineConfig = new Configuration();
1354+
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
1355+
pipelineConfig.set(
1356+
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
1357+
PipelineDef pipelineDef =
1358+
new PipelineDef(
1359+
sourceDef,
1360+
sinkDef,
1361+
Collections.emptyList(),
1362+
Collections.singletonList(
1363+
new TransformDef(
1364+
"default_namespace.default_schema.\\.*",
1365+
null,
1366+
"id > 1",
1367+
null,
1368+
null,
1369+
null,
1370+
null)),
1371+
Collections.emptyList(),
1372+
pipelineConfig);
1373+
1374+
// Execute the pipeline
1375+
PipelineExecution execution = composer.compose(pipelineDef);
1376+
execution.execute();
1377+
1378+
// Check the order and content of all received events
1379+
String[] outputEvents = outCaptor.toString().trim().split("\n");
1380+
1381+
assertThat(outputEvents)
1382+
.containsExactly(
1383+
// Initial stage
1384+
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
1385+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Barcarolle, 22], op=INSERT, meta=()}",
1386+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, Cecily, 23], op=INSERT, meta=()}",
1387+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3, Cecily, 23], after=[3, Colin, 24], op=UPDATE, meta=()}",
1388+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Barcarolle, 22], after=[], op=DELETE, meta=()}",
1389+
1390+
// Add column stage
1391+
"AddColumnEvent{tableId=default_namespace.default_schema.mytable1, addedColumns=[ColumnWithPosition{column=`rank` STRING, position=BEFORE, existedColumnName=id}, ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}",
1392+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1st, 4, Derrida, 24, 0], op=INSERT, meta=()}",
1393+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2nd, 5, Eve, 25, 1], op=INSERT, meta=()}",
1394+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2nd, 5, Eve, 25, 1], after=[2nd, 5, Eva, 20, 2], op=UPDATE, meta=()}",
1395+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3rd, 6, Fiona, 26, 3], op=INSERT, meta=()}",
1396+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}",
1397+
1398+
// Alter column type stage
1399+
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}",
1400+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}",
1401+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}",
1402+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2], after=[5th, 8, Harry, 18.0, -3], op=UPDATE, meta=()}",
1403+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[6th, 9, IINA, 17.0, 0], op=INSERT, meta=()}",
1404+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[6th, 9, IINA, 17.0, 0], after=[], op=DELETE, meta=()}",
1405+
1406+
// Rename column stage
1407+
"RenameColumnEvent{tableId=default_namespace.default_schema.mytable1, nameMapping={gender=biological_sex, age=toshi}}",
1408+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[7th, 10, Julia, 24.0, 1], op=INSERT, meta=()}",
1409+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[8th, 11, Kalle, 23.0, 0], op=INSERT, meta=()}",
1410+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[8th, 11, Kalle, 23.0, 0], after=[8th, 11, Kella, 18.0, 0], op=UPDATE, meta=()}",
1411+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[9th, 12, Lynx, 17.0, 0], op=INSERT, meta=()}",
1412+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[9th, 12, Lynx, 17.0, 0], after=[], op=DELETE, meta=()}",
1413+
1414+
// Drop column stage
1415+
"DropColumnEvent{tableId=default_namespace.default_schema.mytable1, droppedColumnNames=[biological_sex, toshi]}",
1416+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[10th, 13, Munroe], op=INSERT, meta=()}",
1417+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[11th, 14, Neko], op=INSERT, meta=()}",
1418+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[11th, 14, Neko], after=[11th, 14, Nein], op=UPDATE, meta=()}",
1419+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[12th, 15, Oops], op=INSERT, meta=()}",
1420+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[12th, 15, Oops], after=[], op=DELETE, meta=()}");
1421+
}
1422+
13291423
@Test
13301424
void testTransformUnmatchedSchemaEvolution() throws Exception {
13311425
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -588,7 +588,8 @@ public static String normalizeFilter(String projection, String filter) {
588588

589589
public static boolean hasAsterisk(@Nullable String projection) {
590590
if (isNullOrWhitespaceOnly(projection)) {
591-
return false;
591+
// Providing an empty projection expression is equivalent to writing `*` explicitly.
592+
return true;
592593
}
593594
return parseProjectionExpression(projection).getOperandList().stream()
594595
.anyMatch(TransformParser::hasAsterisk);

0 commit comments