Skip to content

Commit 08136fc

Browse files
committed
add timezone from pipeline
1 parent 12f89ab commit 08136fc

File tree

3 files changed

+145
-163
lines changed

3 files changed

+145
-163
lines changed

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java

Lines changed: 52 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -111,167 +111,108 @@ public static TimestampData toTimestamp(String str, String format, String timezo
111111
public static Integer timestampDiff(
112112
String timeIntervalUnit,
113113
LocalZonedTimestampData fromTimestamp,
114-
LocalZonedTimestampData toTimestamp) {
114+
LocalZonedTimestampData toTimestamp,
115+
String timezone) {
115116
if (fromTimestamp == null || toTimestamp == null) {
116117
return null;
117118
}
118119
return timestampDiff(
119120
timeIntervalUnit,
120121
fromTimestamp.getEpochMillisecond(),
121-
toTimestamp.getEpochMillisecond());
122+
timezone,
123+
toTimestamp.getEpochMillisecond(),
124+
timezone);
122125
}
123126

124127
public static Integer timestampdiff(
125128
String timeIntervalUnit,
126129
LocalZonedTimestampData fromTimestamp,
127-
LocalZonedTimestampData toTimestamp) {
128-
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
129-
}
130-
131-
public static Integer timestampDiff(
132-
String timeIntervalUnit, TimestampData fromTimestamp, TimestampData toTimestamp) {
133-
if (fromTimestamp == null || toTimestamp == null) {
134-
return null;
135-
}
136-
return timestampDiff(
137-
timeIntervalUnit, fromTimestamp.getMillisecond(), toTimestamp.getMillisecond());
138-
}
139-
140-
public static Integer timestampdiff(
141-
String timeIntervalUnit, TimestampData fromTimestamp, TimestampData toTimestamp) {
142-
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
130+
LocalZonedTimestampData toTimestamp,
131+
String timezone) {
132+
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, timezone);
143133
}
144134

145135
public static Integer timestampDiff(
146136
String timeIntervalUnit,
147137
TimestampData fromTimestamp,
148-
LocalZonedTimestampData toTimestamp) {
138+
TimestampData toTimestamp,
139+
String timezone) {
149140
if (fromTimestamp == null || toTimestamp == null) {
150141
return null;
151142
}
152143
return timestampDiff(
153144
timeIntervalUnit,
154145
fromTimestamp.getMillisecond(),
155-
toTimestamp.getEpochMillisecond());
146+
"UTC",
147+
toTimestamp.getMillisecond(),
148+
"UTC");
156149
}
157150

158151
public static Integer timestampdiff(
159152
String timeIntervalUnit,
160153
TimestampData fromTimestamp,
161-
LocalZonedTimestampData toTimestamp) {
162-
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
154+
TimestampData toTimestamp,
155+
String timezone) {
156+
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, timezone);
163157
}
164158

165159
public static Integer timestampDiff(
166160
String timeIntervalUnit,
167-
LocalZonedTimestampData fromTimestamp,
168-
TimestampData toTimestamp) {
161+
TimestampData fromTimestamp,
162+
LocalZonedTimestampData toTimestamp,
163+
String timezone) {
169164
if (fromTimestamp == null || toTimestamp == null) {
170165
return null;
171166
}
172167
return timestampDiff(
173168
timeIntervalUnit,
174-
fromTimestamp.getEpochMillisecond(),
175-
toTimestamp.getMillisecond());
176-
}
177-
178-
public static Integer timestampdiff(
179-
String timeIntervalUnit,
180-
LocalZonedTimestampData fromTimestamp,
181-
TimestampData toTimestamp) {
182-
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
183-
}
184-
185-
public static Integer timestampDiff(
186-
String timeIntervalUnit,
187-
ZonedTimestampData fromTimestamp,
188-
ZonedTimestampData toTimestamp) {
189-
if (fromTimestamp == null || toTimestamp == null) {
190-
return null;
191-
}
192-
return timestampDiff(
193-
timeIntervalUnit, fromTimestamp.getMillisecond(), toTimestamp.getMillisecond());
169+
fromTimestamp.getMillisecond(),
170+
"UTC",
171+
toTimestamp.getEpochMillisecond(),
172+
timezone);
194173
}
195174

196175
public static Integer timestampdiff(
197176
String timeIntervalUnit,
198-
ZonedTimestampData fromTimestamp,
199-
ZonedTimestampData toTimestamp) {
200-
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
177+
TimestampData fromTimestamp,
178+
LocalZonedTimestampData toTimestamp,
179+
String timezone) {
180+
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, timezone);
201181
}
202182

203183
public static Integer timestampDiff(
204184
String timeIntervalUnit,
205185
LocalZonedTimestampData fromTimestamp,
206-
ZonedTimestampData toTimestamp) {
186+
TimestampData toTimestamp,
187+
String timezone) {
207188
if (fromTimestamp == null || toTimestamp == null) {
208189
return null;
209190
}
210191
return timestampDiff(
211192
timeIntervalUnit,
212193
fromTimestamp.getEpochMillisecond(),
213-
toTimestamp.getMillisecond());
194+
timezone,
195+
toTimestamp.getMillisecond(),
196+
"UTC");
214197
}
215198

216199
public static Integer timestampdiff(
217200
String timeIntervalUnit,
218201
LocalZonedTimestampData fromTimestamp,
219-
ZonedTimestampData toTimestamp) {
220-
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
202+
TimestampData toTimestamp,
203+
String timezone) {
204+
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, timezone);
221205
}
222206

223207
public static Integer timestampDiff(
224208
String timeIntervalUnit,
225-
ZonedTimestampData fromTimestamp,
226-
LocalZonedTimestampData toTimestamp) {
227-
if (fromTimestamp == null || toTimestamp == null) {
228-
return null;
229-
}
230-
return timestampDiff(
231-
timeIntervalUnit,
232-
fromTimestamp.getMillisecond(),
233-
toTimestamp.getEpochMillisecond());
234-
}
235-
236-
public static Integer timestampdiff(
237-
String timeIntervalUnit,
238-
ZonedTimestampData fromTimestamp,
239-
LocalZonedTimestampData toTimestamp) {
240-
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
241-
}
242-
243-
public static Integer timestampDiff(
244-
String timeIntervalUnit, TimestampData fromTimestamp, ZonedTimestampData toTimestamp) {
245-
if (fromTimestamp == null || toTimestamp == null) {
246-
return null;
247-
}
248-
return timestampDiff(
249-
timeIntervalUnit, fromTimestamp.getMillisecond(), toTimestamp.getMillisecond());
250-
}
251-
252-
public static Integer timestampdiff(
253-
String timeIntervalUnit, TimestampData fromTimestamp, ZonedTimestampData toTimestamp) {
254-
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
255-
}
256-
257-
public static Integer timestampDiff(
258-
String timeIntervalUnit, ZonedTimestampData fromTimestamp, TimestampData toTimestamp) {
259-
if (fromTimestamp == null || toTimestamp == null) {
260-
return null;
261-
}
262-
return timestampDiff(
263-
timeIntervalUnit, fromTimestamp.getMillisecond(), toTimestamp.getMillisecond());
264-
}
265-
266-
public static Integer timestampdiff(
267-
String timeIntervalUnit, ZonedTimestampData fromTimestamp, TimestampData toTimestamp) {
268-
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
269-
}
270-
271-
public static Integer timestampDiff(String timeIntervalUnit, long fromDate, long toDate) {
272-
Calendar from = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
209+
long fromDate,
210+
String fromTimezone,
211+
long toDate,
212+
String toTimezone) {
213+
Calendar from = Calendar.getInstance(TimeZone.getTimeZone(fromTimezone));
273214
from.setTime(new Date(fromDate));
274-
Calendar to = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
215+
Calendar to = Calendar.getInstance(TimeZone.getTimeZone(toTimezone));
275216
to.setTime(new Date(toDate));
276217
long second = (to.getTimeInMillis() - from.getTimeInMillis()) / 1000;
277218
switch (timeIntervalUnit) {
@@ -310,37 +251,31 @@ public static Integer timestampDiff(String timeIntervalUnit, long fromDate, long
310251
}
311252

312253
public static LocalZonedTimestampData timestampadd(
313-
String timeIntervalUnit, Integer interval, LocalZonedTimestampData timePoint) {
254+
String timeIntervalUnit,
255+
Integer interval,
256+
LocalZonedTimestampData timePoint,
257+
String timezone) {
314258
if (interval == null || timePoint == null) {
315259
return null;
316260
}
317261
return LocalZonedTimestampData.fromEpochMillis(
318-
timestampadd(timeIntervalUnit, interval, timePoint.getEpochMillisecond()));
319-
}
320-
321-
public static ZonedTimestampData timestampadd(
322-
String timeIntervalUnit, Integer interval, ZonedTimestampData timePoint) {
323-
if (interval == null || timePoint == null) {
324-
return null;
325-
}
326-
return ZonedTimestampData.of(
327-
timestampadd(timeIntervalUnit, interval, timePoint.getMillisecond()),
328-
0,
329-
timePoint.getZoneId());
262+
timestampadd(
263+
timeIntervalUnit, interval, timePoint.getEpochMillisecond(), timezone));
330264
}
331265

332266
public static TimestampData timestampadd(
333-
String timeIntervalUnit, Integer interval, TimestampData timePoint) {
267+
String timeIntervalUnit, Integer interval, TimestampData timePoint, String timezone) {
334268
if (interval == null || timePoint == null) {
335269
return null;
336270
}
337271
return TimestampData.fromMillis(
338-
timestampadd(timeIntervalUnit, interval, timePoint.getMillisecond()));
272+
timestampadd(timeIntervalUnit, interval, timePoint.getMillisecond(), "UTC"));
339273
}
340274

341-
private static long timestampadd(String timeIntervalUnit, int interval, long timePoint) {
275+
private static long timestampadd(
276+
String timeIntervalUnit, int interval, long timePoint, String timezone) {
342277
Calendar calendar = Calendar.getInstance();
343-
calendar.setTimeZone(TimeZone.getTimeZone("UTC"));
278+
calendar.setTimeZone(TimeZone.getTimeZone(timezone));
344279
calendar.setTime(new Date(timePoint));
345280
int field;
346281
switch (timeIntervalUnit) {

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ public class JaninoCompiler {
6363
Arrays.asList("DATE_FORMAT");
6464

6565
private static final List<String> TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS =
66-
Arrays.asList("TO_DATE", "TO_TIMESTAMP");
66+
Arrays.asList(
67+
"TO_DATE", "TO_TIMESTAMP", "TIMESTAMPADD", "TIMESTAMPDIFF", "TIMESTAMP_DIFF");
6768

6869
public static final String DEFAULT_EPOCH_TIME = "__epoch_time__";
6970
public static final String DEFAULT_TIME_ZONE = "__time_zone__";
@@ -118,13 +119,14 @@ public static Java.Rvalue translateSqlNodeToJaninoRvalue(
118119

119120
private static Java.Rvalue translateSqlIdentifier(SqlIdentifier sqlIdentifier) {
120121
String columnName = sqlIdentifier.names.get(sqlIdentifier.names.size() - 1);
121-
if (TIMEZONE_FREE_TEMPORAL_FUNCTIONS.contains(columnName)) {
122+
if (TIMEZONE_FREE_TEMPORAL_FUNCTIONS.contains(columnName.toUpperCase())) {
122123
return generateTimezoneFreeTemporalFunctionOperation(columnName);
123-
} else if (TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS.contains(columnName)) {
124+
} else if (TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS.contains(columnName.toUpperCase())) {
124125
return generateTimezoneRequiredTemporalFunctionOperation(columnName);
125-
} else if (TIMEZONE_FREE_TEMPORAL_CONVERSION_FUNCTIONS.contains(columnName)) {
126+
} else if (TIMEZONE_FREE_TEMPORAL_CONVERSION_FUNCTIONS.contains(columnName.toUpperCase())) {
126127
return generateTimezoneFreeTemporalConversionFunctionOperation(columnName);
127-
} else if (TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS.contains(columnName)) {
128+
} else if (TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS.contains(
129+
columnName.toUpperCase())) {
128130
return generateTimezoneRequiredTemporalConversionFunctionOperation(columnName);
129131
} else {
130132
return new Java.AmbiguousName(Location.NOWHERE, new String[] {columnName});
@@ -160,14 +162,15 @@ private static Java.Rvalue translateSqlBasicCall(
160162
for (SqlNode sqlNode : operandList) {
161163
translateSqlNodeToAtoms(sqlNode, atoms, udfDescriptors);
162164
}
163-
if (TIMEZONE_FREE_TEMPORAL_FUNCTIONS.contains(sqlBasicCall.getOperator().getName())) {
165+
if (TIMEZONE_FREE_TEMPORAL_FUNCTIONS.contains(
166+
sqlBasicCall.getOperator().getName().toUpperCase())) {
164167
atoms.add(new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_EPOCH_TIME}));
165168
} else if (TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS.contains(
166-
sqlBasicCall.getOperator().getName())) {
169+
sqlBasicCall.getOperator().getName().toUpperCase())) {
167170
atoms.add(new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_EPOCH_TIME}));
168171
atoms.add(new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_TIME_ZONE}));
169172
} else if (TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS.contains(
170-
sqlBasicCall.getOperator().getName())) {
173+
sqlBasicCall.getOperator().getName().toUpperCase())) {
171174
atoms.add(new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_TIME_ZONE}));
172175
}
173176
return sqlBasicCallToJaninoRvalue(
@@ -314,7 +317,7 @@ private static Java.Rvalue generateCastOperation(
314317

315318
private static Java.Rvalue generateTimestampDiffOperation(
316319
SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
317-
if (atoms.length != 3) {
320+
if (atoms.length != 4) {
318321
throw new ParseException("Unrecognized expression: " + sqlBasicCall.toString());
319322
}
320323
String timeIntervalUnit = atoms[0].toString().toUpperCase();
@@ -336,6 +339,7 @@ private static Java.Rvalue generateTimestampDiffOperation(
336339
new Java.AmbiguousName(Location.NOWHERE, new String[] {timeIntervalUnit}));
337340
timestampDiffFunctionParam.add(atoms[1]);
338341
timestampDiffFunctionParam.add(atoms[2]);
342+
timestampDiffFunctionParam.add(atoms[3]);
339343
return new Java.MethodInvocation(
340344
Location.NOWHERE,
341345
null,
@@ -345,7 +349,7 @@ private static Java.Rvalue generateTimestampDiffOperation(
345349

346350
private static Java.Rvalue generateTimestampAddOperation(
347351
SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
348-
if (atoms.length != 3) {
352+
if (atoms.length != 4) {
349353
throw new ParseException("Unrecognized expression: " + sqlBasicCall.toString());
350354
}
351355
String timeIntervalUnit = atoms[0].toString().toUpperCase();
@@ -367,6 +371,7 @@ private static Java.Rvalue generateTimestampAddOperation(
367371
new Java.AmbiguousName(Location.NOWHERE, new String[] {timeIntervalUnit}));
368372
timestampDiffFunctionParam.add(atoms[1]);
369373
timestampDiffFunctionParam.add(atoms[2]);
374+
timestampDiffFunctionParam.add(atoms[3]);
370375
return new Java.MethodInvocation(
371376
Location.NOWHERE,
372377
null,

0 commit comments

Comments
 (0)