Skip to content

Commit 94a9639

Browse files
committed
add timezone from pipeline
1 parent 2ae3799 commit 94a9639

File tree

3 files changed

+144
-163
lines changed

3 files changed

+144
-163
lines changed

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java

Lines changed: 52 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -132,167 +132,108 @@ public static TimestampData toTimestamp(String str, String format, String timezo
132132
public static Integer timestampDiff(
133133
String timeIntervalUnit,
134134
LocalZonedTimestampData fromTimestamp,
135-
LocalZonedTimestampData toTimestamp) {
135+
LocalZonedTimestampData toTimestamp,
136+
String timezone) {
136137
if (fromTimestamp == null || toTimestamp == null) {
137138
return null;
138139
}
139140
return timestampDiff(
140141
timeIntervalUnit,
141142
fromTimestamp.getEpochMillisecond(),
142-
toTimestamp.getEpochMillisecond());
143+
timezone,
144+
toTimestamp.getEpochMillisecond(),
145+
timezone);
143146
}
144147

145148
public static Integer timestampdiff(
146149
String timeIntervalUnit,
147150
LocalZonedTimestampData fromTimestamp,
148-
LocalZonedTimestampData toTimestamp) {
149-
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
150-
}
151-
152-
public static Integer timestampDiff(
153-
String timeIntervalUnit, TimestampData fromTimestamp, TimestampData toTimestamp) {
154-
if (fromTimestamp == null || toTimestamp == null) {
155-
return null;
156-
}
157-
return timestampDiff(
158-
timeIntervalUnit, fromTimestamp.getMillisecond(), toTimestamp.getMillisecond());
159-
}
160-
161-
public static Integer timestampdiff(
162-
String timeIntervalUnit, TimestampData fromTimestamp, TimestampData toTimestamp) {
163-
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
151+
LocalZonedTimestampData toTimestamp,
152+
String timezone) {
153+
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, timezone);
164154
}
165155

166156
public static Integer timestampDiff(
167157
String timeIntervalUnit,
168158
TimestampData fromTimestamp,
169-
LocalZonedTimestampData toTimestamp) {
159+
TimestampData toTimestamp,
160+
String timezone) {
170161
if (fromTimestamp == null || toTimestamp == null) {
171162
return null;
172163
}
173164
return timestampDiff(
174165
timeIntervalUnit,
175166
fromTimestamp.getMillisecond(),
176-
toTimestamp.getEpochMillisecond());
167+
"UTC",
168+
toTimestamp.getMillisecond(),
169+
"UTC");
177170
}
178171

179172
public static Integer timestampdiff(
180173
String timeIntervalUnit,
181174
TimestampData fromTimestamp,
182-
LocalZonedTimestampData toTimestamp) {
183-
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
175+
TimestampData toTimestamp,
176+
String timezone) {
177+
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, timezone);
184178
}
185179

186180
public static Integer timestampDiff(
187181
String timeIntervalUnit,
188-
LocalZonedTimestampData fromTimestamp,
189-
TimestampData toTimestamp) {
182+
TimestampData fromTimestamp,
183+
LocalZonedTimestampData toTimestamp,
184+
String timezone) {
190185
if (fromTimestamp == null || toTimestamp == null) {
191186
return null;
192187
}
193188
return timestampDiff(
194189
timeIntervalUnit,
195-
fromTimestamp.getEpochMillisecond(),
196-
toTimestamp.getMillisecond());
197-
}
198-
199-
public static Integer timestampdiff(
200-
String timeIntervalUnit,
201-
LocalZonedTimestampData fromTimestamp,
202-
TimestampData toTimestamp) {
203-
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
204-
}
205-
206-
public static Integer timestampDiff(
207-
String timeIntervalUnit,
208-
ZonedTimestampData fromTimestamp,
209-
ZonedTimestampData toTimestamp) {
210-
if (fromTimestamp == null || toTimestamp == null) {
211-
return null;
212-
}
213-
return timestampDiff(
214-
timeIntervalUnit, fromTimestamp.getMillisecond(), toTimestamp.getMillisecond());
190+
fromTimestamp.getMillisecond(),
191+
"UTC",
192+
toTimestamp.getEpochMillisecond(),
193+
timezone);
215194
}
216195

217196
public static Integer timestampdiff(
218197
String timeIntervalUnit,
219-
ZonedTimestampData fromTimestamp,
220-
ZonedTimestampData toTimestamp) {
221-
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
198+
TimestampData fromTimestamp,
199+
LocalZonedTimestampData toTimestamp,
200+
String timezone) {
201+
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, timezone);
222202
}
223203

224204
public static Integer timestampDiff(
225205
String timeIntervalUnit,
226206
LocalZonedTimestampData fromTimestamp,
227-
ZonedTimestampData toTimestamp) {
207+
TimestampData toTimestamp,
208+
String timezone) {
228209
if (fromTimestamp == null || toTimestamp == null) {
229210
return null;
230211
}
231212
return timestampDiff(
232213
timeIntervalUnit,
233214
fromTimestamp.getEpochMillisecond(),
234-
toTimestamp.getMillisecond());
215+
timezone,
216+
toTimestamp.getMillisecond(),
217+
"UTC");
235218
}
236219

237220
public static Integer timestampdiff(
238221
String timeIntervalUnit,
239222
LocalZonedTimestampData fromTimestamp,
240-
ZonedTimestampData toTimestamp) {
241-
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
223+
TimestampData toTimestamp,
224+
String timezone) {
225+
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, timezone);
242226
}
243227

244228
public static Integer timestampDiff(
245229
String timeIntervalUnit,
246-
ZonedTimestampData fromTimestamp,
247-
LocalZonedTimestampData toTimestamp) {
248-
if (fromTimestamp == null || toTimestamp == null) {
249-
return null;
250-
}
251-
return timestampDiff(
252-
timeIntervalUnit,
253-
fromTimestamp.getMillisecond(),
254-
toTimestamp.getEpochMillisecond());
255-
}
256-
257-
public static Integer timestampdiff(
258-
String timeIntervalUnit,
259-
ZonedTimestampData fromTimestamp,
260-
LocalZonedTimestampData toTimestamp) {
261-
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
262-
}
263-
264-
public static Integer timestampDiff(
265-
String timeIntervalUnit, TimestampData fromTimestamp, ZonedTimestampData toTimestamp) {
266-
if (fromTimestamp == null || toTimestamp == null) {
267-
return null;
268-
}
269-
return timestampDiff(
270-
timeIntervalUnit, fromTimestamp.getMillisecond(), toTimestamp.getMillisecond());
271-
}
272-
273-
public static Integer timestampdiff(
274-
String timeIntervalUnit, TimestampData fromTimestamp, ZonedTimestampData toTimestamp) {
275-
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
276-
}
277-
278-
public static Integer timestampDiff(
279-
String timeIntervalUnit, ZonedTimestampData fromTimestamp, TimestampData toTimestamp) {
280-
if (fromTimestamp == null || toTimestamp == null) {
281-
return null;
282-
}
283-
return timestampDiff(
284-
timeIntervalUnit, fromTimestamp.getMillisecond(), toTimestamp.getMillisecond());
285-
}
286-
287-
public static Integer timestampdiff(
288-
String timeIntervalUnit, ZonedTimestampData fromTimestamp, TimestampData toTimestamp) {
289-
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
290-
}
291-
292-
public static Integer timestampDiff(String timeIntervalUnit, long fromDate, long toDate) {
293-
Calendar from = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
230+
long fromDate,
231+
String fromTimezone,
232+
long toDate,
233+
String toTimezone) {
234+
Calendar from = Calendar.getInstance(TimeZone.getTimeZone(fromTimezone));
294235
from.setTime(new Date(fromDate));
295-
Calendar to = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
236+
Calendar to = Calendar.getInstance(TimeZone.getTimeZone(toTimezone));
296237
to.setTime(new Date(toDate));
297238
long second = (to.getTimeInMillis() - from.getTimeInMillis()) / 1000;
298239
switch (timeIntervalUnit) {
@@ -331,37 +272,31 @@ public static Integer timestampDiff(String timeIntervalUnit, long fromDate, long
331272
}
332273

333274
public static LocalZonedTimestampData timestampadd(
334-
String timeIntervalUnit, Integer interval, LocalZonedTimestampData timePoint) {
275+
String timeIntervalUnit,
276+
Integer interval,
277+
LocalZonedTimestampData timePoint,
278+
String timezone) {
335279
if (interval == null || timePoint == null) {
336280
return null;
337281
}
338282
return LocalZonedTimestampData.fromEpochMillis(
339-
timestampadd(timeIntervalUnit, interval, timePoint.getEpochMillisecond()));
340-
}
341-
342-
public static ZonedTimestampData timestampadd(
343-
String timeIntervalUnit, Integer interval, ZonedTimestampData timePoint) {
344-
if (interval == null || timePoint == null) {
345-
return null;
346-
}
347-
return ZonedTimestampData.of(
348-
timestampadd(timeIntervalUnit, interval, timePoint.getMillisecond()),
349-
0,
350-
timePoint.getZoneId());
283+
timestampadd(
284+
timeIntervalUnit, interval, timePoint.getEpochMillisecond(), timezone));
351285
}
352286

353287
public static TimestampData timestampadd(
354-
String timeIntervalUnit, Integer interval, TimestampData timePoint) {
288+
String timeIntervalUnit, Integer interval, TimestampData timePoint, String timezone) {
355289
if (interval == null || timePoint == null) {
356290
return null;
357291
}
358292
return TimestampData.fromMillis(
359-
timestampadd(timeIntervalUnit, interval, timePoint.getMillisecond()));
293+
timestampadd(timeIntervalUnit, interval, timePoint.getMillisecond(), "UTC"));
360294
}
361295

362-
private static long timestampadd(String timeIntervalUnit, int interval, long timePoint) {
296+
private static long timestampadd(
297+
String timeIntervalUnit, int interval, long timePoint, String timezone) {
363298
Calendar calendar = Calendar.getInstance();
364-
calendar.setTimeZone(TimeZone.getTimeZone("UTC"));
299+
calendar.setTimeZone(TimeZone.getTimeZone(timezone));
365300
calendar.setTime(new Date(timePoint));
366301
int field;
367302
switch (timeIntervalUnit) {

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public class JaninoCompiler {
6868
Arrays.asList("DATE_FORMAT");
6969

7070
private static final List<String> TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS =
71-
Arrays.asList("TO_DATE", "TO_TIMESTAMP", "FROM_UNIXTIME");
71+
Arrays.asList("TO_DATE", "TO_TIMESTAMP", "FROM_UNIXTIME", "TIMESTAMPADD", "TIMESTAMPDIFF");
7272

7373
public static final String DEFAULT_EPOCH_TIME = "__epoch_time__";
7474
public static final String DEFAULT_TIME_ZONE = "__time_zone__";
@@ -123,13 +123,14 @@ public static Java.Rvalue translateSqlNodeToJaninoRvalue(
123123

124124
private static Java.Rvalue translateSqlIdentifier(SqlIdentifier sqlIdentifier) {
125125
String columnName = sqlIdentifier.names.get(sqlIdentifier.names.size() - 1);
126-
if (TIMEZONE_FREE_TEMPORAL_FUNCTIONS.contains(columnName)) {
126+
if (TIMEZONE_FREE_TEMPORAL_FUNCTIONS.contains(columnName.toUpperCase())) {
127127
return generateTimezoneFreeTemporalFunctionOperation(columnName);
128-
} else if (TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS.contains(columnName)) {
128+
} else if (TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS.contains(columnName.toUpperCase())) {
129129
return generateTimezoneRequiredTemporalFunctionOperation(columnName);
130-
} else if (TIMEZONE_FREE_TEMPORAL_CONVERSION_FUNCTIONS.contains(columnName)) {
130+
} else if (TIMEZONE_FREE_TEMPORAL_CONVERSION_FUNCTIONS.contains(columnName.toUpperCase())) {
131131
return generateTimezoneFreeTemporalConversionFunctionOperation(columnName);
132-
} else if (TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS.contains(columnName)) {
132+
} else if (TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS.contains(
133+
columnName.toUpperCase())) {
133134
return generateTimezoneRequiredTemporalConversionFunctionOperation(columnName);
134135
} else {
135136
return new Java.AmbiguousName(Location.NOWHERE, new String[] {columnName});
@@ -165,14 +166,15 @@ private static Java.Rvalue translateSqlBasicCall(
165166
for (SqlNode sqlNode : operandList) {
166167
translateSqlNodeToAtoms(sqlNode, atoms, udfDescriptors);
167168
}
168-
if (TIMEZONE_FREE_TEMPORAL_FUNCTIONS.contains(sqlBasicCall.getOperator().getName())) {
169+
if (TIMEZONE_FREE_TEMPORAL_FUNCTIONS.contains(
170+
sqlBasicCall.getOperator().getName().toUpperCase())) {
169171
atoms.add(new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_EPOCH_TIME}));
170172
} else if (TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS.contains(
171-
sqlBasicCall.getOperator().getName())) {
173+
sqlBasicCall.getOperator().getName().toUpperCase())) {
172174
atoms.add(new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_EPOCH_TIME}));
173175
atoms.add(new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_TIME_ZONE}));
174176
} else if (TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS.contains(
175-
sqlBasicCall.getOperator().getName())) {
177+
sqlBasicCall.getOperator().getName().toUpperCase())) {
176178
atoms.add(new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_TIME_ZONE}));
177179
}
178180
return sqlBasicCallToJaninoRvalue(
@@ -319,7 +321,7 @@ private static Java.Rvalue generateCastOperation(
319321

320322
private static Java.Rvalue generateTimestampDiffOperation(
321323
SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
322-
if (atoms.length != 3) {
324+
if (atoms.length != 4) {
323325
throw new ParseException("Unrecognized expression: " + sqlBasicCall.toString());
324326
}
325327
String timeIntervalUnit = atoms[0].toString().toUpperCase();
@@ -341,6 +343,7 @@ private static Java.Rvalue generateTimestampDiffOperation(
341343
new Java.AmbiguousName(Location.NOWHERE, new String[] {timeIntervalUnit}));
342344
timestampDiffFunctionParam.add(atoms[1]);
343345
timestampDiffFunctionParam.add(atoms[2]);
346+
timestampDiffFunctionParam.add(atoms[3]);
344347
return new Java.MethodInvocation(
345348
Location.NOWHERE,
346349
null,
@@ -350,7 +353,7 @@ private static Java.Rvalue generateTimestampDiffOperation(
350353

351354
private static Java.Rvalue generateTimestampAddOperation(
352355
SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
353-
if (atoms.length != 3) {
356+
if (atoms.length != 4) {
354357
throw new ParseException("Unrecognized expression: " + sqlBasicCall.toString());
355358
}
356359
String timeIntervalUnit = atoms[0].toString().toUpperCase();
@@ -372,6 +375,7 @@ private static Java.Rvalue generateTimestampAddOperation(
372375
new Java.AmbiguousName(Location.NOWHERE, new String[] {timeIntervalUnit}));
373376
timestampDiffFunctionParam.add(atoms[1]);
374377
timestampDiffFunctionParam.add(atoms[2]);
378+
timestampDiffFunctionParam.add(atoms[3]);
375379
return new Java.MethodInvocation(
376380
Location.NOWHERE,
377381
null,

0 commit comments

Comments
 (0)