Skip to content

Commit eca48f8

Browse files
committed
[FLINK-35805][transform] Add __data_event_type__ metadata column
1 parent 874ff4f commit eca48f8

File tree

10 files changed

+186
-78
lines changed

10 files changed

+186
-78
lines changed

docs/content.zh/docs/core-concept/transform.md

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,11 @@ Multiple rules can be declared in one single pipeline YAML file.
4848
There are some hidden columns used to access metadata information. They will only take effect when explicitly referenced in the transform rules.
4949

5050
| Field | Data Type | Description |
51-
|--------------------|-----------|----------------------------------------------|
52-
| __namespace_name__ | String | Name of the namespace that contains the row. |
53-
| __schema_name__ | String | Name of the schema that contains the row. |
54-
| __table_name__ | String | Name of the table that contains the row. |
51+
|---------------------|-----------|----------------------------------------------|
52+
| __namespace_name__ | String | Name of the namespace that contains the row. |
53+
| __schema_name__ | String | Name of the schema that contains the row. |
54+
| __table_name__ | String | Name of the table that contains the row. |
55+
| __data_event_type__ | String | Operation type of the data change event: one of `+I`, `-U`, `+U`, or `-D`. |
5556

5657
## Metadata relationship
5758

docs/content/docs/core-concept/transform.md

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,11 @@ Multiple rules can be declared in one single pipeline YAML file.
4848
There are some hidden columns used to access metadata information. They will only take effect when explicitly referenced in the transform rules.
4949

5050
| Field | Data Type | Description |
51-
|--------------------|-----------|----------------------------------------------|
52-
| __namespace_name__ | String | Name of the namespace that contains the row. |
53-
| __schema_name__ | String | Name of the schema that contains the row. |
54-
| __table_name__ | String | Name of the table that contains the row. |
51+
|---------------------|-----------|----------------------------------------------|
52+
| __namespace_name__ | String | Name of the namespace that contains the row. |
53+
| __schema_name__ | String | Name of the schema that contains the row. |
54+
| __table_name__ | String | Name of the table that contains the row. |
55+
| __data_event_type__ | String | Operation type of the data change event: one of `+I`, `-U`, `+U`, or `-D`. |
5556

5657
## Metadata relationship
5758

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,66 @@ void testTransform(ValuesDataSink.SinkApi sinkApi) throws Exception {
340340
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 20], after=[2, x, 20], op=UPDATE, meta=()}");
341341
}
342342

343+
@ParameterizedTest
344+
@EnumSource
345+
void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception {
346+
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
347+
348+
// Setup value source
349+
Configuration sourceConfig = new Configuration();
350+
sourceConfig.set(
351+
ValuesDataSourceOptions.EVENT_SET_ID,
352+
ValuesDataSourceHelper.EventSetId.TRANSFORM_TABLE);
353+
SourceDef sourceDef =
354+
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
355+
356+
// Setup value sink
357+
Configuration sinkConfig = new Configuration();
358+
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
359+
sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
360+
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
361+
362+
// Setup transform
363+
TransformDef transformDef =
364+
new TransformDef(
365+
"default_namespace.default_schema.table1",
366+
"*,concat(col1,'0') as col12,__data_event_type__ as rk",
367+
"col1 <> '3'",
368+
"col1",
369+
"col12",
370+
"key1=value1",
371+
"");
372+
373+
// Setup pipeline
374+
Configuration pipelineConfig = new Configuration();
375+
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
376+
PipelineDef pipelineDef =
377+
new PipelineDef(
378+
sourceDef,
379+
sinkDef,
380+
Collections.emptyList(),
381+
new ArrayList<>(Collections.singletonList(transformDef)),
382+
Collections.emptyList(),
383+
pipelineConfig);
384+
385+
// Execute the pipeline
386+
PipelineExecution execution = composer.compose(pipelineDef);
387+
execution.execute();
388+
389+
// Check the order and content of all received events
390+
String[] outputEvents = outCaptor.toString().trim().split("\n");
391+
assertThat(outputEvents)
392+
.containsExactly(
393+
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING,`rk` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
394+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 10, +I], op=INSERT, meta=()}",
395+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 20, +I], op=INSERT, meta=()}",
396+
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}",
397+
"RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
398+
"DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}",
399+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 10, -D], after=[], op=DELETE, meta=()}",
400+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 20, -U], after=[2, x, 20, +U], op=UPDATE, meta=()}");
401+
}
402+
343403
@ParameterizedTest
344404
@EnumSource
345405
void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -442,9 +442,9 @@ public void testWildcardWithMetadataColumnTransform() throws Exception {
442442
+ " type: values\n"
443443
+ "transform:\n"
444444
+ " - source-table: %s.TABLEALPHA\n"
445-
+ " projection: \\*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name\n"
445+
+ " projection: \\*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name, __data_event_type__ AS type\n"
446446
+ " - source-table: %s.TABLEBETA\n"
447-
+ " projection: \\*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name\n"
447+
+ " projection: \\*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name, __data_event_type__ AS type\n"
448448
+ "pipeline:\n"
449449
+ " parallelism: 1",
450450
INTER_CONTAINER_MYSQL_ALIAS,
@@ -462,25 +462,25 @@ public void testWildcardWithMetadataColumnTransform() throws Exception {
462462

463463
waitUntilSpecificEvent(
464464
String.format(
465-
"CreateTableEvent{tableId=%s.TABLEALPHA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`PRICEALPHA` INT,`AGEALPHA` INT,`NAMEALPHA` VARCHAR(128),`identifier_name` STRING}, primaryKeys=ID, options=()}",
465+
"CreateTableEvent{tableId=%s.TABLEALPHA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`PRICEALPHA` INT,`AGEALPHA` INT,`NAMEALPHA` VARCHAR(128),`identifier_name` STRING,`type` STRING}, primaryKeys=ID, options=()}",
466466
transformTestDatabase.getDatabaseName()),
467467
60000L);
468468

469469
waitUntilSpecificEvent(
470470
String.format(
471-
"CreateTableEvent{tableId=%s.TABLEBETA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`CODENAMESBETA` VARCHAR(17),`AGEBETA` INT,`NAMEBETA` VARCHAR(128),`identifier_name` STRING}, primaryKeys=ID, options=()}",
471+
"CreateTableEvent{tableId=%s.TABLEBETA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`CODENAMESBETA` VARCHAR(17),`AGEBETA` INT,`NAMEBETA` VARCHAR(128),`identifier_name` STRING,`type` STRING}, primaryKeys=ID, options=()}",
472472
transformTestDatabase.getDatabaseName()),
473473
60000L);
474474

475475
validateEvents(
476-
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1008, 8, 199, 17, Alice, null.%s.TABLEALPHA], op=INSERT, meta=()}",
477-
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1010, 10, 99, 19, Carol, null.%s.TABLEALPHA], op=INSERT, meta=()}",
478-
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1009, 8.1, 0, 18, Bob, null.%s.TABLEALPHA], op=INSERT, meta=()}",
479-
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1011, 11, 59, 20, Dave, null.%s.TABLEALPHA], op=INSERT, meta=()}",
480-
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2012, 12, Monterey, 22, Fred, null.%s.TABLEBETA], op=INSERT, meta=()}",
481-
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2011, 11, Big Sur, 21, Eva, null.%s.TABLEBETA], op=INSERT, meta=()}",
482-
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2014, 14, Sonoma, 24, Henry, null.%s.TABLEBETA], op=INSERT, meta=()}",
483-
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2013, 13, Ventura, 23, Gus, null.%s.TABLEBETA], op=INSERT, meta=()}");
476+
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1008, 8, 199, 17, Alice, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}",
477+
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1010, 10, 99, 19, Carol, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}",
478+
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1009, 8.1, 0, 18, Bob, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}",
479+
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1011, 11, 59, 20, Dave, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}",
480+
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2012, 12, Monterey, 22, Fred, null.%s.TABLEBETA, +I], op=INSERT, meta=()}",
481+
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2011, 11, Big Sur, 21, Eva, null.%s.TABLEBETA, +I], op=INSERT, meta=()}",
482+
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2014, 14, Sonoma, 24, Henry, null.%s.TABLEBETA, +I], op=INSERT, meta=()}",
483+
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2013, 13, Ventura, 23, Gus, null.%s.TABLEBETA, +I], op=INSERT, meta=()}");
484484

485485
// generate binlogs
486486
String mysqlJdbcUrl =
@@ -492,9 +492,9 @@ public void testWildcardWithMetadataColumnTransform() throws Exception {
492492
insertBinlogEvents(mysqlJdbcUrl);
493493

494494
validateEvents(
495-
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[1009, 8.1, 0, 18, Bob, null.%s.TABLEALPHA], after=[1009, 100, 0, 18, Bob, null.%s.TABLEALPHA], op=UPDATE, meta=()}",
496-
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3007, 7, 79, 16, IINA, null.%s.TABLEALPHA], op=INSERT, meta=()}",
497-
"DataChangeEvent{tableId=%s.TABLEBETA, before=[2011, 11, Big Sur, 21, Eva, null.%s.TABLEBETA], after=[], op=DELETE, meta=()}");
495+
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[1009, 8.1, 0, 18, Bob, null.%s.TABLEALPHA, -U], after=[1009, 100, 0, 18, Bob, null.%s.TABLEALPHA, +U], op=UPDATE, meta=()}",
496+
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3007, 7, 79, 16, IINA, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}",
497+
"DataChangeEvent{tableId=%s.TABLEBETA, before=[2011, 11, Big Sur, 21, Eva, null.%s.TABLEBETA, -D], after=[], op=DELETE, meta=()}");
498498
}
499499

500500
private static void insertBinlogEvents(String mysqlJdbcUrl) throws SQLException {

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.flink.cdc.common.event.CreateTableEvent;
2424
import org.apache.flink.cdc.common.event.DataChangeEvent;
2525
import org.apache.flink.cdc.common.event.Event;
26+
import org.apache.flink.cdc.common.event.OperationType;
2627
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
2728
import org.apache.flink.cdc.common.event.TableId;
2829
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
@@ -389,13 +390,15 @@ private Optional<DataChangeEvent> processFilter(
389390
BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();
390391
// insert and update event only process afterData, delete only process beforeData
391392
if (after != null) {
392-
if (transformFilterProcessor.process(after, epochTime)) {
393+
if (transformFilterProcessor.process(
394+
after, epochTime, opTypeToRowKind(dataChangeEvent.op(), '+'))) {
393395
return Optional.of(dataChangeEvent);
394396
} else {
395397
return Optional.empty();
396398
}
397399
} else if (before != null) {
398-
if (transformFilterProcessor.process(before, epochTime)) {
400+
if (transformFilterProcessor.process(
401+
before, epochTime, opTypeToRowKind(dataChangeEvent.op(), '-'))) {
399402
return Optional.of(dataChangeEvent);
400403
} else {
401404
return Optional.empty();
@@ -412,11 +415,14 @@ private Optional<DataChangeEvent> processProjection(
412415
BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();
413416
if (before != null) {
414417
BinaryRecordData projectedBefore =
415-
postTransformProcessor.processData(before, epochTime);
418+
postTransformProcessor.processData(
419+
before, epochTime, opTypeToRowKind(dataChangeEvent.op(), '-'));
416420
dataChangeEvent = DataChangeEvent.projectBefore(dataChangeEvent, projectedBefore);
417421
}
418422
if (after != null) {
419-
BinaryRecordData projectedAfter = postTransformProcessor.processData(after, epochTime);
423+
BinaryRecordData projectedAfter =
424+
postTransformProcessor.processData(
425+
after, epochTime, opTypeToRowKind(dataChangeEvent.op(), '+'));
420426
dataChangeEvent = DataChangeEvent.projectAfter(dataChangeEvent, projectedAfter);
421427
}
422428
return Optional.of(dataChangeEvent);
@@ -499,4 +505,8 @@ private void destroyUdf() {
499505
}
500506
});
501507
}
508+
509+
private String opTypeToRowKind(OperationType opType, char beforeOrAfter) {
510+
return String.format("%c%c", beforeOrAfter, opType.name().charAt(0));
511+
}
502512
}

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
2222
import org.apache.flink.cdc.common.schema.Column;
2323
import org.apache.flink.cdc.runtime.parser.JaninoCompiler;
24-
import org.apache.flink.cdc.runtime.parser.TransformParser;
24+
import org.apache.flink.cdc.runtime.parser.metadata.MetadataColumns;
2525
import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter;
2626

2727
import org.codehaus.janino.ExpressionEvaluator;
@@ -33,6 +33,8 @@
3333
import java.util.LinkedHashSet;
3434
import java.util.List;
3535

36+
import static org.apache.flink.cdc.runtime.parser.metadata.MetadataColumns.METADATA_COLUMNS;
37+
3638
/**
3739
* The processor of the projection column. It processes the data column and the user-defined
3840
* computed columns.
@@ -79,9 +81,9 @@ public ProjectionColumn getProjectionColumn() {
7981
return projectionColumn;
8082
}
8183

82-
public Object evaluate(BinaryRecordData after, long epochTime) {
84+
public Object evaluate(BinaryRecordData record, long epochTime, String opType) {
8385
try {
84-
return expressionEvaluator.evaluate(generateParams(after, epochTime));
86+
return expressionEvaluator.evaluate(generateParams(record, epochTime, opType));
8587
} catch (InvocationTargetException e) {
8688
LOG.error(
8789
"Table:{} column:{} projection:{} execute failed. {}",
@@ -93,7 +95,7 @@ public Object evaluate(BinaryRecordData after, long epochTime) {
9395
}
9496
}
9597

96-
private Object[] generateParams(BinaryRecordData after, long epochTime) {
98+
private Object[] generateParams(BinaryRecordData record, long epochTime, String opType) {
9799
List<Object> params = new ArrayList<>();
98100
List<Column> columns = tableInfo.getPreTransformedSchema().getColumns();
99101

@@ -103,15 +105,18 @@ private Object[] generateParams(BinaryRecordData after, long epochTime) {
103105
new LinkedHashSet<>(projectionColumn.getOriginalColumnNames());
104106
for (String originalColumnName : originalColumnNames) {
105107
switch (originalColumnName) {
106-
case TransformParser.DEFAULT_NAMESPACE_NAME:
108+
case MetadataColumns.DEFAULT_NAMESPACE_NAME:
107109
params.add(tableInfo.getNamespace());
108110
continue;
109-
case TransformParser.DEFAULT_SCHEMA_NAME:
111+
case MetadataColumns.DEFAULT_SCHEMA_NAME:
110112
params.add(tableInfo.getSchemaName());
111113
continue;
112-
case TransformParser.DEFAULT_TABLE_NAME:
114+
case MetadataColumns.DEFAULT_TABLE_NAME:
113115
params.add(tableInfo.getTableName());
114116
continue;
117+
case MetadataColumns.DEFAULT_DATA_EVENT_TYPE:
118+
params.add(opType);
119+
continue;
115120
}
116121

117122
boolean argumentFound = false;
@@ -120,7 +125,7 @@ private Object[] generateParams(BinaryRecordData after, long epochTime) {
120125
if (column.getName().equals(originalColumnName)) {
121126
params.add(
122127
DataTypeConverter.convertToOriginal(
123-
fieldGetters[i].getFieldOrNull(after), column.getType()));
128+
fieldGetters[i].getFieldOrNull(record), column.getType()));
124129
argumentFound = true;
125130
break;
126131
}
@@ -158,20 +163,14 @@ private TransformExpressionKey generateTransformExpressionKey() {
158163
}
159164

160165
for (String originalColumnName : originalColumnNames) {
161-
switch (originalColumnName) {
162-
case TransformParser.DEFAULT_NAMESPACE_NAME:
163-
argumentNames.add(TransformParser.DEFAULT_NAMESPACE_NAME);
164-
paramTypes.add(String.class);
165-
break;
166-
case TransformParser.DEFAULT_SCHEMA_NAME:
167-
argumentNames.add(TransformParser.DEFAULT_SCHEMA_NAME);
168-
paramTypes.add(String.class);
169-
break;
170-
case TransformParser.DEFAULT_TABLE_NAME:
171-
argumentNames.add(TransformParser.DEFAULT_TABLE_NAME);
172-
paramTypes.add(String.class);
173-
break;
174-
}
166+
METADATA_COLUMNS.stream()
167+
.filter(col -> col.f0.equals(originalColumnName))
168+
.findFirst()
169+
.ifPresent(
170+
col -> {
171+
argumentNames.add(col.f0);
172+
paramTypes.add(col.f2);
173+
});
175174
}
176175

177176
argumentNames.add(JaninoCompiler.DEFAULT_TIME_ZONE);

0 commit comments

Comments
 (0)