Commit ecfee70
[FLINK-36560][pipeline-connector][paimon] Fix the issue that timestamp_ltz field is not correctly converted
This closes apache#3648.
1 parent e15bf6f commit ecfee70

6 files changed: +122 -45 lines changed
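
The underlying bug: when writing TIMESTAMP_LTZ (timestamp with local time zone) fields, the sinks first rendered the instant as wall-clock time in the pipeline's session zone and stored that local value, baking the zone offset into the data. A minimal, self-contained sketch of the failure mode using only java.time (the zone and values are illustrative, not taken from this commit):

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class TimestampLtzShiftDemo {
    public static void main(String[] args) {
        Instant source = Instant.parse("2023-01-01T00:00:00Z");
        ZoneId sessionZone = ZoneId.of("Asia/Shanghai"); // UTC+8, illustrative

        // Old behavior: render the instant as wall-clock time in the session
        // zone, then store that local value with no zone attached.
        LocalDateTime wallClock = source.atZone(sessionZone).toLocalDateTime();

        // Reading the stored wall-clock value back as an absolute time (the
        // epoch-based storage the sinks use) shifts it by the zone offset.
        Instant roundTripped = wallClock.toInstant(ZoneOffset.UTC);

        System.out.println(source);       // 2023-01-01T00:00:00Z
        System.out.println(roundTripped); // 2023-01-01T08:00:00Z
    }
}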

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisPipelineITCase.java

Lines changed: 54 additions & 10 deletions
@@ -21,6 +21,7 @@
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
 import org.apache.flink.cdc.common.data.binary.BinaryStringData;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
@@ -42,6 +43,7 @@
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.time.Instant;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -84,7 +86,11 @@ public void initializeDatabaseAndTable() {
                 DorisContainer.DORIS_DATABASE_NAME,
                 DorisContainer.DORIS_TABLE_NAME,
                 "id",
-                Arrays.asList("id INT NOT NULL", "number DOUBLE", "name VARCHAR(51)"));
+                Arrays.asList(
+                        "id INT NOT NULL",
+                        "number DOUBLE",
+                        "name VARCHAR(51)",
+                        "birthday DATETIMEV2(6)"));
 
         // waiting for table to be created
         DORIS_CONTAINER.waitForLog(
@@ -135,41 +141,76 @@ private List<Event> generateEvents(TableId tableId) {
                         .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
                         .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
                         .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
+                        .column(new PhysicalColumn("birthday", DataTypes.TIMESTAMP_LTZ(6), null))
                         .primaryKey("id")
                         .build();
         BinaryRecordDataGenerator generator =
                 new BinaryRecordDataGenerator(
-                        RowType.of(DataTypes.INT(), DataTypes.DOUBLE(), DataTypes.VARCHAR(17)));
+                        RowType.of(
+                                DataTypes.INT(),
+                                DataTypes.DOUBLE(),
+                                DataTypes.VARCHAR(17),
+                                DataTypes.TIMESTAMP_LTZ(6)));
 
         return Arrays.asList(
                 new CreateTableEvent(tableId, schema),
                 DataChangeEvent.insertEvent(
                         tableId,
                         generator.generate(
-                                new Object[] {17, 3.14, BinaryStringData.fromString("Doris Day")})),
+                                new Object[] {
+                                    17,
+                                    3.14,
+                                    BinaryStringData.fromString("Doris Day"),
+                                    LocalZonedTimestampData.fromInstant(
+                                            Instant.parse("2023-01-01T00:00:00.000Z"))
+                                })),
                 DataChangeEvent.insertEvent(
                         tableId,
                         generator.generate(
                                 new Object[] {
-                                    19, 2.718, BinaryStringData.fromString("Que Sera Sera")
+                                    19,
+                                    2.718,
+                                    BinaryStringData.fromString("Que Sera Sera"),
+                                    LocalZonedTimestampData.fromInstant(
+                                            Instant.parse("2023-01-01T00:00:00.000Z"))
                                 })),
                 DataChangeEvent.insertEvent(
                         tableId,
                         generator.generate(
                                 new Object[] {
-                                    21, 1.732, BinaryStringData.fromString("Disenchanted")
+                                    21,
+                                    1.732,
+                                    BinaryStringData.fromString("Disenchanted"),
+                                    LocalZonedTimestampData.fromInstant(
+                                            Instant.parse("2023-01-01T00:00:00.000Z"))
                                 })),
                 DataChangeEvent.updateEvent(
                         tableId,
                         generator.generate(
-                                new Object[] {17, 3.14, BinaryStringData.fromString("Doris Day")}),
+                                new Object[] {
+                                    17,
+                                    3.14,
+                                    BinaryStringData.fromString("Doris Day"),
+                                    LocalZonedTimestampData.fromInstant(
+                                            Instant.parse("2023-01-01T00:00:00.000Z"))
+                                }),
                         generator.generate(
-                                new Object[] {17, 6.28, BinaryStringData.fromString("Doris Day")})),
+                                new Object[] {
+                                    17,
+                                    6.28,
+                                    BinaryStringData.fromString("Doris Day"),
+                                    LocalZonedTimestampData.fromInstant(
+                                            Instant.parse("2023-01-01T00:00:00.000Z"))
+                                })),
                 DataChangeEvent.deleteEvent(
                         tableId,
                         generator.generate(
                                 new Object[] {
-                                    19, 2.718, BinaryStringData.fromString("Que Sera Sera")
+                                    19,
+                                    2.718,
+                                    BinaryStringData.fromString("Que Sera Sera"),
+                                    LocalZonedTimestampData.fromInstant(
+                                            Instant.parse("2023-01-01T00:00:00.000Z"))
                                 })));
     }
 
@@ -201,9 +242,12 @@ private void runValuesToDorisJob(boolean batchMode) throws Exception {
 
         env.execute("Values to Doris Sink");
 
-        List<String> actual = fetchTableContent(tableId, 3);
+        List<String> actual = fetchTableContent(tableId, 4);
 
-        List<String> expected = Arrays.asList("17 | 6.28 | Doris Day", "21 | 1.732 | Disenchanted");
+        List<String> expected =
+                Arrays.asList(
+                        "17 | 6.28 | Doris Day | 2023-01-01 00:00:00",
+                        "21 | 1.732 | Disenchanted | 2023-01-01 00:00:00");
 
         assertEqualsInAnyOrder(expected, actual);
     }
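
The Doris test now routes a TIMESTAMP_LTZ(6) column end to end. LocalZonedTimestampData models an absolute instant, so the test values are zone-independent by construction; a minimal sketch of the round trip the diff relies on (assuming fromInstant and toInstant are inverses, as they are used above):

import org.apache.flink.cdc.common.data.LocalZonedTimestampData;

import java.time.Instant;

class LtzRoundTrip {
    public static void main(String[] args) {
        Instant instant = Instant.parse("2023-01-01T00:00:00.000Z");
        LocalZonedTimestampData data = LocalZonedTimestampData.fromInstant(instant);
        // An absolute point in time: converting back yields the same instant
        // regardless of the JVM's default zone.
        System.out.println(data.toInstant().equals(instant)); // true
    }
}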

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfo.java

Lines changed: 6 additions & 9 deletions
@@ -22,6 +22,7 @@
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypeChecks;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
@@ -30,7 +31,6 @@
 import org.apache.flink.table.data.binary.BinaryStringData;
 
 import java.time.ZoneId;
-import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -163,14 +163,11 @@ record ->
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                 fieldGetter =
                         record ->
-                                TimestampData.fromLocalDateTime(
-                                        ZonedDateTime.ofInstant(
-                                                        record.getLocalZonedTimestampData(
-                                                                        fieldPos,
-                                                                        getPrecision(fieldType))
-                                                                .toInstant(),
-                                                        zoneId)
-                                                .toLocalDateTime());
+                                TimestampData.fromInstant(
+                                        record.getLocalZonedTimestampData(
+                                                        fieldPos,
+                                                        DataTypeChecks.getPrecision(fieldType))
+                                                .toInstant());
                 break;
             default:
                 throw new IllegalArgumentException(
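
For any non-UTC pipeline zone the removed and added getters disagree. A hedged side-by-side sketch of the two paths (the zone is illustrative; TimestampData stores an epoch-based value):

import org.apache.flink.table.data.TimestampData;

import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

class GetterComparison {
    public static void main(String[] args) {
        Instant instant = Instant.parse("2023-01-01T00:00:00Z");
        ZoneId zoneId = ZoneId.of("Asia/Shanghai"); // UTC+8, illustrative

        // Removed path: the instant becomes session-zone wall-clock time
        // first, so the stored value carries the +8h offset.
        TimestampData shifted =
                TimestampData.fromLocalDateTime(
                        ZonedDateTime.ofInstant(instant, zoneId).toLocalDateTime());

        // New path: the instant is stored unchanged.
        TimestampData exact = TimestampData.fromInstant(instant);

        System.out.println(shifted.toInstant()); // 2023-01-01T08:00:00Z
        System.out.println(exact.toInstant());   // 2023-01-01T00:00:00Z
    }
}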

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfoTest.java

Lines changed: 2 additions & 3 deletions
@@ -156,11 +156,10 @@ public void testGetRowDataFromRecordData() {
                         Timestamp.valueOf("2023-01-01 00:00:00.000")),
                 org.apache.flink.table.data.TimestampData.fromTimestamp(
                         Timestamp.valueOf("2023-01-01 00:00:00")),
-                // plus 8 hours.
                 org.apache.flink.table.data.TimestampData.fromInstant(
-                        Instant.parse("2023-01-01T08:00:00.000Z")),
+                        Instant.parse("2023-01-01T00:00:00.000Z")),
                 org.apache.flink.table.data.TimestampData.fromInstant(
-                        Instant.parse("2023-01-01T08:00:00.000Z")),
+                        Instant.parse("2023-01-01T00:00:00.000Z")),
                 null),
         tableSchemaInfo.getRowDataFromRecordData(recordData, false));
 }

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java

Lines changed: 5 additions & 10 deletions
@@ -32,7 +32,6 @@
 import org.apache.paimon.types.RowKind;
 
 import java.time.ZoneId;
-import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -111,15 +110,11 @@ private static RecordData.FieldGetter createFieldGetter(
             case TIMESTAMP_WITH_TIME_ZONE:
                 fieldGetter =
                         row ->
-                                Timestamp.fromLocalDateTime(
-                                        ZonedDateTime.ofInstant(
-                                                        row.getLocalZonedTimestampData(
-                                                                        fieldPos,
-                                                                        DataTypeChecks.getPrecision(
-                                                                                fieldType))
-                                                                .toInstant(),
-                                                        zoneId)
-                                                .toLocalDateTime());
+                                Timestamp.fromInstant(
+                                        row.getLocalZonedTimestampData(
+                                                        fieldPos,
+                                                        DataTypeChecks.getPrecision(fieldType))
+                                                .toInstant());
                 break;
             case ROW:
                 final int rowFieldCount = getFieldCount(fieldType);
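
The Paimon writer gets the same fix: Timestamp.fromInstant stores the epoch-based value directly instead of freezing session-zone wall-clock time. A minimal round-trip sketch, assuming org.apache.paimon.data.Timestamp#toInstant mirrors fromInstant (as the getter above assumes for LocalZonedTimestampData):

import org.apache.paimon.data.Timestamp;

import java.time.Instant;

class PaimonLtzRoundTrip {
    public static void main(String[] args) {
        Instant instant = Instant.parse("2023-01-01T00:00:00Z");
        Timestamp ts = Timestamp.fromInstant(instant);
        // The TIMESTAMP_LTZ field no longer absorbs the pipeline's zone offset.
        System.out.println(ts.toInstant().equals(instant)); // true
    }
}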

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java

Lines changed: 2 additions & 3 deletions
@@ -130,9 +130,8 @@ public void testConvertEventToGenericRowOfAllDataTypes() {
                         java.sql.Timestamp.valueOf("2023-01-01 00:00:00.000")),
                 Timestamp.fromSQLTimestamp(
                         java.sql.Timestamp.valueOf("2023-01-01 00:00:00")),
-                // plus 8 hours.
-                Timestamp.fromInstant(Instant.parse("2023-01-01T08:00:00.000Z")),
-                Timestamp.fromInstant(Instant.parse("2023-01-01T08:00:00.000Z")),
+                Timestamp.fromInstant(Instant.parse("2023-01-01T00:00:00.000Z")),
+                Timestamp.fromInstant(Instant.parse("2023-01-01T00:00:00.000Z")),
                 null),
         genericRow);
 }

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksPipelineITCase.java

Lines changed: 53 additions & 10 deletions
@@ -21,6 +21,7 @@
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
 import org.apache.flink.cdc.common.data.binary.BinaryStringData;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
@@ -42,6 +43,7 @@
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.time.Instant;
 import java.util.Arrays;
 import java.util.List;
 
@@ -71,7 +73,12 @@ public void initializeDatabaseAndTable() {
 
         LOG.info("Database {} created.", StarRocksContainer.STARROCKS_DATABASE_NAME);
 
-        List<String> schema = Arrays.asList("id INT NOT NULL", "number DOUBLE", "name VARCHAR(51)");
+        List<String> schema =
+                Arrays.asList(
+                        "id INT NOT NULL",
+                        "number DOUBLE",
+                        "name VARCHAR(51)",
+                        "birthday DATETIME");
 
         executeSql(
                 String.format(
@@ -107,43 +114,76 @@ private List<Event> generateEvents(TableId tableId) {
                         .column(new PhysicalColumn("id", DataTypes.INT(), null))
                         .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
                         .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
+                        .column(new PhysicalColumn("birthday", DataTypes.TIMESTAMP_LTZ(6), null))
                         .primaryKey("id")
                         .build();
         BinaryRecordDataGenerator generator =
                 new BinaryRecordDataGenerator(
-                        RowType.of(DataTypes.INT(), DataTypes.DOUBLE(), DataTypes.VARCHAR(17)));
+                        RowType.of(
+                                DataTypes.INT(),
+                                DataTypes.DOUBLE(),
+                                DataTypes.VARCHAR(17),
+                                DataTypes.TIMESTAMP_LTZ(6)));
 
         return Arrays.asList(
                 new CreateTableEvent(tableId, schema),
                 DataChangeEvent.insertEvent(
                         tableId,
                         generator.generate(
-                                new Object[] {17, 3.14, BinaryStringData.fromString("StarRocks")})),
+                                new Object[] {
+                                    17,
+                                    3.14,
+                                    BinaryStringData.fromString("StarRocks"),
+                                    LocalZonedTimestampData.fromInstant(
+                                            Instant.parse("2023-01-01T00:00:00.000Z"))
+                                })),
                 DataChangeEvent.insertEvent(
                         tableId,
                         generator.generate(
                                 new Object[] {
-                                    19, 2.718, BinaryStringData.fromString("Que Sera Sera")
+                                    19,
+                                    2.718,
+                                    BinaryStringData.fromString("Que Sera Sera"),
+                                    LocalZonedTimestampData.fromInstant(
+                                            Instant.parse("2023-01-01T00:00:00.000Z"))
                                 })),
                 DataChangeEvent.insertEvent(
                         tableId,
                         generator.generate(
                                 new Object[] {
-                                    21, 1.732, BinaryStringData.fromString("Disenchanted")
+                                    21,
+                                    1.732,
+                                    BinaryStringData.fromString("Disenchanted"),
+                                    LocalZonedTimestampData.fromInstant(
+                                            Instant.parse("2023-01-01T00:00:00.000Z"))
                                 })),
                 DataChangeEvent.deleteEvent(
                         tableId,
                         generator.generate(
                                 new Object[] {
-                                    19, 2.718, BinaryStringData.fromString("Que Sera Sera")
+                                    19,
+                                    2.718,
+                                    BinaryStringData.fromString("Que Sera Sera"),
+                                    LocalZonedTimestampData.fromInstant(
+                                            Instant.parse("2023-01-01T00:00:00.000Z"))
                                 })),
                 DataChangeEvent.updateEvent(
                         tableId,
                         generator.generate(
-                                new Object[] {17, 3.14, BinaryStringData.fromString("StarRocks")}),
+                                new Object[] {
+                                    17,
+                                    3.14,
+                                    BinaryStringData.fromString("StarRocks"),
+                                    LocalZonedTimestampData.fromInstant(
+                                            Instant.parse("2023-01-01T00:00:00.000Z"))
+                                }),
                         generator.generate(
                                 new Object[] {
-                                    17, 6.28, BinaryStringData.fromString("StarRocks")
+                                    17,
+                                    6.28,
+                                    BinaryStringData.fromString("StarRocks"),
+                                    LocalZonedTimestampData.fromInstant(
+                                            Instant.parse("2023-01-01T00:00:00.000Z"))
                                 })));
     }
 
@@ -170,8 +210,11 @@ public void testValuesToStarRocks() throws Exception {
 
         env.execute("Values to StarRocks Sink");
 
-        List<String> actual = fetchTableContent(tableId, 3);
-        List<String> expected = Arrays.asList("17 | 6.28 | StarRocks", "21 | 1.732 | Disenchanted");
+        List<String> actual = fetchTableContent(tableId, 4);
+        List<String> expected =
+                Arrays.asList(
+                        "17 | 6.28 | StarRocks | 2023-01-01 00:00:00.0",
+                        "21 | 1.732 | Disenchanted | 2023-01-01 00:00:00.0");
 
         assertEqualsInAnyOrder(expected, actual);
     }
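
The expected StarRocks rows gain a fourth column whose trailing ".0" matches java.sql.Timestamp.toString(); that the test's fetchTableContent helper renders DATETIME values through java.sql.Timestamp is an assumption here, but the formatting itself is standard JDK behavior:

import java.sql.Timestamp;

class DatetimeRendering {
    public static void main(String[] args) {
        // java.sql.Timestamp.toString() always prints at least one fractional
        // digit, hence "... | 2023-01-01 00:00:00.0" in the expected rows.
        System.out.println(Timestamp.valueOf("2023-01-01 00:00:00")); // 2023-01-01 00:00:00.0
    }
}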
