Skip to content

Commit d63512e

Browse files
[FLINK-36572][pipeline-connector][starrocks] Fix the issue that the local time zone is wrongly set
This closes apache#3733.
1 parent fb2a1d0 commit d63512e

File tree

3 files changed

+40
-1
lines changed

3 files changed

+40
-1
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSink.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.flink.cdc.connectors.starrocks.sink;
1919

20+
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
2021
import org.apache.flink.cdc.common.event.Event;
2122
import org.apache.flink.cdc.common.sink.DataSink;
2223
import org.apache.flink.cdc.common.sink.EventSinkProvider;
@@ -78,4 +79,9 @@ public MetadataApplier getMetadataApplier() {
7879
sinkOptions.getPassword());
7980
return new StarRocksMetadataApplier(catalog, tableCreateConfig, schemaChangeConfig);
8081
}
82+
83+
@VisibleForTesting
84+
public ZoneId getZoneId() {
85+
return zoneId;
86+
}
8187
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public DataSink createDataSink(Context context) {
5151
TableCreateConfig.from(context.getFactoryConfiguration());
5252
SchemaChangeConfig schemaChangeConfig =
5353
SchemaChangeConfig.from(context.getFactoryConfiguration());
54-
String zoneStr = context.getFactoryConfiguration().get(PIPELINE_LOCAL_TIME_ZONE);
54+
String zoneStr = context.getPipelineConfiguration().get(PIPELINE_LOCAL_TIME_ZONE);
5555
ZoneId zoneId =
5656
PIPELINE_LOCAL_TIME_ZONE.defaultValue().equals(zoneStr)
5757
? ZoneId.systemDefault()

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactoryTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.assertj.core.api.Assertions;
3131
import org.junit.jupiter.api.Test;
3232

33+
import java.time.ZoneId;
3334
import java.util.HashMap;
3435
import java.util.List;
3536
import java.util.Map;
@@ -150,4 +151,36 @@ void testPrefixRequireOption() {
150151
conf, conf, Thread.currentThread().getContextClassLoader()));
151152
Assertions.assertThat(dataSink).isInstanceOf(StarRocksDataSink.class);
152153
}
154+
155+
@Test
156+
void testCreateDataSinkWithSpecificedTimeZone() {
157+
DataSinkFactory sinkFactory =
158+
FactoryDiscoveryUtils.getFactoryByIdentifier("starrocks", DataSinkFactory.class);
159+
Assertions.assertThat(sinkFactory).isInstanceOf(StarRocksDataSinkFactory.class);
160+
161+
Configuration factoryConfiguration =
162+
Configuration.fromMap(
163+
ImmutableMap.<String, String>builder()
164+
.put("jdbc-url", "jdbc:mysql://127.0.0.1:9030")
165+
.put("load-url", "127.0.0.1:8030")
166+
.put("username", "root")
167+
.put("password", "")
168+
.build());
169+
Configuration pipelineConfiguration =
170+
Configuration.fromMap(
171+
ImmutableMap.<String, String>builder()
172+
.put("local-time-zone", "America/Los_Angeles")
173+
.build());
174+
DataSink dataSink =
175+
sinkFactory.createDataSink(
176+
new FactoryHelper.DefaultContext(
177+
factoryConfiguration,
178+
pipelineConfiguration,
179+
Thread.currentThread().getContextClassLoader()));
180+
Assertions.assertThat(dataSink).isInstanceOf(StarRocksDataSink.class);
181+
182+
ZoneId zoneId = ((StarRocksDataSink) dataSink).getZoneId();
183+
ZoneId expectedZondId = ZoneId.of("America/Los_Angeles");
184+
Assertions.assertThat(zoneId).isEqualTo(expectedZondId);
185+
}
153186
}

0 commit comments

Comments
 (0)