From 50a4c9965b94686060d617a53e67636aea241367 Mon Sep 17 00:00:00 2001 From: Leonard Xu Date: Sat, 2 Dec 2023 18:10:15 +0800 Subject: [PATCH] [cdc-common] Introduce PipelineConfig and PIPELINE_LOCAL_TIME_ZONE which help handle time zone well --- .../parser/YamlPipelineDefinitionParser.java | 7 +- .../YamlPipelineDefinitionParserTest.java | 40 +++-- .../cdc/common/pipeline/PipelineConfig.java | 157 ++++++++++++++++++ .../cdc/common/pipeline/PipelineOptions.java | 20 +++ .../common/pipeline/PipelineConfigTest.java | 76 +++++++++ .../cdc/composer/definition/PipelineDef.java | 20 +-- .../composer/flink/FlinkPipelineComposer.java | 10 +- .../flink/FlinkPipelineComposerITCase.java | 7 +- 8 files changed, 299 insertions(+), 38 deletions(-) create mode 100644 flink-cdc-common/src/main/java/com/ververica/cdc/common/pipeline/PipelineConfig.java create mode 100644 flink-cdc-common/src/test/java/com/ververica/cdc/common/pipeline/PipelineConfigTest.java diff --git a/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParser.java b/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParser.java index 7004009334..cd2dbd788a 100644 --- a/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParser.java +++ b/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParser.java @@ -22,6 +22,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.ververica.cdc.common.configuration.Configuration; +import com.ververica.cdc.common.pipeline.PipelineConfig; import com.ververica.cdc.composer.definition.PipelineDef; import com.ververica.cdc.composer.definition.RouteDef; import com.ververica.cdc.composer.definition.SinkDef; @@ -86,9 +87,9 @@ public PipelineDef parse(Path pipelineDefPath, Configuration globalPipelineConfi Configuration userPipelineConfig = toPipelineConfig(root.get(PIPELINE_KEY)); // Merge user config into global config - Configuration pipelineConfig = new Configuration(); - pipelineConfig.addAll(globalPipelineConfig); - pipelineConfig.addAll(userPipelineConfig); + final PipelineConfig pipelineConfig = new PipelineConfig(); + pipelineConfig.addConfiguration(globalPipelineConfig); + pipelineConfig.addConfiguration(userPipelineConfig); return new PipelineDef(sourceDef, sinkDef, routeDefs, null, pipelineConfig); } diff --git a/flink-cdc-cli/src/test/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParserTest.java b/flink-cdc-cli/src/test/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParserTest.java index daae0f2d60..1ddc687655 100644 --- a/flink-cdc-cli/src/test/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParserTest.java +++ b/flink-cdc-cli/src/test/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParserTest.java @@ -20,6 +20,7 @@ import org.apache.flink.shaded.guava31.com.google.common.io.Resources; import com.ververica.cdc.common.configuration.Configuration; +import com.ververica.cdc.common.pipeline.PipelineConfig; import com.ververica.cdc.composer.definition.PipelineDef; import com.ververica.cdc.composer.definition.RouteDef; import com.ververica.cdc.composer.definition.SinkDef; @@ -106,12 +107,13 @@ void testOverridingGlobalConfig() throws Exception { "odsdb.default.ods_web_order", "sync table to with given prefix ods_")), null, - Configuration.fromMap( - ImmutableMap.builder() - .put("name", "source-database-sync-pipe") - .put("parallelism", "4") - .put("enable-schema-evolution", "false") - .build())); + 
PipelineConfig.fromConfiguration( + Configuration.fromMap( + ImmutableMap.builder() + .put("name", "source-database-sync-pipe") + .put("parallelism", "4") + .put("enable-schema-evolution", "false") + .build()))); private final PipelineDef fullDefWithGlobalConf = new PipelineDef( @@ -150,13 +152,14 @@ void testOverridingGlobalConfig() throws Exception { "odsdb.default.ods_web_order", "sync table to with given prefix ods_")), null, - Configuration.fromMap( - ImmutableMap.builder() - .put("name", "source-database-sync-pipe") - .put("parallelism", "4") - .put("enable-schema-evolution", "false") - .put("foo", "bar") - .build())); + PipelineConfig.fromConfiguration( + Configuration.fromMap( + ImmutableMap.builder() + .put("name", "source-database-sync-pipe") + .put("parallelism", "4") + .put("enable-schema-evolution", "false") + .put("foo", "bar") + .build()))); private final PipelineDef defWithOptional = new PipelineDef( @@ -184,10 +187,11 @@ void testOverridingGlobalConfig() throws Exception { new RouteDef( "mydb.default.app_order_.*", "odsdb.default.app_order", null)), null, - Configuration.fromMap( - ImmutableMap.builder() - .put("parallelism", "4") - .build())); + PipelineConfig.fromConfiguration( + Configuration.fromMap( + ImmutableMap.builder() + .put("parallelism", "4") + .build()))); private final PipelineDef minimizedDef = new PipelineDef( @@ -195,5 +199,5 @@ void testOverridingGlobalConfig() throws Exception { new SinkDef("kafka", null, new Configuration()), Collections.emptyList(), null, - new Configuration()); + PipelineConfig.fromConfiguration(new Configuration())); } diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/pipeline/PipelineConfig.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/pipeline/PipelineConfig.java new file mode 100644 index 0000000000..f2947973ff --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/pipeline/PipelineConfig.java @@ -0,0 +1,157 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.pipeline; + +import com.ververica.cdc.common.annotation.PublicEvolving; +import com.ververica.cdc.common.configuration.ConfigOption; +import com.ververica.cdc.common.configuration.Configuration; +import com.ververica.cdc.common.types.LocalZonedTimestampType; +import com.ververica.cdc.common.utils.Preconditions; + +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.util.Objects; +import java.util.Optional; +import java.util.TimeZone; + +import static com.ververica.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE; + +/** Configuration for the current pipeline to adjust composer process. */ +@PublicEvolving +public class PipelineConfig { + + public PipelineConfig() {} + + /* + * A configuration object to hold all configuration that has been set specifically in the pipeline. 
+     */
+    private final Configuration configuration = new Configuration();
+
+    /**
+     * Adds the given key-value configuration to the underlying application-specific configuration.
+     * Note this will overwrite existing keys.
+     *
+     * @param configuration key-value configuration to be added
+     */
+    public void addConfiguration(Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        this.configuration.addAll(configuration);
+    }
+
+    /**
+     * Gives direct access to the underlying application-specific key-value map for advanced
+     * configuration.
+     */
+    public Configuration getConfiguration() {
+        return configuration;
+    }
+
+    /** Sets an application-specific value for the given {@link ConfigOption}. */
+    public <T> PipelineConfig set(ConfigOption<T> option, T value) {
+        configuration.set(option, value);
+        return this;
+    }
+
+    /** Gets an application-specific value for the given {@link ConfigOption}. */
+    public <T> T get(ConfigOption<T> option) {
+        return configuration.get(option);
+    }
+
+    /** Gets an optional application-specific value for the given {@link ConfigOption}. */
+    public <T> Optional<T> getOptional(ConfigOption<T> option) {
+        return configuration.getOptional(option);
+    }
+
+    /**
+     * Returns the current session time zone id. It is used when converting to/from {@code TIMESTAMP
+     * WITH LOCAL TIME ZONE}. See {@link #setLocalTimeZone(ZoneId)} for more details.
+     *
+     * @see LocalZonedTimestampType
+     */
+    public ZoneId getLocalTimeZone() {
+        final String zone = configuration.get(PIPELINE_LOCAL_TIME_ZONE);
+        if (PIPELINE_LOCAL_TIME_ZONE.defaultValue().equals(zone)) {
+            return ZoneId.systemDefault();
+        }
+        validateTimeZone(zone);
+        return ZoneId.of(zone);
+    }
+
+    /**
+     * Sets the current session time zone id. It is used when converting to/from {@code TIMESTAMP
+     * WITH LOCAL TIME ZONE}. Internally, timestamps with local time zone are always represented in
+     * the UTC time zone. However, when converting to data types that don't include a time zone
+     * (e.g. TIMESTAMP, STRING), the session time zone is used during conversion.
+     *
+     * @see LocalZonedTimestampType
+     */
+    public void setLocalTimeZone(ZoneId zoneId) {
+        final String zone;
+        if (zoneId instanceof ZoneOffset) {
+            // Give ZoneOffset a timezone for backwards compatibility reasons.
+            zone = ZoneId.ofOffset("GMT", (ZoneOffset) zoneId).toString();
+        } else {
+            zone = zoneId.toString();
+        }
+        validateTimeZone(zone);
+
+        configuration.set(PIPELINE_LOCAL_TIME_ZONE, zone);
+    }
+
+    private static void validateTimeZone(String zone) {
+        boolean isValid;
+        try {
+            isValid = TimeZone.getTimeZone(zone).toZoneId().equals(ZoneId.of(zone));
+        } catch (Exception e) {
+            isValid = false;
+        }
+
+        if (!isValid) {
+            throw new IllegalArgumentException(
+                    "Invalid time zone. The valid value should be a Time Zone Database ID "
+                            + "such as 'America/Los_Angeles' to include daylight saving time. "
+                            + "Fixed offsets are supported using 'GMT-08:00' or 'GMT+08:00'. "
+                            + "Or use 'UTC' without time zone and daylight saving time.");
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof PipelineConfig)) {
+            return false;
+        }
+        PipelineConfig that = (PipelineConfig) o;
+        return Objects.equals(configuration, that.configuration);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(configuration);
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /** Creates a new PipelineConfig that is initialized with the given configuration. */
+    public static PipelineConfig fromConfiguration(Configuration configuration) {
+        final PipelineConfig pipelineConfig = new PipelineConfig();
+        pipelineConfig.addConfiguration(configuration);
+        return pipelineConfig;
+    }
+}
diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/pipeline/PipelineOptions.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/pipeline/PipelineOptions.java
index 838e0c1599..47be8b65b5 100644
--- a/flink-cdc-common/src/main/java/com/ververica/cdc/common/pipeline/PipelineOptions.java
+++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/pipeline/PipelineOptions.java
@@ -57,6 +57,26 @@ public class PipelineOptions {
                                                             "EXCEPTION: Throw an exception to terminate the sync pipeline.")))
                                     .build());
 
+    public static final ConfigOption<String> PIPELINE_LOCAL_TIME_ZONE =
+            ConfigOptions.key("pipeline.local-time-zone")
+                    .stringType()
+                    // "systemDefault" is a special value to decide whether to use
+                    // ZoneId.systemDefault() in
+                    // PipelineConfig.getLocalTimeZone()
+                    .defaultValue("systemDefault")
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The local time zone defines the current session time zone id. ")
+                                    .linebreak()
+                                    .text(
+                                            "It is used when converting to/from TIMESTAMP WITH LOCAL TIME ZONE. "
+                                                    + "Internally, timestamps with local time zone are always represented in the UTC time zone. "
+                                                    + "However, when converting to data types that don't include a time zone (e.g. TIMESTAMP, STRING), "
+                                                    + "the session time zone is used during conversion. The value of this option is either a full name "
+                                                    + "such as \"America/Los_Angeles\", or a custom time zone id such as \"GMT-08:00\".")
+                                    .build());
+
     public static final ConfigOption<String> SCHEMA_OPERATOR_UID =
             ConfigOptions.key("pipeline.schema.operator.uid")
                     .stringType()
diff --git a/flink-cdc-common/src/test/java/com/ververica/cdc/common/pipeline/PipelineConfigTest.java b/flink-cdc-common/src/test/java/com/ververica/cdc/common/pipeline/PipelineConfigTest.java
new file mode 100644
index 0000000000..0063812bca
--- /dev/null
+++ b/flink-cdc-common/src/test/java/com/ververica/cdc/common/pipeline/PipelineConfigTest.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.common.pipeline;
+
+import com.ververica.cdc.common.configuration.Configuration;
+import org.junit.jupiter.api.Test;
+
+import java.time.ZoneId;
+
+import static com.ververica.cdc.common.pipeline.PipelineOptions.GLOBAL_PARALLELISM;
+import static com.ververica.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE;
+import static com.ververica.cdc.common.pipeline.PipelineOptions.PIPELINE_NAME;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
+/** Tests for {@link PipelineConfig}.
*/ +class PipelineConfigTest { + + @Test + void testAddAndGetConfiguration() { + final PipelineConfig pipelineConfig = new PipelineConfig(); + final Configuration configuration = new Configuration(); + configuration.set(PIPELINE_NAME, "test_pipeline_job"); + configuration.set(GLOBAL_PARALLELISM, 128); + pipelineConfig.addConfiguration(configuration); + + assertThat(pipelineConfig.get(PIPELINE_NAME)).isEqualTo("test_pipeline_job"); + assertThat(pipelineConfig.get(GLOBAL_PARALLELISM)).isEqualTo(128); + assertThat(pipelineConfig.getConfiguration().toMap().entrySet().size()).isEqualTo(2); + } + + @Test + void testSetAndGetConfigOption() { + final PipelineConfig pipelineConfig = new PipelineConfig(); + pipelineConfig.set(PIPELINE_NAME, "test_pipeline_job"); + assertThat(pipelineConfig.get(PIPELINE_NAME)).isEqualTo("test_pipeline_job"); + assertThat(pipelineConfig.getOptional(GLOBAL_PARALLELISM)).isEmpty(); + } + + @Test + void testSetAndGetLocalTimeZone() { + final PipelineConfig pipelineConfig = new PipelineConfig(); + assertThat(pipelineConfig.get(PIPELINE_LOCAL_TIME_ZONE)) + .isEqualTo(PIPELINE_LOCAL_TIME_ZONE.defaultValue()); + assertThat(pipelineConfig.getLocalTimeZone().toString()) + .isNotEqualTo(PIPELINE_LOCAL_TIME_ZONE.defaultValue()); + pipelineConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai")); + assertThat(pipelineConfig.getLocalTimeZone()).isEqualTo(ZoneId.of("Asia/Shanghai")); + + assertThatThrownBy( + () -> { + pipelineConfig.set(PIPELINE_LOCAL_TIME_ZONE, "invalid time zone"); + pipelineConfig.getLocalTimeZone(); + }) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "Invalid time zone. The valid value should be a Time Zone Database ID" + + " such as 'America/Los_Angeles' to include daylight saving time. " + + "Fixed offsets are supported using 'GMT-08:00' or 'GMT+08:00'. 
" + + "Or use 'UTC' without time zone and daylight saving time."); + } +} diff --git a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/definition/PipelineDef.java b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/definition/PipelineDef.java index e5cbc57505..f9e796e81c 100644 --- a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/definition/PipelineDef.java +++ b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/definition/PipelineDef.java @@ -16,7 +16,7 @@ package com.ververica.cdc.composer.definition; -import com.ververica.cdc.common.configuration.Configuration; +import com.ververica.cdc.common.pipeline.PipelineConfig; import java.util.List; import java.util.Objects; @@ -46,19 +46,19 @@ public class PipelineDef { private final SinkDef sink; private final List routes; private final List transforms; - private final Configuration config; + private final PipelineConfig pipelineConfig; public PipelineDef( SourceDef source, SinkDef sink, List routes, List transforms, - Configuration config) { + PipelineConfig pipelineConfig) { this.source = source; this.sink = sink; this.routes = routes; this.transforms = transforms; - this.config = config; + this.pipelineConfig = pipelineConfig; } public SourceDef getSource() { @@ -77,8 +77,8 @@ public List getTransforms() { return transforms; } - public Configuration getConfig() { - return config; + public PipelineConfig getPipelineConfig() { + return pipelineConfig; } @Override @@ -92,8 +92,8 @@ public String toString() { + routes + ", transforms=" + transforms - + ", config=" - + config + + ", pipelineConfig=" + + pipelineConfig + '}'; } @@ -110,11 +110,11 @@ public boolean equals(Object o) { && Objects.equals(sink, that.sink) && Objects.equals(routes, that.routes) && Objects.equals(transforms, that.transforms) - && Objects.equals(config, that.config); + && Objects.equals(pipelineConfig, that.pipelineConfig); } @Override public int hashCode() { - return Objects.hash(source, sink, routes, transforms, config); + return Objects.hash(source, sink, routes, transforms, pipelineConfig); } } diff --git a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/FlinkPipelineComposer.java index 91bcbc557a..83003bfed7 100644 --- a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/FlinkPipelineComposer.java +++ b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/FlinkPipelineComposer.java @@ -66,7 +66,7 @@ private FlinkPipelineComposer(StreamExecutionEnvironment env, boolean isBlocking @Override public PipelineExecution compose(PipelineDef pipelineDef) { - int parallelism = pipelineDef.getConfig().get(PipelineOptions.GLOBAL_PARALLELISM); + int parallelism = pipelineDef.getPipelineConfig().get(PipelineOptions.GLOBAL_PARALLELISM); env.getConfig().setParallelism(parallelism); // Source @@ -84,8 +84,8 @@ public PipelineExecution compose(PipelineDef pipelineDef) { // Schema operator SchemaOperatorTranslator schemaOperatorTranslator = new SchemaOperatorTranslator( - pipelineDef.getConfig().get(PipelineOptions.SCHEMA_CHANGE_BEHAVIOR), - pipelineDef.getConfig().get(PipelineOptions.SCHEMA_OPERATOR_UID)); + pipelineDef.getPipelineConfig().get(PipelineOptions.SCHEMA_CHANGE_BEHAVIOR), + pipelineDef.getPipelineConfig().get(PipelineOptions.SCHEMA_OPERATOR_UID)); stream = schemaOperatorTranslator.translate( stream, parallelism, dataSink.getMetadataApplier()); @@ -103,7 +103,9 @@ public PipelineExecution 
compose(PipelineDef pipelineDef) { sinkTranslator.translate(stream, dataSink, schemaOperatorIDGenerator.generate()); return new FlinkPipelineExecution( - env, pipelineDef.getConfig().get(PipelineOptions.PIPELINE_NAME), isBlocking); + env, + pipelineDef.getPipelineConfig().get(PipelineOptions.PIPELINE_NAME), + isBlocking); } private DataSink createDataSink(SinkDef sinkDef) { diff --git a/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/flink/FlinkPipelineComposerITCase.java index 530e1804b1..5e0290f6bf 100644 --- a/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/flink/FlinkPipelineComposerITCase.java +++ b/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/flink/FlinkPipelineComposerITCase.java @@ -20,6 +20,7 @@ import org.apache.flink.test.junit5.MiniClusterExtension; import com.ververica.cdc.common.configuration.Configuration; +import com.ververica.cdc.common.pipeline.PipelineConfig; import com.ververica.cdc.common.pipeline.PipelineOptions; import com.ververica.cdc.composer.PipelineExecution; import com.ververica.cdc.composer.definition.PipelineDef; @@ -108,7 +109,7 @@ void testSingleSplitSingleTable() throws Exception { SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); // Setup pipeline - Configuration pipelineConfig = new Configuration(); + PipelineConfig pipelineConfig = new PipelineConfig(); pipelineConfig.set(PipelineOptions.GLOBAL_PARALLELISM, 1); PipelineDef pipelineDef = new PipelineDef( @@ -162,7 +163,7 @@ void testSingleSplitMultipleTables() throws Exception { SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); // Setup pipeline - Configuration pipelineConfig = new Configuration(); + PipelineConfig pipelineConfig = new PipelineConfig(); pipelineConfig.set(PipelineOptions.GLOBAL_PARALLELISM, 1); PipelineDef pipelineDef = new PipelineDef( @@ -226,7 +227,7 @@ void testMultiSplitsSingleTable() throws Exception { SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); // Setup pipeline - Configuration pipelineConfig = new Configuration(); + PipelineConfig pipelineConfig = new PipelineConfig(); pipelineConfig.set(PipelineOptions.GLOBAL_PARALLELISM, MAX_PARALLELISM); PipelineDef pipelineDef = new PipelineDef(
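
Reviewer note (not part of the patch): a minimal usage sketch of how the new PipelineConfig and the pipeline.local-time-zone option introduced above are expected to interact. It only relies on APIs that appear in this diff; the class name of the sketch and the raw map keys are illustrative assumptions.

    // Sketch only: exercises PipelineConfig merging and PIPELINE_LOCAL_TIME_ZONE resolution.
    import com.ververica.cdc.common.configuration.Configuration;
    import com.ververica.cdc.common.pipeline.PipelineConfig;
    import com.ververica.cdc.common.pipeline.PipelineOptions;

    import java.time.ZoneId;
    import java.util.HashMap;
    import java.util.Map;

    public class PipelineConfigUsageSketch {
        public static void main(String[] args) {
            // Global config merged with the user's pipeline config, mirroring
            // YamlPipelineDefinitionParser above; later additions overwrite earlier keys.
            Map<String, String> globalConfig = new HashMap<>();
            globalConfig.put("parallelism", "4");
            Map<String, String> userConfig = new HashMap<>();
            userConfig.put("pipeline.local-time-zone", "America/Los_Angeles");

            PipelineConfig pipelineConfig = new PipelineConfig();
            pipelineConfig.addConfiguration(Configuration.fromMap(globalConfig));
            pipelineConfig.addConfiguration(Configuration.fromMap(userConfig));

            // Unset option -> default "systemDefault" -> ZoneId.systemDefault();
            // otherwise the configured zone is validated and returned.
            ZoneId sessionZone = pipelineConfig.getLocalTimeZone();
            System.out.println("Session time zone: " + sessionZone);

            // Programmatic alternative to the YAML option; invalid zone ids are rejected.
            pipelineConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
            System.out.println(pipelineConfig.get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE));
        }
    }

The same PipelineConfig instance is what FlinkPipelineComposer reads the parallelism, schema-change behavior, and pipeline name from, so components that need the session time zone can obtain it via getLocalTimeZone() instead of relying on the JVM default.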