Skip to content

Commit

Permalink
[cdc-common] Introduce PipelineConfig and PIPELINE_LOCAL_TIME_ZONE wh…
Browse files Browse the repository at this point in the history
…ich help handle time zone well
  • Loading branch information
leonardBang committed Dec 2, 2023
1 parent 5c00f76 commit 50a4c99
Show file tree
Hide file tree
Showing 8 changed files with 299 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

import com.ververica.cdc.common.configuration.Configuration;
import com.ververica.cdc.common.pipeline.PipelineConfig;
import com.ververica.cdc.composer.definition.PipelineDef;
import com.ververica.cdc.composer.definition.RouteDef;
import com.ververica.cdc.composer.definition.SinkDef;
Expand Down Expand Up @@ -86,9 +87,9 @@ public PipelineDef parse(Path pipelineDefPath, Configuration globalPipelineConfi
Configuration userPipelineConfig = toPipelineConfig(root.get(PIPELINE_KEY));

// Merge user config into global config
Configuration pipelineConfig = new Configuration();
pipelineConfig.addAll(globalPipelineConfig);
pipelineConfig.addAll(userPipelineConfig);
final PipelineConfig pipelineConfig = new PipelineConfig();
pipelineConfig.addConfiguration(globalPipelineConfig);
pipelineConfig.addConfiguration(userPipelineConfig);

return new PipelineDef(sourceDef, sinkDef, routeDefs, null, pipelineConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.shaded.guava31.com.google.common.io.Resources;

import com.ververica.cdc.common.configuration.Configuration;
import com.ververica.cdc.common.pipeline.PipelineConfig;
import com.ververica.cdc.composer.definition.PipelineDef;
import com.ververica.cdc.composer.definition.RouteDef;
import com.ververica.cdc.composer.definition.SinkDef;
Expand Down Expand Up @@ -106,12 +107,13 @@ void testOverridingGlobalConfig() throws Exception {
"odsdb.default.ods_web_order",
"sync table to with given prefix ods_")),
null,
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("name", "source-database-sync-pipe")
.put("parallelism", "4")
.put("enable-schema-evolution", "false")
.build()));
PipelineConfig.fromConfiguration(
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("name", "source-database-sync-pipe")
.put("parallelism", "4")
.put("enable-schema-evolution", "false")
.build())));

private final PipelineDef fullDefWithGlobalConf =
new PipelineDef(
Expand Down Expand Up @@ -150,13 +152,14 @@ void testOverridingGlobalConfig() throws Exception {
"odsdb.default.ods_web_order",
"sync table to with given prefix ods_")),
null,
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("name", "source-database-sync-pipe")
.put("parallelism", "4")
.put("enable-schema-evolution", "false")
.put("foo", "bar")
.build()));
PipelineConfig.fromConfiguration(
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("name", "source-database-sync-pipe")
.put("parallelism", "4")
.put("enable-schema-evolution", "false")
.put("foo", "bar")
.build())));

private final PipelineDef defWithOptional =
new PipelineDef(
Expand Down Expand Up @@ -184,16 +187,17 @@ void testOverridingGlobalConfig() throws Exception {
new RouteDef(
"mydb.default.app_order_.*", "odsdb.default.app_order", null)),
null,
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("parallelism", "4")
.build()));
PipelineConfig.fromConfiguration(
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("parallelism", "4")
.build())));

private final PipelineDef minimizedDef =
new PipelineDef(
new SourceDef("mysql", null, new Configuration()),
new SinkDef("kafka", null, new Configuration()),
Collections.emptyList(),
null,
new Configuration());
PipelineConfig.fromConfiguration(new Configuration()));
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ververica.cdc.common.pipeline;

import com.ververica.cdc.common.annotation.PublicEvolving;
import com.ververica.cdc.common.configuration.ConfigOption;
import com.ververica.cdc.common.configuration.Configuration;
import com.ververica.cdc.common.types.LocalZonedTimestampType;
import com.ververica.cdc.common.utils.Preconditions;

import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Objects;
import java.util.Optional;
import java.util.TimeZone;

import static com.ververica.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE;

/** Configuration for the current pipeline to adjust composer process. */
@PublicEvolving
public class PipelineConfig {

public PipelineConfig() {}

/*
* A configuration object to hold all configuration that has been set specifically in the pipeline.
*/
private final Configuration configuration = new Configuration();

/**
* Adds the given key-value configuration to the underlying application-specific configuration.
* Note this will overwrite existing keys.
*
* @param configuration key-value configuration to be added
*/
public void addConfiguration(Configuration configuration) {
Preconditions.checkNotNull(configuration);
this.configuration.addAll(configuration);
}

/**
* Gives direct access to the underlying application-specific key-value map for advanced
* configuration.
*/
public Configuration getConfiguration() {
return configuration;
}

/** Sets an application-specific value for the given {@link ConfigOption}. */
public <T> PipelineConfig set(ConfigOption<T> option, T value) {
configuration.set(option, value);
return this;
}

/** Gets an application-specific value for the given {@link ConfigOption}. */
public <T> T get(ConfigOption<T> option) {
return configuration.get(option);
}

/** Gets an optional application-specific value for the given {@link ConfigOption}. */
public <T> Optional<T> getOptional(ConfigOption<T> option) {
return configuration.getOptional(option);
}

/**
* Returns the current session time zone id. It is used when converting to/from {@code TIMESTAMP
* WITH LOCAL TIME ZONE}. See {@link #setLocalTimeZone(ZoneId)} for more details.
*
* @see LocalZonedTimestampType
*/
public ZoneId getLocalTimeZone() {
final String zone = configuration.get(PIPELINE_LOCAL_TIME_ZONE);
if (PIPELINE_LOCAL_TIME_ZONE.defaultValue().equals(zone)) {
return ZoneId.systemDefault();
}
validateTimeZone(zone);
return ZoneId.of(zone);
}

/**
* Sets the current session time zone id. It is used when converting to/from {@code TIMESTAMP
* WITH LOCAL TIME ZONE}. Internally, timestamps with local time zone are always represented in
* the UTC time zone. However, when converting to data types that don't include a time zone
* (e.g. TIMESTAMP, STRING), the session time zone is used during conversion.
*
* @see LocalZonedTimestampType
*/
public void setLocalTimeZone(ZoneId zoneId) {
final String zone;
if (zoneId instanceof ZoneOffset) {
// Give ZoneOffset a timezone for backwards compatibility reasons.
zone = ZoneId.ofOffset("GMT", (ZoneOffset) zoneId).toString();
} else {
zone = zoneId.toString();
}
validateTimeZone(zone);

configuration.set(PIPELINE_LOCAL_TIME_ZONE, zone);
}

private static void validateTimeZone(String zone) {
boolean isValid;
try {
isValid = TimeZone.getTimeZone(zone).toZoneId().equals(ZoneId.of(zone));
} catch (Exception var3) {
isValid = false;
}

if (!isValid) {
throw new IllegalArgumentException(
"Invalid time zone. The valid value should be a Time Zone Database ID "
+ "such as 'America/Los_Angeles' to include daylight saving time. "
+ "Fixed offsets are supported using 'GMT-08:00' or 'GMT+08:00'. "
+ "Or use 'UTC' without time zone and daylight saving time.");
}
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof PipelineConfig)) {
return false;
}
PipelineConfig that = (PipelineConfig) o;
return Objects.equals(configuration, that.configuration);
}

@Override
public int hashCode() {
return Objects.hash(configuration);
}

// --------------------------------------------------------------------------------------------

/** Creates a new PipelineConfig that is initialized with the given configuration. */
public static PipelineConfig fromConfiguration(Configuration configuration) {
final PipelineConfig pipelineConfig = new PipelineConfig();
pipelineConfig.addConfiguration(configuration);
return pipelineConfig;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,26 @@ public class PipelineOptions {
"EXCEPTION: Throw an exception to terminate the sync pipeline.")))
.build());

public static final ConfigOption<String> PIPELINE_LOCAL_TIME_ZONE =
ConfigOptions.key("pipeline.local-time-zone")
.stringType()
// "systemDefault" is a special value to decide whether to use
// ZoneId.systemDefault() in
// PipelineOptions.getLocalTimeZone()
.defaultValue("systemDefault")
.withDescription(
Description.builder()
.text(
"The local time zone defines current session time zone id. ")
.linebreak()
.text(
"It is used when converting to/from <code>TIMESTAMP WITH LOCAL TIME ZONE</code>. "
+ "Internally, timestamps with local time zone are always represented in the UTC time zone. "
+ "However, when converting to data types that don't include a time zone (e.g. TIMESTAMP, STRING), "
+ "the session time zone is used during conversion. The input of option is either a full name "
+ "such as \"America/Los_Angeles\", or a custom timezone id such as \"GMT-08:00\".")
.build());

public static final ConfigOption<String> SCHEMA_OPERATOR_UID =
ConfigOptions.key("pipeline.schema.operator.uid")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ververica.cdc.common.pipeline;

import com.ververica.cdc.common.configuration.Configuration;
import org.junit.jupiter.api.Test;

import java.time.ZoneId;

import static com.ververica.cdc.common.pipeline.PipelineOptions.GLOBAL_PARALLELISM;
import static com.ververica.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE;
import static com.ververica.cdc.common.pipeline.PipelineOptions.PIPELINE_NAME;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;

/** Tests for {@link PipelineConfig}. */
class PipelineConfigTest {

@Test
void testAddAndGetConfiguration() {
final PipelineConfig pipelineConfig = new PipelineConfig();
final Configuration configuration = new Configuration();
configuration.set(PIPELINE_NAME, "test_pipeline_job");
configuration.set(GLOBAL_PARALLELISM, 128);
pipelineConfig.addConfiguration(configuration);

assertThat(pipelineConfig.get(PIPELINE_NAME)).isEqualTo("test_pipeline_job");
assertThat(pipelineConfig.get(GLOBAL_PARALLELISM)).isEqualTo(128);
assertThat(pipelineConfig.getConfiguration().toMap().entrySet().size()).isEqualTo(2);
}

@Test
void testSetAndGetConfigOption() {
final PipelineConfig pipelineConfig = new PipelineConfig();
pipelineConfig.set(PIPELINE_NAME, "test_pipeline_job");
assertThat(pipelineConfig.get(PIPELINE_NAME)).isEqualTo("test_pipeline_job");
assertThat(pipelineConfig.getOptional(GLOBAL_PARALLELISM)).isEmpty();
}

@Test
void testSetAndGetLocalTimeZone() {
final PipelineConfig pipelineConfig = new PipelineConfig();
assertThat(pipelineConfig.get(PIPELINE_LOCAL_TIME_ZONE))
.isEqualTo(PIPELINE_LOCAL_TIME_ZONE.defaultValue());
assertThat(pipelineConfig.getLocalTimeZone().toString())
.isNotEqualTo(PIPELINE_LOCAL_TIME_ZONE.defaultValue());
pipelineConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
assertThat(pipelineConfig.getLocalTimeZone()).isEqualTo(ZoneId.of("Asia/Shanghai"));

assertThatThrownBy(
() -> {
pipelineConfig.set(PIPELINE_LOCAL_TIME_ZONE, "invalid time zone");
pipelineConfig.getLocalTimeZone();
})
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining(
"Invalid time zone. The valid value should be a Time Zone Database ID"
+ " such as 'America/Los_Angeles' to include daylight saving time. "
+ "Fixed offsets are supported using 'GMT-08:00' or 'GMT+08:00'. "
+ "Or use 'UTC' without time zone and daylight saving time.");
}
}
Loading

0 comments on commit 50a4c99

Please sign in to comment.