Skip to content

Commit 6b1f1eb

Browse files
committed
fix review
1 parent 69bee62 commit 6b1f1eb

File tree

10 files changed

+24
-43
lines changed

10 files changed

+24
-43
lines changed

flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
7878
private static final String TRANSFORM_PROJECTION_KEY = "projection";
7979
private static final String TRANSFORM_FILTER_KEY = "filter";
8080
private static final String TRANSFORM_DESCRIPTION_KEY = "description";
81-
private static final String TRANSFORM_POST_TRANSFORM_CONVERTER_KEY = "post-transform-converter";
81+
private static final String TRANSFORM_CONVERTER_AFTER_TRANSFORM_KEY =
82+
"converter-after-transform";
8283

8384
// UDF related keys
8485
private static final String UDF_KEY = "user-defined-function";
@@ -318,7 +319,7 @@ private TransformDef toTransformDef(JsonNode transformNode) {
318319
.map(JsonNode::asText)
319320
.orElse(null);
320321
String postTransformConverter =
321-
Optional.ofNullable(transformNode.get(TRANSFORM_POST_TRANSFORM_CONVERTER_KEY))
322+
Optional.ofNullable(transformNode.get(TRANSFORM_CONVERTER_AFTER_TRANSFORM_KEY))
322323
.map(JsonNode::asText)
323324
.orElse(null);
324325

flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ void testParsingFullDefinitionFromString() throws Exception {
404404
+ " partition-keys: product_name\n"
405405
+ " table-options: comment=app order\n"
406406
+ " description: project fields from source table\n"
407-
+ " post-transform-converter: SOFT_DELETE\n"
407+
+ " converter-after-transform: SOFT_DELETE\n"
408408
+ " - source-table: mydb.web_order_.*\n"
409409
+ " projection: CONCAT(id, order_id) as uniq_id, *\n"
410410
+ " filter: uniq_id > 10\n"

flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full-with-repsym.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ transform:
4949
partition-keys: product_name
5050
table-options: comment=app order
5151
description: project fields from source table
52-
post-transform-converter: SOFT_DELETE
52+
converter-after-transform: SOFT_DELETE
5353
- source-table: mydb.web_order_.*
5454
projection: CONCAT(id, order_id) as uniq_id, *
5555
filter: uniq_id > 10

flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ transform:
4747
partition-keys: product_name
4848
table-options: comment=app order
4949
description: project fields from source table
50-
post-transform-converter: SOFT_DELETE
50+
converter-after-transform: SOFT_DELETE
5151
- source-table: mydb.web_order_.*
5252
projection: CONCAT(id, order_id) as uniq_id, *
5353
filter: uniq_id > 10

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,7 @@ public void testSoftDelete() throws Exception {
351351
+ "transform:\n"
352352
+ " - source-table: \\.*.\\.*\n"
353353
+ " projection: \\*, __data_event_type__ AS op_type\n"
354-
+ " post-transform-converter: SOFT_DELETE\n"
354+
+ " converter-after-transform: SOFT_DELETE\n"
355355
+ "\n"
356356
+ "sink:\n"
357357
+ " type: values\n"

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,10 @@
3131
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
3232
import org.apache.flink.cdc.common.schema.Schema;
3333
import org.apache.flink.cdc.common.schema.Selectors;
34-
import org.apache.flink.cdc.common.transform.converter.PostTransformConverter;
3534
import org.apache.flink.cdc.common.udf.UserDefinedFunctionContext;
3635
import org.apache.flink.cdc.common.utils.SchemaUtils;
36+
import org.apache.flink.cdc.runtime.operators.transform.converter.PostTransformConverter;
37+
import org.apache.flink.cdc.runtime.operators.transform.converter.PostTransformConverters;
3738
import org.apache.flink.cdc.runtime.parser.TransformParser;
3839
import org.apache.flink.streaming.api.graph.StreamConfig;
3940
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -190,7 +191,7 @@ public void open() throws Exception {
190191
TransformProjection.of(projection).orElse(null),
191192
TransformFilter.of(filterExpression, udfDescriptors)
192193
.orElse(null),
193-
PostTransformConverter.of(
194+
PostTransformConverters.of(
194195
transformRule.getPostTransformConverter()));
195196
})
196197
.collect(Collectors.toList());

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.flink.cdc.runtime.operators.transform;
1919

2020
import org.apache.flink.cdc.common.schema.Selectors;
21-
import org.apache.flink.cdc.common.transform.converter.PostTransformConverter;
21+
import org.apache.flink.cdc.runtime.operators.transform.converter.PostTransformConverter;
2222

2323
import javax.annotation.Nullable;
2424

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/transform/converter/PostTransformConverter.java renamed to flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/converter/PostTransformConverter.java

Lines changed: 2 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,10 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.flink.cdc.common.transform.converter;
18+
package org.apache.flink.cdc.runtime.operators.transform.converter;
1919

2020
import org.apache.flink.cdc.common.annotation.Experimental;
2121
import org.apache.flink.cdc.common.event.DataChangeEvent;
22-
import org.apache.flink.cdc.common.utils.StringUtils;
2322

2423
import java.io.Serializable;
2524
import java.util.Optional;
@@ -31,32 +30,6 @@
3130
@Experimental
3231
public interface PostTransformConverter extends Serializable {
3332

33+
/** Change some parts of {@link DataChangeEvent}, like op or meta. */
3434
Optional<DataChangeEvent> convert(DataChangeEvent dataChangeEvent);
35-
36-
static Optional<PostTransformConverter> of(String classPath) {
37-
if (StringUtils.isNullOrWhitespaceOnly(classPath)) {
38-
return Optional.empty();
39-
}
40-
41-
Optional<PostTransformConverter> postTransformConverter =
42-
PostTransformConverters.getInternalPostTransformConverter(classPath);
43-
if (postTransformConverter.isPresent()) {
44-
return postTransformConverter;
45-
}
46-
47-
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
48-
try {
49-
Class<?> converterClass = classLoader.loadClass(classPath);
50-
Object converter = converterClass.newInstance();
51-
if (converter instanceof PostTransformConverter) {
52-
return Optional.of((PostTransformConverter) converter);
53-
}
54-
throw new IllegalArgumentException(
55-
String.format(
56-
"%s is not an instance of %s",
57-
classPath, PostTransformConverter.class.getName()));
58-
} catch (Exception e) {
59-
throw new RuntimeException("Create post transform converter failed.", e);
60-
}
61-
}
6235
}

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/transform/converter/PostTransformConverters.java renamed to flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/converter/PostTransformConverters.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.flink.cdc.common.transform.converter;
18+
package org.apache.flink.cdc.runtime.operators.transform.converter;
1919

2020
import org.apache.flink.cdc.common.annotation.Experimental;
21+
import org.apache.flink.cdc.common.utils.StringUtils;
2122

2223
import java.util.Optional;
2324

@@ -26,13 +27,18 @@
2627
public class PostTransformConverters {
2728
public static final String SOFT_DELETE_CONVERTER = "SOFT_DELETE";
2829

29-
public static Optional<PostTransformConverter> getInternalPostTransformConverter(
30-
String identifier) {
30+
/** Get the {@link PostTransformConverter} by given identifier. */
31+
public static Optional<PostTransformConverter> of(String identifier) {
32+
if (StringUtils.isNullOrWhitespaceOnly(identifier)) {
33+
return Optional.empty();
34+
}
35+
3136
switch (identifier) {
3237
case SOFT_DELETE_CONVERTER:
3338
return Optional.of(new SoftDeleteConverter());
3439
default:
35-
return Optional.empty();
40+
throw new IllegalArgumentException(
41+
String.format("Failed to find the converter %s.", identifier));
3642
}
3743
}
3844
}

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/transform/converter/SoftDeleteConverter.java renamed to flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/converter/SoftDeleteConverter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.flink.cdc.common.transform.converter;
18+
package org.apache.flink.cdc.runtime.operators.transform.converter;
1919

2020
import org.apache.flink.cdc.common.event.DataChangeEvent;
2121

0 commit comments

Comments
 (0)