Skip to content

Commit 8535732

Browse files
committed
address comment.
1 parent a9f78e7 commit 8535732

File tree

2 files changed

+54
-11
lines changed

2 files changed

+54
-11
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public DataSink createDataSink(Context context) {
5151
TableCreateConfig.from(context.getFactoryConfiguration());
5252
SchemaChangeConfig schemaChangeConfig =
5353
SchemaChangeConfig.from(context.getFactoryConfiguration());
54-
String zoneStr = context.getFactoryConfiguration().get(PIPELINE_LOCAL_TIME_ZONE);
54+
String zoneStr = context.getPipelineConfiguration().get(PIPELINE_LOCAL_TIME_ZONE);
5555
ZoneId zoneId =
5656
PIPELINE_LOCAL_TIME_ZONE.defaultValue().equals(zoneStr)
5757
? ZoneId.systemDefault()

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksPipelineITCase.java

Lines changed: 53 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.flink.api.common.typeinfo.TypeInformation;
2222
import org.apache.flink.api.connector.sink2.Sink;
2323
import org.apache.flink.cdc.common.configuration.Configuration;
24+
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
2425
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
2526
import org.apache.flink.cdc.common.event.CreateTableEvent;
2627
import org.apache.flink.cdc.common.event.DataChangeEvent;
@@ -42,6 +43,7 @@
4243
import org.junit.BeforeClass;
4344
import org.junit.Test;
4445

46+
import java.time.Instant;
4547
import java.util.Arrays;
4648
import java.util.List;
4749

@@ -71,7 +73,12 @@ public void initializeDatabaseAndTable() {
7173

7274
LOG.info("Database {} created.", StarRocksContainer.STARROCKS_DATABASE_NAME);
7375

74-
List<String> schema = Arrays.asList("id INT NOT NULL", "number DOUBLE", "name VARCHAR(51)");
76+
List<String> schema =
77+
Arrays.asList(
78+
"id INT NOT NULL",
79+
"number DOUBLE",
80+
"name VARCHAR(51)",
81+
"birthday DATETIME");
7582

7683
executeSql(
7784
String.format(
@@ -107,43 +114,76 @@ private List<Event> generateEvents(TableId tableId) {
107114
.column(new PhysicalColumn("id", DataTypes.INT(), null))
108115
.column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
109116
.column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
117+
.column(new PhysicalColumn("birthday", DataTypes.TIMESTAMP_LTZ(6), null))
110118
.primaryKey("id")
111119
.build();
112120
BinaryRecordDataGenerator generator =
113121
new BinaryRecordDataGenerator(
114-
RowType.of(DataTypes.INT(), DataTypes.DOUBLE(), DataTypes.VARCHAR(17)));
122+
RowType.of(
123+
DataTypes.INT(),
124+
DataTypes.DOUBLE(),
125+
DataTypes.VARCHAR(17),
126+
DataTypes.TIMESTAMP_LTZ(6)));
115127

116128
return Arrays.asList(
117129
new CreateTableEvent(tableId, schema),
118130
DataChangeEvent.insertEvent(
119131
tableId,
120132
generator.generate(
121-
new Object[] {17, 3.14, BinaryStringData.fromString("StarRocks")})),
133+
new Object[] {
134+
17,
135+
3.14,
136+
BinaryStringData.fromString("StarRocks"),
137+
LocalZonedTimestampData.fromInstant(
138+
Instant.parse("2023-01-01T00:00:00.000Z"))
139+
})),
122140
DataChangeEvent.insertEvent(
123141
tableId,
124142
generator.generate(
125143
new Object[] {
126-
19, 2.718, BinaryStringData.fromString("Que Sera Sera")
144+
19,
145+
2.718,
146+
BinaryStringData.fromString("Que Sera Sera"),
147+
LocalZonedTimestampData.fromInstant(
148+
Instant.parse("2023-01-01T00:00:00.000Z"))
127149
})),
128150
DataChangeEvent.insertEvent(
129151
tableId,
130152
generator.generate(
131153
new Object[] {
132-
21, 1.732, BinaryStringData.fromString("Disenchanted")
154+
21,
155+
1.732,
156+
BinaryStringData.fromString("Disenchanted"),
157+
LocalZonedTimestampData.fromInstant(
158+
Instant.parse("2023-01-01T00:00:00.000Z"))
133159
})),
134160
DataChangeEvent.deleteEvent(
135161
tableId,
136162
generator.generate(
137163
new Object[] {
138-
19, 2.718, BinaryStringData.fromString("Que Sera Sera")
164+
19,
165+
2.718,
166+
BinaryStringData.fromString("Que Sera Sera"),
167+
LocalZonedTimestampData.fromInstant(
168+
Instant.parse("2023-01-01T00:00:00.000Z"))
139169
})),
140170
DataChangeEvent.updateEvent(
141171
tableId,
142172
generator.generate(
143-
new Object[] {17, 3.14, BinaryStringData.fromString("StarRocks")}),
173+
new Object[] {
174+
17,
175+
3.14,
176+
BinaryStringData.fromString("StarRocks"),
177+
LocalZonedTimestampData.fromInstant(
178+
Instant.parse("2023-01-01T00:00:00.000Z"))
179+
}),
144180
generator.generate(
145181
new Object[] {
146-
17, 6.28, BinaryStringData.fromString("StarRocks")
182+
17,
183+
6.28,
184+
BinaryStringData.fromString("StarRocks"),
185+
LocalZonedTimestampData.fromInstant(
186+
Instant.parse("2023-01-01T00:00:00.000Z"))
147187
})));
148188
}
149189

@@ -170,8 +210,11 @@ public void testValuesToStarRocks() throws Exception {
170210

171211
env.execute("Values to StarRocks Sink");
172212

173-
List<String> actual = fetchTableContent(tableId, 3);
174-
List<String> expected = Arrays.asList("17 | 6.28 | StarRocks", "21 | 1.732 | Disenchanted");
213+
List<String> actual = fetchTableContent(tableId, 4);
214+
List<String> expected =
215+
Arrays.asList(
216+
"17 | 6.28 | Doris Day | 2023-01-01 00:00:00.0",
217+
"21 | 1.732 | Disenchanted | 2023-01-01 00:00:00.0");
175218

176219
assertEqualsInAnyOrder(expected, actual);
177220
}

0 commit comments

Comments
 (0)