Skip to content

Commit 7acdc1a

Browse files
committed
Rebase master and optimize code
1 parent 35626af commit 7acdc1a

File tree

4 files changed

+130
-111
lines changed

4 files changed

+130
-111
lines changed

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.time.Instant;
2626
import java.time.ZoneId;
2727
import java.time.ZonedDateTime;
28+
import java.util.Calendar;
2829
import java.util.Date;
2930
import java.util.TimeZone;
3031

@@ -198,4 +199,93 @@ public static String formatTimestampMillis(long ts, String format, TimeZone time
198199
Date dateTime = new Date(ts);
199200
return formatter.format(dateTime);
200201
}
202+
203+
// --------------------------------------------------------------------------------------------
204+
// Compare
205+
// --------------------------------------------------------------------------------------------
206+
207+
public static Integer timestampDiff(
208+
String timeIntervalUnit,
209+
long fromDate,
210+
String fromTimezone,
211+
long toDate,
212+
String toTimezone) {
213+
Calendar from = Calendar.getInstance(TimeZone.getTimeZone(fromTimezone));
214+
from.setTime(new Date(fromDate));
215+
Calendar to = Calendar.getInstance(TimeZone.getTimeZone(toTimezone));
216+
to.setTime(new Date(toDate));
217+
long second = (to.getTimeInMillis() - from.getTimeInMillis()) / 1000;
218+
switch (timeIntervalUnit) {
219+
case "SECOND":
220+
if (second > Integer.MAX_VALUE) {
221+
return null;
222+
}
223+
return (int) second;
224+
case "MINUTE":
225+
if (second > Integer.MAX_VALUE) {
226+
return null;
227+
}
228+
return (int) second / 60;
229+
case "HOUR":
230+
if (second > Integer.MAX_VALUE) {
231+
return null;
232+
}
233+
return (int) second / 3600;
234+
case "DAY":
235+
if (second > Integer.MAX_VALUE) {
236+
return null;
237+
}
238+
return (int) second / (24 * 3600);
239+
case "MONTH":
240+
return to.get(Calendar.YEAR) * 12
241+
+ to.get(Calendar.MONTH)
242+
- (from.get(Calendar.YEAR) * 12 + from.get(Calendar.MONTH));
243+
case "YEAR":
244+
return to.get(Calendar.YEAR) - from.get(Calendar.YEAR);
245+
default:
246+
throw new RuntimeException(
247+
String.format(
248+
"Unsupported timestamp interval unit %s. Supported units are: SECOND, MINUTE, HOUR, DAY, MONTH, YEAR",
249+
timeIntervalUnit));
250+
}
251+
}
252+
253+
// --------------------------------------------------------------------------------------------
254+
// Add
255+
// --------------------------------------------------------------------------------------------
256+
257+
public static long timestampAdd(
258+
String timeIntervalUnit, int interval, long timePoint, String timezone) {
259+
Calendar calendar = Calendar.getInstance();
260+
calendar.setTimeZone(TimeZone.getTimeZone(timezone));
261+
calendar.setTime(new Date(timePoint));
262+
int field;
263+
switch (timeIntervalUnit) {
264+
case "SECOND":
265+
field = Calendar.SECOND;
266+
break;
267+
case "MINUTE":
268+
field = Calendar.MINUTE;
269+
break;
270+
case "HOUR":
271+
field = Calendar.HOUR;
272+
break;
273+
case "DAY":
274+
field = Calendar.DATE;
275+
break;
276+
case "MONTH":
277+
field = Calendar.MONTH;
278+
break;
279+
case "YEAR":
280+
field = Calendar.YEAR;
281+
break;
282+
default:
283+
throw new RuntimeException(
284+
String.format(
285+
"Unsupported timestamp interval unit %s. Supported units are: SECOND, MINUTE, HOUR, DAY, MONTH, YEAR",
286+
timeIntervalUnit));
287+
}
288+
calendar.add(field, interval);
289+
return calendar.getTimeInMillis();
290+
}
201291
}

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java

Lines changed: 32 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,6 @@
3434
import java.time.LocalDateTime;
3535
import java.time.ZoneId;
3636
import java.util.Arrays;
37-
import java.util.Calendar;
38-
import java.util.Date;
3937
import java.util.TimeZone;
4038
import java.util.UUID;
4139
import java.util.regex.Matcher;
@@ -129,6 +127,7 @@ public static TimestampData toTimestamp(String str, String format, String timezo
129127
}
130128
}
131129

130+
// Be compatible with the existing definition of Function TIMESTAMP_DIFF
132131
public static Integer timestampDiff(
133132
String timeIntervalUnit,
134133
LocalZonedTimestampData fromTimestamp,
@@ -137,22 +136,15 @@ public static Integer timestampDiff(
137136
if (fromTimestamp == null || toTimestamp == null) {
138137
return null;
139138
}
140-
return timestampDiff(
139+
return DateTimeUtils.timestampDiff(
141140
timeIntervalUnit,
142141
fromTimestamp.getEpochMillisecond(),
143142
timezone,
144143
toTimestamp.getEpochMillisecond(),
145144
timezone);
146145
}
147146

148-
public static Integer timestampdiff(
149-
String timeIntervalUnit,
150-
LocalZonedTimestampData fromTimestamp,
151-
LocalZonedTimestampData toTimestamp,
152-
String timezone) {
153-
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, timezone);
154-
}
155-
147+
// Be compatible with the existing definition of Function TIMESTAMP_DIFF
156148
public static Integer timestampDiff(
157149
String timeIntervalUnit,
158150
TimestampData fromTimestamp,
@@ -161,22 +153,15 @@ public static Integer timestampDiff(
161153
if (fromTimestamp == null || toTimestamp == null) {
162154
return null;
163155
}
164-
return timestampDiff(
156+
return DateTimeUtils.timestampDiff(
165157
timeIntervalUnit,
166158
fromTimestamp.getMillisecond(),
167159
"UTC",
168160
toTimestamp.getMillisecond(),
169161
"UTC");
170162
}
171163

172-
public static Integer timestampdiff(
173-
String timeIntervalUnit,
174-
TimestampData fromTimestamp,
175-
TimestampData toTimestamp,
176-
String timezone) {
177-
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, timezone);
178-
}
179-
164+
// Be compatible with the existing definition of Function TIMESTAMP_DIFF
180165
public static Integer timestampDiff(
181166
String timeIntervalUnit,
182167
TimestampData fromTimestamp,
@@ -185,22 +170,15 @@ public static Integer timestampDiff(
185170
if (fromTimestamp == null || toTimestamp == null) {
186171
return null;
187172
}
188-
return timestampDiff(
173+
return DateTimeUtils.timestampDiff(
189174
timeIntervalUnit,
190175
fromTimestamp.getMillisecond(),
191176
"UTC",
192177
toTimestamp.getEpochMillisecond(),
193178
timezone);
194179
}
195180

196-
public static Integer timestampdiff(
197-
String timeIntervalUnit,
198-
TimestampData fromTimestamp,
199-
LocalZonedTimestampData toTimestamp,
200-
String timezone) {
201-
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, timezone);
202-
}
203-
181+
// Be compatible with the existing definition of Function TIMESTAMP_DIFF
204182
public static Integer timestampDiff(
205183
String timeIntervalUnit,
206184
LocalZonedTimestampData fromTimestamp,
@@ -209,7 +187,7 @@ public static Integer timestampDiff(
209187
if (fromTimestamp == null || toTimestamp == null) {
210188
return null;
211189
}
212-
return timestampDiff(
190+
return DateTimeUtils.timestampDiff(
213191
timeIntervalUnit,
214192
fromTimestamp.getEpochMillisecond(),
215193
timezone,
@@ -220,55 +198,33 @@ public static Integer timestampDiff(
220198
public static Integer timestampdiff(
221199
String timeIntervalUnit,
222200
LocalZonedTimestampData fromTimestamp,
201+
LocalZonedTimestampData toTimestamp,
202+
String timezone) {
203+
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, timezone);
204+
}
205+
206+
public static Integer timestampdiff(
207+
String timeIntervalUnit,
208+
TimestampData fromTimestamp,
223209
TimestampData toTimestamp,
224210
String timezone) {
225211
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, timezone);
226212
}
227213

228-
public static Integer timestampDiff(
214+
public static Integer timestampdiff(
229215
String timeIntervalUnit,
230-
long fromDate,
231-
String fromTimezone,
232-
long toDate,
233-
String toTimezone) {
234-
Calendar from = Calendar.getInstance(TimeZone.getTimeZone(fromTimezone));
235-
from.setTime(new Date(fromDate));
236-
Calendar to = Calendar.getInstance(TimeZone.getTimeZone(toTimezone));
237-
to.setTime(new Date(toDate));
238-
long second = (to.getTimeInMillis() - from.getTimeInMillis()) / 1000;
239-
switch (timeIntervalUnit) {
240-
case "SECOND":
241-
if (second > Integer.MAX_VALUE) {
242-
return null;
243-
}
244-
return (int) second;
245-
case "MINUTE":
246-
if (second > Integer.MAX_VALUE) {
247-
return null;
248-
}
249-
return (int) second / 60;
250-
case "HOUR":
251-
if (second > Integer.MAX_VALUE) {
252-
return null;
253-
}
254-
return (int) second / 3600;
255-
case "DAY":
256-
if (second > Integer.MAX_VALUE) {
257-
return null;
258-
}
259-
return (int) second / (24 * 3600);
260-
case "MONTH":
261-
return to.get(Calendar.YEAR) * 12
262-
+ to.get(Calendar.MONTH)
263-
- (from.get(Calendar.YEAR) * 12 + from.get(Calendar.MONTH));
264-
case "YEAR":
265-
return to.get(Calendar.YEAR) - from.get(Calendar.YEAR);
266-
default:
267-
throw new RuntimeException(
268-
String.format(
269-
"Unsupported timestamp interval unit %s. Supported units are: SECOND, MINUTE, HOUR, DAY, MONTH, YEAR",
270-
timeIntervalUnit));
271-
}
216+
TimestampData fromTimestamp,
217+
LocalZonedTimestampData toTimestamp,
218+
String timezone) {
219+
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, timezone);
220+
}
221+
222+
public static Integer timestampdiff(
223+
String timeIntervalUnit,
224+
LocalZonedTimestampData fromTimestamp,
225+
TimestampData toTimestamp,
226+
String timezone) {
227+
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, timezone);
272228
}
273229

274230
public static LocalZonedTimestampData timestampadd(
@@ -280,7 +236,7 @@ public static LocalZonedTimestampData timestampadd(
280236
return null;
281237
}
282238
return LocalZonedTimestampData.fromEpochMillis(
283-
timestampadd(
239+
DateTimeUtils.timestampAdd(
284240
timeIntervalUnit, interval, timePoint.getEpochMillisecond(), timezone));
285241
}
286242

@@ -290,42 +246,8 @@ public static TimestampData timestampadd(
290246
return null;
291247
}
292248
return TimestampData.fromMillis(
293-
timestampadd(timeIntervalUnit, interval, timePoint.getMillisecond(), "UTC"));
294-
}
295-
296-
private static long timestampadd(
297-
String timeIntervalUnit, int interval, long timePoint, String timezone) {
298-
Calendar calendar = Calendar.getInstance();
299-
calendar.setTimeZone(TimeZone.getTimeZone(timezone));
300-
calendar.setTime(new Date(timePoint));
301-
int field;
302-
switch (timeIntervalUnit) {
303-
case "SECOND":
304-
field = Calendar.SECOND;
305-
break;
306-
case "MINUTE":
307-
field = Calendar.MINUTE;
308-
break;
309-
case "HOUR":
310-
field = Calendar.HOUR;
311-
break;
312-
case "DAY":
313-
field = Calendar.DAY_OF_YEAR;
314-
break;
315-
case "MONTH":
316-
field = Calendar.MONTH;
317-
break;
318-
case "YEAR":
319-
field = Calendar.YEAR;
320-
break;
321-
default:
322-
throw new RuntimeException(
323-
String.format(
324-
"Unsupported timestamp interval unit %s. Supported units are: SECOND, MINUTE, HOUR, DAY, MONTH, YEAR",
325-
timeIntervalUnit));
326-
}
327-
calendar.add(field, interval);
328-
return calendar.getTimeInMillis();
249+
DateTimeUtils.timestampAdd(
250+
timeIntervalUnit, interval, timePoint.getMillisecond(), "UTC"));
329251
}
330252

331253
public static boolean betweenAsymmetric(String value, String minValue, String maxValue) {

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,13 @@ public class JaninoCompiler {
6868
Arrays.asList("DATE_FORMAT");
6969

7070
private static final List<String> TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS =
71-
Arrays.asList("TO_DATE", "TO_TIMESTAMP", "FROM_UNIXTIME", "TIMESTAMPADD", "TIMESTAMPDIFF");
71+
Arrays.asList(
72+
"TO_DATE",
73+
"TO_TIMESTAMP",
74+
"FROM_UNIXTIME",
75+
"TIMESTAMPADD",
76+
"TIMESTAMPDIFF",
77+
"TIMESTAMP_DIFF");
7278

7379
public static final String DEFAULT_EPOCH_TIME = "__epoch_time__";
7480
public static final String DEFAULT_TIME_ZONE = "__time_zone__";

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/DataTypeConverter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ public static RelDataType convertCalciteType(
133133
case TIMESTAMP_WITHOUT_TIME_ZONE:
134134
return typeFactory.createSqlType(SqlTypeName.TIMESTAMP);
135135
case TIMESTAMP_WITH_TIME_ZONE:
136+
// TODO: Support TIMESTAMP_TZ #FLINK-37123
136137
throw new UnsupportedOperationException("Unsupported type: TIMESTAMP_TZ");
137138
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
138139
return typeFactory.createSqlType(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE);

0 commit comments

Comments
 (0)