
Commit a9f78e7

address comment.
1 parent 8f472b0 commit a9f78e7

3 files changed: 62 additions, 22 deletions

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisPipelineITCase.java

Lines changed: 54 additions & 10 deletions
@@ -21,6 +21,7 @@
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
 import org.apache.flink.cdc.common.data.binary.BinaryStringData;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
@@ -42,6 +43,7 @@
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.time.Instant;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -84,7 +86,11 @@ public void initializeDatabaseAndTable() {
                 DorisContainer.DORIS_DATABASE_NAME,
                 DorisContainer.DORIS_TABLE_NAME,
                 "id",
-                Arrays.asList("id INT NOT NULL", "number DOUBLE", "name VARCHAR(51)"));
+                Arrays.asList(
+                        "id INT NOT NULL",
+                        "number DOUBLE",
+                        "name VARCHAR(51)",
+                        "birthday DATETIMEV2(6)"));
 
         // waiting for table to be created
         DORIS_CONTAINER.waitForLog(
@@ -135,41 +141,76 @@ private List<Event> generateEvents(TableId tableId) {
                         .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
                         .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
                         .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
+                        .column(new PhysicalColumn("birthday", DataTypes.TIMESTAMP_LTZ(6), null))
                         .primaryKey("id")
                         .build();
         BinaryRecordDataGenerator generator =
                 new BinaryRecordDataGenerator(
-                        RowType.of(DataTypes.INT(), DataTypes.DOUBLE(), DataTypes.VARCHAR(17)));
+                        RowType.of(
+                                DataTypes.INT(),
+                                DataTypes.DOUBLE(),
+                                DataTypes.VARCHAR(17),
+                                DataTypes.TIMESTAMP_LTZ(6)));
 
         return Arrays.asList(
                 new CreateTableEvent(tableId, schema),
                 DataChangeEvent.insertEvent(
                         tableId,
                         generator.generate(
-                                new Object[] {17, 3.14, BinaryStringData.fromString("Doris Day")})),
+                                new Object[] {
+                                    17,
+                                    3.14,
+                                    BinaryStringData.fromString("Doris Day"),
+                                    LocalZonedTimestampData.fromInstant(
+                                            Instant.parse("2023-01-01T00:00:00.000Z"))
+                                })),
                 DataChangeEvent.insertEvent(
                         tableId,
                         generator.generate(
                                 new Object[] {
-                                    19, 2.718, BinaryStringData.fromString("Que Sera Sera")
+                                    19,
+                                    2.718,
+                                    BinaryStringData.fromString("Que Sera Sera"),
+                                    LocalZonedTimestampData.fromInstant(
+                                            Instant.parse("2023-01-01T00:00:00.000Z"))
                                 })),
                 DataChangeEvent.insertEvent(
                         tableId,
                         generator.generate(
                                 new Object[] {
-                                    21, 1.732, BinaryStringData.fromString("Disenchanted")
+                                    21,
+                                    1.732,
+                                    BinaryStringData.fromString("Disenchanted"),
+                                    LocalZonedTimestampData.fromInstant(
+                                            Instant.parse("2023-01-01T00:00:00.000Z"))
                                 })),
                 DataChangeEvent.updateEvent(
                         tableId,
                         generator.generate(
-                                new Object[] {17, 3.14, BinaryStringData.fromString("Doris Day")}),
+                                new Object[] {
+                                    17,
+                                    3.14,
+                                    BinaryStringData.fromString("Doris Day"),
+                                    LocalZonedTimestampData.fromInstant(
+                                            Instant.parse("2023-01-01T00:00:00.000Z"))
+                                }),
                         generator.generate(
-                                new Object[] {17, 6.28, BinaryStringData.fromString("Doris Day")})),
+                                new Object[] {
+                                    17,
+                                    6.28,
+                                    BinaryStringData.fromString("Doris Day"),
+                                    LocalZonedTimestampData.fromInstant(
+                                            Instant.parse("2023-01-01T00:00:00.000Z"))
+                                })),
                 DataChangeEvent.deleteEvent(
                         tableId,
                         generator.generate(
                                 new Object[] {
-                                    19, 2.718, BinaryStringData.fromString("Que Sera Sera")
+                                    19,
+                                    2.718,
+                                    BinaryStringData.fromString("Que Sera Sera"),
+                                    LocalZonedTimestampData.fromInstant(
+                                            Instant.parse("2023-01-01T00:00:00.000Z"))
                                 })));
     }

@@ -201,9 +242,12 @@ private void runValuesToDorisJob(boolean batchMode) throws Exception {
 
         env.execute("Values to Doris Sink");
 
-        List<String> actual = fetchTableContent(tableId, 3);
+        List<String> actual = fetchTableContent(tableId, 4);
 
-        List<String> expected = Arrays.asList("17 | 6.28 | Doris Day", "21 | 1.732 | Disenchanted");
+        List<String> expected =
+                Arrays.asList(
+                        "17 | 6.28 | Doris Day | 2023-01-01 00:00:00",
+                        "21 | 1.732 | Disenchanted | 2023-01-01 00:00:00");
 
         assertEqualsInAnyOrder(expected, actual);
     }
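Editor's note: the test change above threads a TIMESTAMP_LTZ(6) column ("birthday", mapped to Doris DATETIMEV2(6)) through the whole pipeline. The value is built from an absolute Instant, and the expected Doris rows render it as "2023-01-01 00:00:00". A minimal sketch of that construction, using only types the test already imports and assuming a UTC session time zone on the Doris side:

import org.apache.flink.cdc.common.data.LocalZonedTimestampData;

import java.time.Instant;

public class TimestampLtzValueSketch {
    public static void main(String[] args) {
        // The test pins the value to an absolute point in time ...
        Instant birthday = Instant.parse("2023-01-01T00:00:00.000Z");

        // ... and wraps it in the CDC representation of TIMESTAMP_LTZ.
        LocalZonedTimestampData value = LocalZonedTimestampData.fromInstant(birthday);

        // No wall-clock shift is baked in: the instant survives the wrapping,
        // which is why the expected cell is "2023-01-01 00:00:00" under UTC.
        System.out.println(value.toInstant()); // 2023-01-01T00:00:00Z
    }
}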

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfo.java

Lines changed: 6 additions & 9 deletions
@@ -22,6 +22,7 @@
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypeChecks;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
@@ -30,7 +31,6 @@
 import org.apache.flink.table.data.binary.BinaryStringData;
 
 import java.time.ZoneId;
-import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -163,14 +163,11 @@ record ->
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                 fieldGetter =
                         record ->
-                                TimestampData.fromLocalDateTime(
-                                        ZonedDateTime.ofInstant(
-                                                        record.getLocalZonedTimestampData(
-                                                                        fieldPos,
-                                                                        getPrecision(fieldType))
-                                                                .toInstant(),
-                                                        zoneId)
-                                                .toLocalDateTime());
+                                TimestampData.fromInstant(
+                                        record.getLocalZonedTimestampData(
+                                                        fieldPos,
+                                                        DataTypeChecks.getPrecision(fieldType))
+                                                .toInstant());
                 break;
             default:
                 throw new IllegalArgumentException(
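Editor's note: the change above swaps the conversion path for TIMESTAMP_WITH_LOCAL_TIME_ZONE. The old getter rendered the instant as wall-clock time in the configured zoneId via ZonedDateTime.ofInstant(...).toLocalDateTime(), which shifted the value by the session offset (the "plus 8 hours" the old test expected); TimestampData.fromInstant keeps the absolute instant instead. A side-by-side sketch of the two conversions, assuming a +08:00 session zone such as Asia/Shanghai:

import org.apache.flink.table.data.TimestampData;

import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class LtzConversionSketch {
    public static void main(String[] args) {
        Instant instant = Instant.parse("2023-01-01T00:00:00.000Z");
        ZoneId zoneId = ZoneId.of("Asia/Shanghai");

        // Old path: render the instant as wall-clock time in zoneId,
        // which shifts the value by the zone offset (+8 hours here).
        TimestampData shifted =
                TimestampData.fromLocalDateTime(
                        ZonedDateTime.ofInstant(instant, zoneId).toLocalDateTime());

        // New path: keep the absolute instant, independent of zoneId.
        TimestampData kept = TimestampData.fromInstant(instant);

        System.out.println(shifted.toLocalDateTime()); // 2023-01-01T08:00
        System.out.println(kept.toLocalDateTime());    // 2023-01-01T00:00
    }
}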

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfoTest.java

Lines changed: 2 additions & 3 deletions
@@ -156,11 +156,10 @@ public void testGetRowDataFromRecordData() {
                         Timestamp.valueOf("2023-01-01 00:00:00.000")),
                 org.apache.flink.table.data.TimestampData.fromTimestamp(
                         Timestamp.valueOf("2023-01-01 00:00:00")),
-                // plus 8 hours.
                 org.apache.flink.table.data.TimestampData.fromInstant(
-                        Instant.parse("2023-01-01T08:00:00.000Z")),
+                        Instant.parse("2023-01-01T00:00:00.000Z")),
                 org.apache.flink.table.data.TimestampData.fromInstant(
-                        Instant.parse("2023-01-01T08:00:00.000Z")),
+                        Instant.parse("2023-01-01T00:00:00.000Z")),
                 null),
         tableSchemaInfo.getRowDataFromRecordData(recordData, false));
     }
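Editor's note: with the converter now going through TimestampData.fromInstant, the expected test values are the original UTC instants rather than zone-shifted local times, which is why the "plus 8 hours" expectation disappears. A small sketch of the round trip the updated expectation relies on, using only types that already appear in this commit:

import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
import org.apache.flink.table.data.TimestampData;

import java.time.Instant;

public class LtzRoundTripSketch {
    public static void main(String[] args) {
        Instant source = Instant.parse("2023-01-01T00:00:00.000Z");

        // CDC-side TIMESTAMP_LTZ value ...
        LocalZonedTimestampData cdcValue = LocalZonedTimestampData.fromInstant(source);

        // ... converted the way the new field getter does it.
        TimestampData tableValue = TimestampData.fromInstant(cdcValue.toInstant());

        // The absolute instant is preserved, regardless of the configured zone.
        System.out.println(source.equals(tableValue.toInstant())); // true
    }
}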
