Skip to content

Commit cc14a3e

Browse files
committed
Rename __op_type__ to more generic __data_event_type__
1 parent 5d68de2 commit cc14a3e

File tree

6 files changed

+28
-27
lines changed

6 files changed

+28
-27
lines changed

docs/content.zh/docs/core-concept/transform.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,12 @@ Multiple rules can be declared in one single pipeline YAML file.
4747
## Fields definition
4848
There are some hidden columns used to access metadata information. They will only take effect when explicitly referenced in the transform rules.
4949

50-
| Field | Data Type | Description |
51-
|--------------------|-----------|----------------------------------------------|
52-
| __namespace_name__ | String | Name of the namespace that contains the row. |
53-
| __schema_name__ | String | Name of the schema that contains the row. |
54-
| __table_name__ | String | Name of the table that contains the row. |
55-
| __row_kind__ | String | Operation type of data change event. |
50+
| Field | Data Type | Description |
51+
|---------------------|-----------|----------------------------------------------|
52+
| __namespace_name__ | String | Name of the namespace that contains the row. |
53+
| __schema_name__ | String | Name of the schema that contains the row. |
54+
| __table_name__ | String | Name of the table that contains the row. |
55+
| __data_event_type__ | String | Operation type of data change event. |
5656

5757
## Metadata relationship
5858

docs/content/docs/core-concept/transform.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,12 @@ Multiple rules can be declared in one single pipeline YAML file.
4747
## Fields definition
4848
There are some hidden columns used to access metadata information. They will only take effect when explicitly referenced in the transform rules.
4949

50-
| Field | Data Type | Description |
51-
|--------------------|-----------|----------------------------------------------|
52-
| __namespace_name__ | String | Name of the namespace that contains the row. |
53-
| __schema_name__ | String | Name of the schema that contains the row. |
54-
| __table_name__ | String | Name of the table that contains the row. |
55-
| __row_kind__ | String | Operation type of data change event. |
50+
| Field | Data Type | Description |
51+
|---------------------|-----------|----------------------------------------------|
52+
| __namespace_name__ | String | Name of the namespace that contains the row. |
53+
| __schema_name__ | String | Name of the schema that contains the row. |
54+
| __table_name__ | String | Name of the table that contains the row. |
55+
| __data_event_type__ | String | Operation type of data change event. |
5656

5757
## Metadata relationship
5858

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,7 @@ void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception {
358358
TransformDef transformDef =
359359
new TransformDef(
360360
"default_namespace.default_schema.table1",
361-
"*,concat(col1,'0') as col12,__row_kind__ as rk",
361+
"*,concat(col1,'0') as col12,__data_event_type__ as rk",
362362
"col1 <> '3'",
363363
"col1",
364364
"col12",
@@ -384,7 +384,7 @@ void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception {
384384
String[] outputEvents = outCaptor.toString().trim().split("\n");
385385
assertThat(outputEvents)
386386
.containsExactly(
387-
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`__row_kind__` STRING NOT NULL,`col12` STRING,`rk` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
387+
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`__data_event_type__` STRING NOT NULL,`col12` STRING,`rk` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
388388
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, +I, 10, +I], op=INSERT, meta=()}",
389389
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, +I, 20, +I], op=INSERT, meta=()}",
390390
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}",

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ private Object[] generateParams(BinaryRecordData data, long epochTime, String op
9090
params.add(tableInfo.getTableName());
9191
continue;
9292
}
93-
if (originalColumnName.equals(TransformParser.DEFAULT_ROW_KIND)) {
93+
if (originalColumnName.equals(TransformParser.DEFAULT_DATA_EVENT_TYPE)) {
9494
params.add(opType);
9595
continue;
9696
}
@@ -143,9 +143,9 @@ private TransformExpressionKey generateTransformExpressionKey() {
143143
paramTypes.add(String.class);
144144
}
145145

146-
if (scriptExpression.contains(TransformParser.DEFAULT_ROW_KIND)
147-
&& !argumentNames.contains(TransformParser.DEFAULT_ROW_KIND)) {
148-
argumentNames.add(TransformParser.DEFAULT_ROW_KIND);
146+
if (scriptExpression.contains(TransformParser.DEFAULT_DATA_EVENT_TYPE)
147+
&& !argumentNames.contains(TransformParser.DEFAULT_DATA_EVENT_TYPE)) {
148+
argumentNames.add(TransformParser.DEFAULT_DATA_EVENT_TYPE);
149149
paramTypes.add(String.class);
150150
}
151151

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ private Object[] generateParams(BinaryRecordData data, long epochTime, String op
8585
params.add(tableInfo.getTableName());
8686
continue;
8787
}
88-
if (columnName.equals(TransformParser.DEFAULT_ROW_KIND)) {
88+
if (columnName.equals(TransformParser.DEFAULT_DATA_EVENT_TYPE)) {
8989
params.add(opType);
9090
continue;
9191
}
@@ -138,9 +138,9 @@ private TransformExpressionKey generateTransformExpressionKey() {
138138
paramTypes.add(String.class);
139139
}
140140

141-
if (scriptExpression.contains(TransformParser.DEFAULT_ROW_KIND)
142-
&& !argumentNames.contains(TransformParser.DEFAULT_ROW_KIND)) {
143-
argumentNames.add(TransformParser.DEFAULT_ROW_KIND);
141+
if (scriptExpression.contains(TransformParser.DEFAULT_DATA_EVENT_TYPE)
142+
&& !argumentNames.contains(TransformParser.DEFAULT_DATA_EVENT_TYPE)) {
143+
argumentNames.add(TransformParser.DEFAULT_DATA_EVENT_TYPE);
144144
paramTypes.add(String.class);
145145
}
146146

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public class TransformParser {
7777
public static final String DEFAULT_NAMESPACE_NAME = "__namespace_name__";
7878
public static final String DEFAULT_SCHEMA_NAME = "__schema_name__";
7979
public static final String DEFAULT_TABLE_NAME = "__table_name__";
80-
public static final String DEFAULT_ROW_KIND = "__row_kind__";
80+
public static final String DEFAULT_DATA_EVENT_TYPE = "__data_event_type__";
8181

8282
private static SqlParser getCalciteParser(String sql) {
8383
return SqlParser.create(
@@ -368,9 +368,10 @@ private static List<Column> copyFillMetadataColumn(
368368
&& !containsMetadataColumn(columnsWithMetadata, DEFAULT_TABLE_NAME)) {
369369
columnsWithMetadata.add(Column.physicalColumn(DEFAULT_TABLE_NAME, DataTypes.STRING()));
370370
}
371-
if (transformStatement.contains(DEFAULT_ROW_KIND)
372-
&& !containsMetadataColumn(columnsWithMetadata, DEFAULT_ROW_KIND)) {
373-
columnsWithMetadata.add(Column.physicalColumn(DEFAULT_ROW_KIND, DataTypes.STRING()));
371+
if (transformStatement.contains(DEFAULT_DATA_EVENT_TYPE)
372+
&& !containsMetadataColumn(columnsWithMetadata, DEFAULT_DATA_EVENT_TYPE)) {
373+
columnsWithMetadata.add(
374+
Column.physicalColumn(DEFAULT_DATA_EVENT_TYPE, DataTypes.STRING()));
374375
}
375376
return columnsWithMetadata;
376377
}
@@ -383,7 +384,7 @@ private static boolean isMetadataColumn(String columnName) {
383384
return DEFAULT_TABLE_NAME.equals(columnName)
384385
|| DEFAULT_SCHEMA_NAME.equals(columnName)
385386
|| DEFAULT_NAMESPACE_NAME.equals(columnName)
386-
|| DEFAULT_ROW_KIND.equals(columnName);
387+
|| DEFAULT_DATA_EVENT_TYPE.equals(columnName);
387388
}
388389

389390
public static SqlSelect parseFilterExpression(String filterExpression) {

0 commit comments

Comments
 (0)