Commit 10b50ac

[FLINK-36474] Support merging timestamp columns when routing

1 parent dd69756

13 files changed: +379 −68 lines

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java

Lines changed: 24 additions & 3 deletions
@@ -33,6 +33,9 @@
 import org.apache.flink.cdc.common.types.DataTypeRoot;
 import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.common.types.DecimalType;
+import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
+import org.apache.flink.cdc.common.types.TimestampType;
+import org.apache.flink.cdc.common.types.ZonedTimestampType;

 import javax.annotation.Nullable;

@@ -176,6 +179,24 @@ public static DataType inferWiderType(DataType lType, DataType rType) {
         if (lType.equals(rType)) {
             // identical type
             mergedType = rType;
+        } else if (lType instanceof TimestampType && rType instanceof TimestampType) {
+            return DataTypes.TIMESTAMP(
+                    Math.max(
+                            ((TimestampType) lType).getPrecision(),
+                            ((TimestampType) rType).getPrecision()));
+        } else if (lType instanceof ZonedTimestampType && rType instanceof ZonedTimestampType) {
+            return DataTypes.TIMESTAMP_TZ(
+                    Math.max(
+                            ((ZonedTimestampType) lType).getPrecision(),
+                            ((ZonedTimestampType) rType).getPrecision()));
+        } else if (lType instanceof LocalZonedTimestampType
+                && rType instanceof LocalZonedTimestampType) {
+            return DataTypes.TIMESTAMP_LTZ(
+                    Math.max(
+                            ((LocalZonedTimestampType) lType).getPrecision(),
+                            ((LocalZonedTimestampType) rType).getPrecision()));
+        } else if (lType.is(DataTypeFamily.TIMESTAMP) && rType.is(DataTypeFamily.TIMESTAMP)) {
+            return DataTypes.TIMESTAMP(TimestampType.MAX_PRECISION);
         } else if (lType.is(DataTypeFamily.INTEGER_NUMERIC)
                 && rType.is(DataTypeFamily.INTEGER_NUMERIC)) {
             mergedType = DataTypes.BIGINT();
@@ -185,7 +206,7 @@ public static DataType inferWiderType(DataType lType, DataType rType) {
         } else if (lType.is(DataTypeFamily.APPROXIMATE_NUMERIC)
                 && rType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
             mergedType = DataTypes.DOUBLE();
-        } else if (lType.is(DataTypeRoot.DECIMAL) && rType.is(DataTypeRoot.DECIMAL)) {
+        } else if (lType instanceof DecimalType && rType instanceof DecimalType) {
             // Merge two decimal types
             DecimalType lhsDecimal = (DecimalType) lType;
             DecimalType rhsDecimal = (DecimalType) rType;
@@ -195,7 +216,7 @@ public static DataType inferWiderType(DataType lType, DataType rType) {
                             rhsDecimal.getPrecision() - rhsDecimal.getScale());
             int resultScale = Math.max(lhsDecimal.getScale(), rhsDecimal.getScale());
             mergedType = DataTypes.DECIMAL(resultIntDigits + resultScale, resultScale);
-        } else if (lType.is(DataTypeRoot.DECIMAL) && rType.is(DataTypeFamily.EXACT_NUMERIC)) {
+        } else if (lType instanceof DecimalType && rType.is(DataTypeFamily.EXACT_NUMERIC)) {
             // Merge decimal and int
             DecimalType lhsDecimal = (DecimalType) lType;
             mergedType =
@@ -204,7 +225,7 @@ public static DataType inferWiderType(DataType lType, DataType rType) {
                             lhsDecimal.getPrecision(),
                             lhsDecimal.getScale() + getNumericPrecision(rType)),
                     lhsDecimal.getScale());
-        } else if (rType.is(DataTypeRoot.DECIMAL) && lType.is(DataTypeFamily.EXACT_NUMERIC)) {
+        } else if (rType instanceof DecimalType && lType.is(DataTypeFamily.EXACT_NUMERIC)) {
             // Merge decimal and int
             DecimalType rhsDecimal = (DecimalType) rType;
             mergedType =
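Net effect of the inferWiderType changes: timestamps sharing a root are widened to the larger precision, mixed timestamp roots collapse to TIMESTAMP(TimestampType.MAX_PRECISION), i.e. TIMESTAMP(9), and the decimal branch keeps its integer-digit arithmetic. A minimal standalone sketch of the merge rules (not part of the commit; the concrete precisions are arbitrary):

import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.utils.SchemaUtils;

public class InferWiderTypeSketch {
    public static void main(String[] args) {
        // Same timestamp root: the larger precision wins.
        DataType sameRoot =
                SchemaUtils.inferWiderType(DataTypes.TIMESTAMP(3), DataTypes.TIMESTAMP(6));
        System.out.println(sameRoot); // TIMESTAMP(6)

        // Mixed timestamp roots: fall back to the maximum precision.
        DataType mixedRoot =
                SchemaUtils.inferWiderType(DataTypes.TIMESTAMP_LTZ(), DataTypes.TIMESTAMP_TZ());
        System.out.println(mixedRoot); // TIMESTAMP(9)

        // Decimal merge: intDigits = max(10 - 2, 8 - 4) = 8, scale = max(2, 4) = 4.
        DataType decimal =
                SchemaUtils.inferWiderType(DataTypes.DECIMAL(10, 2), DataTypes.DECIMAL(8, 4));
        System.out.println(decimal); // DECIMAL(12, 4)
    }
}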

flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java

Lines changed: 29 additions & 0 deletions
@@ -291,6 +291,35 @@ public void testInferWiderType() {
                         DataTypes.INT().nullable(), DataTypes.INT().nullable()))
                 .isEqualTo(DataTypes.INT().nullable());

+        // Test merging temporal types
+        Assertions.assertThat(
+                        SchemaUtils.inferWiderType(DataTypes.TIMESTAMP(9), DataTypes.TIMESTAMP(6)))
+                .isEqualTo(DataTypes.TIMESTAMP(9));
+
+        Assertions.assertThat(
+                        SchemaUtils.inferWiderType(
+                                DataTypes.TIMESTAMP_TZ(3), DataTypes.TIMESTAMP_TZ(7)))
+                .isEqualTo(DataTypes.TIMESTAMP_TZ(7));
+
+        Assertions.assertThat(
+                        SchemaUtils.inferWiderType(
+                                DataTypes.TIMESTAMP_LTZ(2), DataTypes.TIMESTAMP_LTZ(1)))
+                .isEqualTo(DataTypes.TIMESTAMP_LTZ(2));
+
+        Assertions.assertThat(
+                        SchemaUtils.inferWiderType(
+                                DataTypes.TIMESTAMP_LTZ(), DataTypes.TIMESTAMP()))
+                .isEqualTo(DataTypes.TIMESTAMP(9));
+
+        Assertions.assertThat(
+                        SchemaUtils.inferWiderType(DataTypes.TIMESTAMP_TZ(), DataTypes.TIMESTAMP()))
+                .isEqualTo(DataTypes.TIMESTAMP(9));
+
+        Assertions.assertThat(
+                        SchemaUtils.inferWiderType(
+                                DataTypes.TIMESTAMP_LTZ(), DataTypes.TIMESTAMP_TZ()))
+                .isEqualTo(DataTypes.TIMESTAMP(9));
+
         // incompatible type merges test
         Assertions.assertThatThrownBy(
                         () -> SchemaUtils.inferWiderType(DataTypes.INT(), DataTypes.DOUBLE()))

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java

Lines changed: 12 additions & 10 deletions
@@ -18,6 +18,7 @@
 package org.apache.flink.cdc.composer.flink;

 import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.pipeline.PipelineOptions;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
@@ -88,17 +89,19 @@ private FlinkPipelineComposer(StreamExecutionEnvironment env, boolean isBlocking

     @Override
     public PipelineExecution compose(PipelineDef pipelineDef) {
-        int parallelism = pipelineDef.getConfig().get(PipelineOptions.PIPELINE_PARALLELISM);
+        Configuration pipelineDefConfig = pipelineDef.getConfig();
+
+        int parallelism = pipelineDefConfig.get(PipelineOptions.PIPELINE_PARALLELISM);
         env.getConfig().setParallelism(parallelism);

         SchemaChangeBehavior schemaChangeBehavior =
-                pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR);
+                pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR);

         // Build Source Operator
         DataSourceTranslator sourceTranslator = new DataSourceTranslator();
         DataStream<Event> stream =
                 sourceTranslator.translate(
-                        pipelineDef.getSource(), env, pipelineDef.getConfig(), parallelism);
+                        pipelineDef.getSource(), env, pipelineDefConfig, parallelism);

         // Build PreTransformOperator for processing Schema Event
         TransformTranslator transformTranslator = new TransformTranslator();
@@ -110,10 +113,9 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
         SchemaOperatorTranslator schemaOperatorTranslator =
                 new SchemaOperatorTranslator(
                         schemaChangeBehavior,
-                        pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID),
-                        pipelineDef
-                                .getConfig()
-                                .get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT));
+                        pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID),
+                        pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT),
+                        pipelineDefConfig.get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE));
         OperatorIDGenerator schemaOperatorIDGenerator =
                 new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());

@@ -122,13 +124,13 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
                 transformTranslator.translatePostTransform(
                         stream,
                         pipelineDef.getTransforms(),
-                        pipelineDef.getConfig().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE),
+                        pipelineDefConfig.get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE),
                         pipelineDef.getUdfs());

         // Build DataSink in advance as schema operator requires MetadataApplier
         DataSinkTranslator sinkTranslator = new DataSinkTranslator();
         DataSink dataSink =
-                sinkTranslator.createDataSink(pipelineDef.getSink(), pipelineDef.getConfig(), env);
+                sinkTranslator.createDataSink(pipelineDef.getSink(), pipelineDefConfig, env);

         stream =
                 schemaOperatorTranslator.translate(
@@ -157,7 +159,7 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
         addFrameworkJars();

         return new FlinkPipelineExecution(
-                env, pipelineDef.getConfig().get(PipelineOptions.PIPELINE_NAME), isBlocking);
+                env, pipelineDefConfig.get(PipelineOptions.PIPELINE_NAME), isBlocking);
     }

     private void addFrameworkJars() {
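Besides hoisting pipelineDef.getConfig() into a local, this change threads PipelineOptions.PIPELINE_LOCAL_TIME_ZONE into the schema operator, which previously only reached the post-transform operator. A hedged sketch of how the option travels (it assumes Configuration's standard set/get accessors from flink-cdc-common; the zone id is illustrative):

import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;

public class LocalTimeZoneSketch {
    public static void main(String[] args) {
        // Pipeline-level option, normally populated from the pipeline definition.
        Configuration pipelineDefConfig = new Configuration();
        pipelineDefConfig.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE, "America/Los_Angeles");

        // compose() now reads the option once and forwards it to both
        // translatePostTransform(...) and the SchemaOperatorTranslator.
        String timezone = pipelineDefConfig.get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE);
        System.out.println(timezone);
    }
}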

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java

Lines changed: 13 additions & 25 deletions
@@ -19,7 +19,6 @@

 import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.common.event.Event;
-import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
@@ -39,24 +38,27 @@
 public class SchemaOperatorTranslator {
     private final SchemaChangeBehavior schemaChangeBehavior;
     private final String schemaOperatorUid;
-
     private final Duration rpcTimeOut;
+    private final String timezone;

     public SchemaOperatorTranslator(
             SchemaChangeBehavior schemaChangeBehavior,
             String schemaOperatorUid,
-            Duration rpcTimeOut) {
+            Duration rpcTimeOut,
+            String timezone) {
         this.schemaChangeBehavior = schemaChangeBehavior;
         this.schemaOperatorUid = schemaOperatorUid;
         this.rpcTimeOut = rpcTimeOut;
+        this.timezone = timezone;
     }

     public DataStream<Event> translate(
             DataStream<Event> input,
             int parallelism,
             MetadataApplier metadataApplier,
             List<RouteDef> routes) {
-        return addSchemaOperator(input, parallelism, metadataApplier, routes, schemaChangeBehavior);
+        return addSchemaOperator(
+                input, parallelism, metadataApplier, routes, schemaChangeBehavior, timezone);
     }

     public String getSchemaOperatorUid() {
@@ -68,7 +70,8 @@ private DataStream<Event> addSchemaOperator(
             int parallelism,
             MetadataApplier metadataApplier,
             List<RouteDef> routes,
-            SchemaChangeBehavior schemaChangeBehavior) {
+            SchemaChangeBehavior schemaChangeBehavior,
+            String timezone) {
         List<RouteRule> routingRules = new ArrayList<>();
         for (RouteDef route : routes) {
             routingRules.add(
@@ -82,27 +85,12 @@ private DataStream<Event> addSchemaOperator(
                         "SchemaOperator",
                         new EventTypeInfo(),
                         new SchemaOperatorFactory(
-                                metadataApplier, routingRules, rpcTimeOut, schemaChangeBehavior));
+                                metadataApplier,
+                                routingRules,
+                                rpcTimeOut,
+                                schemaChangeBehavior,
+                                timezone));
         stream.uid(schemaOperatorUid).setParallelism(parallelism);
         return stream;
     }
-
-    private DataStream<Event> dropSchemaChangeEvent(DataStream<Event> input, int parallelism) {
-        return input.filter(event -> !(event instanceof SchemaChangeEvent))
-                .setParallelism(parallelism);
-    }
-
-    private DataStream<Event> exceptionOnSchemaChange(DataStream<Event> input, int parallelism) {
-        return input.map(
-                        event -> {
-                            if (event instanceof SchemaChangeEvent) {
-                                throw new RuntimeException(
-                                        String.format(
-                                                "Aborting execution as the pipeline encountered a schema change event: %s",
-                                                event));
-                            }
-                            return event;
-                        })
-                .setParallelism(parallelism);
-    }
 }
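Alongside the new timezone parameter, the unused dropSchemaChangeEvent/exceptionOnSchemaChange helpers are removed. A sketch of constructing the translator with its four-argument constructor, mirroring the FlinkPipelineComposer call site above (the behavior constant, uid, timeout, and zone values are all illustrative, not taken from the commit):

import java.time.Duration;

import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator;

public class TranslatorWiringSketch {
    public static void main(String[] args) {
        SchemaOperatorTranslator translator =
                new SchemaOperatorTranslator(
                        SchemaChangeBehavior.EVOLVE, // schema change behavior (assumed constant)
                        "schema-operator-uid",       // operator uid (illustrative)
                        Duration.ofSeconds(30),      // RPC timeout (illustrative)
                        "UTC");                      // pipeline local time zone
        System.out.println(translator.getSchemaOperatorUid());
    }
}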
