Skip to content

Commit e939f3b

Browse files
committed
fix review
1 parent 05cefbf commit e939f3b

File tree

14 files changed

+99
-61
lines changed

14 files changed

+99
-61
lines changed

flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
7878
private static final String TRANSFORM_PROJECTION_KEY = "projection";
7979
private static final String TRANSFORM_FILTER_KEY = "filter";
8080
private static final String TRANSFORM_DESCRIPTION_KEY = "description";
81-
private static final String TRANSFORM_CONVERTOR_AFTER_TRANSFORM_KEY =
82-
"convertor-after-transform";
81+
private static final String TRANSFORM_POST_TRANSFORM_CONVERTER_KEY = "post-transform-converter";
8382

8483
// UDF related keys
8584
private static final String UDF_KEY = "user-defined-function";
@@ -318,8 +317,8 @@ private TransformDef toTransformDef(JsonNode transformNode) {
318317
Optional.ofNullable(transformNode.get(TRANSFORM_DESCRIPTION_KEY))
319318
.map(JsonNode::asText)
320319
.orElse(null);
321-
String convertorAfterTransform =
322-
Optional.ofNullable(transformNode.get(TRANSFORM_CONVERTOR_AFTER_TRANSFORM_KEY))
320+
String postTransformConverter =
321+
Optional.ofNullable(transformNode.get(TRANSFORM_POST_TRANSFORM_CONVERTER_KEY))
323322
.map(JsonNode::asText)
324323
.orElse(null);
325324

@@ -331,7 +330,7 @@ private TransformDef toTransformDef(JsonNode transformNode) {
331330
partitionKeys,
332331
tableOptions,
333332
description,
334-
convertorAfterTransform);
333+
postTransformConverter);
335334
}
336335

337336
private Configuration toPipelineConfig(JsonNode pipelineConfigNode) {

flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ void testParsingFullDefinitionFromString() throws Exception {
404404
+ " partition-keys: product_name\n"
405405
+ " table-options: comment=app order\n"
406406
+ " description: project fields from source table\n"
407-
+ " convertor-after-transform: SOFT_DELETE\n"
407+
+ " post-transform-converter: SOFT_DELETE\n"
408408
+ " - source-table: mydb.web_order_.*\n"
409409
+ " projection: CONCAT(id, order_id) as uniq_id, *\n"
410410
+ " filter: uniq_id > 10\n"

flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full-with-repsym.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ transform:
4949
partition-keys: product_name
5050
table-options: comment=app order
5151
description: project fields from source table
52-
convertor-after-transform: SOFT_DELETE
52+
converter-after-transform: SOFT_DELETE
5353
- source-table: mydb.web_order_.*
5454
projection: CONCAT(id, order_id) as uniq_id, *
5555
filter: uniq_id > 10

flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ transform:
4747
partition-keys: product_name
4848
table-options: comment=app order
4949
description: project fields from source table
50-
convertor-after-transform: SOFT_DELETE
50+
converter-after-transform: SOFT_DELETE
5151
- source-table: mydb.web_order_.*
5252
projection: CONCAT(id, order_id) as uniq_id, *
5353
filter: uniq_id > 10

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/convertor/TransformConvertor.java renamed to flink-cdc-common/src/main/java/org/apache/flink/cdc/common/transform/converter/PostTransformConverter.java

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,47 +15,48 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.flink.cdc.runtime.operators.transform.convertor;
18+
package org.apache.flink.cdc.common.transform.converter;
1919

20+
import org.apache.flink.cdc.common.annotation.Experimental;
2021
import org.apache.flink.cdc.common.event.DataChangeEvent;
2122
import org.apache.flink.cdc.common.utils.StringUtils;
22-
import org.apache.flink.cdc.runtime.operators.transform.PostTransformOperator;
23-
import org.apache.flink.cdc.runtime.operators.transform.TransformRule;
2423

2524
import java.io.Serializable;
2625
import java.util.Optional;
2726

2827
/**
29-
* The TransformConvertor applies to convert the {@link DataChangeEvent} after other part of {@link
30-
* TransformRule} in {@link PostTransformOperator}.
28+
* The PostTransformConverter applies to convert the {@link DataChangeEvent} after other part of
29+
* TransformRule in PostTransformOperator.
3130
*/
32-
public interface TransformConvertor extends Serializable {
33-
String SOFT_DELETE_CONVERTOR = "SOFT_DELETE";
31+
@Experimental
32+
public interface PostTransformConverter extends Serializable {
3433

3534
Optional<DataChangeEvent> convert(DataChangeEvent dataChangeEvent);
3635

37-
static Optional<TransformConvertor> of(String classPath) {
36+
static Optional<PostTransformConverter> of(String classPath) {
3837
if (StringUtils.isNullOrWhitespaceOnly(classPath)) {
3938
return Optional.empty();
4039
}
4140

42-
if (SOFT_DELETE_CONVERTOR.equals(classPath)) {
43-
return Optional.of(new SoftDeleteConvertor());
41+
Optional<PostTransformConverter> postTransformConverter =
42+
PostTransformConverters.getInternalPostTransformConverter(classPath);
43+
if (postTransformConverter.isPresent()) {
44+
return postTransformConverter;
4445
}
4546

4647
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
4748
try {
48-
Class<?> convertorClass = classLoader.loadClass(classPath);
49-
Object convertor = convertorClass.newInstance();
50-
if (convertor instanceof TransformConvertor) {
51-
return Optional.of((TransformConvertor) convertor);
49+
Class<?> converterClass = classLoader.loadClass(classPath);
50+
Object converter = converterClass.newInstance();
51+
if (converter instanceof PostTransformConverter) {
52+
return Optional.of((PostTransformConverter) converter);
5253
}
5354
throw new IllegalArgumentException(
5455
String.format(
5556
"%s is not an instance of %s",
56-
classPath, TransformConvertor.class.getName()));
57+
classPath, PostTransformConverter.class.getName()));
5758
} catch (Exception e) {
58-
throw new RuntimeException("Create transform convertor failed.", e);
59+
throw new RuntimeException("Create post transform converter failed.", e);
5960
}
6061
}
6162
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.common.transform.converter;
19+
20+
import org.apache.flink.cdc.common.annotation.Experimental;
21+
22+
import java.util.Optional;
23+
24+
/** The {@link PostTransformConverter} utils. */
25+
@Experimental
26+
public class PostTransformConverters {
27+
public static final String SOFT_DELETE_CONVERTER = "SOFT_DELETE";
28+
29+
public static Optional<PostTransformConverter> getInternalPostTransformConverter(
30+
String identifier) {
31+
switch (identifier) {
32+
case SOFT_DELETE_CONVERTER:
33+
return Optional.of(new SoftDeleteConverter());
34+
default:
35+
return Optional.empty();
36+
}
37+
}
38+
}

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/convertor/SoftDeleteConvertor.java renamed to flink-cdc-common/src/main/java/org/apache/flink/cdc/common/transform/converter/SoftDeleteConverter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,16 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.flink.cdc.runtime.operators.transform.convertor;
18+
package org.apache.flink.cdc.common.transform.converter;
1919

2020
import org.apache.flink.cdc.common.event.DataChangeEvent;
2121

2222
import java.util.Optional;
2323

2424
import static org.apache.flink.cdc.common.event.OperationType.DELETE;
2525

26-
/** This {@link TransformConvertor} convert delete events to insert events. */
27-
public class SoftDeleteConvertor implements TransformConvertor {
26+
/** This {@link PostTransformConverter} convert delete events to insert events. */
27+
public class SoftDeleteConverter implements PostTransformConverter {
2828

2929
@Override
3030
public Optional<DataChangeEvent> convert(DataChangeEvent dataChangeEvent) {

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public class TransformDef {
5050
private final String primaryKeys;
5151
private final String partitionKeys;
5252
private final String tableOptions;
53-
private final String convertorAfterTransform;
53+
private final String postTransformConverter;
5454

5555
public TransformDef(
5656
String sourceTable,
@@ -60,15 +60,15 @@ public TransformDef(
6060
String partitionKeys,
6161
String tableOptions,
6262
String description,
63-
String convertorAfterTransform) {
63+
String postTransformConverter) {
6464
this.sourceTable = sourceTable;
6565
this.projection = projection;
6666
this.filter = filter;
6767
this.primaryKeys = primaryKeys;
6868
this.partitionKeys = partitionKeys;
6969
this.tableOptions = tableOptions;
7070
this.description = description;
71-
this.convertorAfterTransform = convertorAfterTransform;
71+
this.postTransformConverter = postTransformConverter;
7272
}
7373

7474
public String getSourceTable() {
@@ -107,8 +107,8 @@ public String getTableOptions() {
107107
return tableOptions;
108108
}
109109

110-
public String getConvertorAfterTransform() {
111-
return convertorAfterTransform;
110+
public String getPostTransformConverter() {
111+
return postTransformConverter;
112112
}
113113

114114
@Override
@@ -126,8 +126,8 @@ public String toString() {
126126
+ ", description='"
127127
+ description
128128
+ '\''
129-
+ ", convertorAfterTransform='"
130-
+ convertorAfterTransform
129+
+ ", postTransformConverter='"
130+
+ postTransformConverter
131131
+ '\''
132132
+ '}';
133133
}
@@ -148,7 +148,7 @@ public boolean equals(Object o) {
148148
&& Objects.equals(primaryKeys, that.primaryKeys)
149149
&& Objects.equals(partitionKeys, that.partitionKeys)
150150
&& Objects.equals(tableOptions, that.tableOptions)
151-
&& Objects.equals(convertorAfterTransform, that.convertorAfterTransform);
151+
&& Objects.equals(postTransformConverter, that.postTransformConverter);
152152
}
153153

154154
@Override
@@ -161,6 +161,6 @@ public int hashCode() {
161161
primaryKeys,
162162
partitionKeys,
163163
tableOptions,
164-
convertorAfterTransform);
164+
postTransformConverter);
165165
}
166166
}

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public DataStream<Event> translatePreTransform(
6161
transform.getPrimaryKeys(),
6262
transform.getPartitionKeys(),
6363
transform.getTableOptions(),
64-
transform.getConvertorAfterTransform());
64+
transform.getPostTransformConverter());
6565
}
6666

6767
preTransformFunctionBuilder.addUdfFunctions(
@@ -93,7 +93,7 @@ public DataStream<Event> translatePostTransform(
9393
transform.getPrimaryKeys(),
9494
transform.getPartitionKeys(),
9595
transform.getTableOptions(),
96-
transform.getConvertorAfterTransform());
96+
transform.getPostTransformConverter());
9797
}
9898
}
9999
postTransformFunctionBuilder.addTimezone(timezone);

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,7 @@ public void testSoftDelete() throws Exception {
351351
+ "transform:\n"
352352
+ " - source-table: \\.*.\\.*\n"
353353
+ " projection: \\*, __data_event_type__ AS op_type\n"
354-
+ " convertor-after-transform: SOFT_DELETE\n"
354+
+ " post-transform-converter: SOFT_DELETE\n"
355355
+ "\n"
356356
+ "sink:\n"
357357
+ " type: values\n"

0 commit comments

Comments
 (0)