Skip to content

Commit d77cf7f

Browse files
aiwenmoleonardBang
authored andcommitted
[FLINK-36647][transform] Support Timestampdiff and Timestampadd function in cdc pipeline transform
This closes #3698
1 parent 79e868b commit d77cf7f

File tree

11 files changed

+1233
-100
lines changed

11 files changed

+1233
-100
lines changed

docs/content.zh/docs/core-concept/transform.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [
157157
| CURRENT_TIMESTAMP | currentTimestamp() | Returns the current SQL timestamp in the local time zone, the return type is TIMESTAMP_LTZ(3). |
158158
| NOW() | now() | Returns the current SQL timestamp in the local time zone, this is a synonym of CURRENT_TIMESTAMP. |
159159
| DATE_FORMAT(timestamp, string) | dateFormat(timestamp, string) | Converts timestamp to a value of string in the format specified by the date format string. The format string is compatible with Java's SimpleDateFormat. |
160+
| TIMESTAMPADD(timeintervalunit, interval, timepoint) | timestampadd(timeintervalunit, interval, timepoint) | Returns the timestamp of timepoint2 after timepoint added interval. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
160161
| TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
161162
| TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. |
162163
| TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. |

docs/content/docs/core-concept/transform.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [
157157
| CURRENT_TIMESTAMP | currentTimestamp() | Returns the current SQL timestamp in the local time zone, the return type is TIMESTAMP_LTZ(3). |
158158
| NOW() | now() | Returns the current SQL timestamp in the local time zone, this is a synonym of CURRENT_TIMESTAMP. |
159159
| DATE_FORMAT(timestamp, string) | dateFormat(timestamp, string) | Converts timestamp to a value of string in the format specified by the date format string. The format string is compatible with Java's SimpleDateFormat. |
160+
| TIMESTAMPADD(timeintervalunit, interval, timepoint) | timestampadd(timeintervalunit, interval, timepoint) | Returns the timestamp of timepoint2 after timepoint added interval. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
160161
| TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
161162
| TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. |
162163
| TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. |

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.time.Instant;
2626
import java.time.ZoneId;
2727
import java.time.ZonedDateTime;
28+
import java.util.Calendar;
2829
import java.util.Date;
2930
import java.util.TimeZone;
3031

@@ -198,4 +199,93 @@ public static String formatTimestampMillis(long ts, String format, TimeZone time
198199
Date dateTime = new Date(ts);
199200
return formatter.format(dateTime);
200201
}
202+
203+
// --------------------------------------------------------------------------------------------
204+
// Compare
205+
// --------------------------------------------------------------------------------------------
206+
207+
public static Integer timestampDiff(
208+
String timeIntervalUnit,
209+
long fromDate,
210+
String fromTimezone,
211+
long toDate,
212+
String toTimezone) {
213+
Calendar from = Calendar.getInstance(TimeZone.getTimeZone(fromTimezone));
214+
from.setTime(new Date(fromDate));
215+
Calendar to = Calendar.getInstance(TimeZone.getTimeZone(toTimezone));
216+
to.setTime(new Date(toDate));
217+
long second = (to.getTimeInMillis() - from.getTimeInMillis()) / 1000;
218+
switch (timeIntervalUnit) {
219+
case "SECOND":
220+
if (second > Integer.MAX_VALUE) {
221+
return null;
222+
}
223+
return (int) second;
224+
case "MINUTE":
225+
if (second > Integer.MAX_VALUE) {
226+
return null;
227+
}
228+
return (int) second / 60;
229+
case "HOUR":
230+
if (second > Integer.MAX_VALUE) {
231+
return null;
232+
}
233+
return (int) second / 3600;
234+
case "DAY":
235+
if (second > Integer.MAX_VALUE) {
236+
return null;
237+
}
238+
return (int) second / (24 * 3600);
239+
case "MONTH":
240+
return to.get(Calendar.YEAR) * 12
241+
+ to.get(Calendar.MONTH)
242+
- (from.get(Calendar.YEAR) * 12 + from.get(Calendar.MONTH));
243+
case "YEAR":
244+
return to.get(Calendar.YEAR) - from.get(Calendar.YEAR);
245+
default:
246+
throw new RuntimeException(
247+
String.format(
248+
"Unsupported timestamp interval unit %s. Supported units are: SECOND, MINUTE, HOUR, DAY, MONTH, YEAR",
249+
timeIntervalUnit));
250+
}
251+
}
252+
253+
// --------------------------------------------------------------------------------------------
254+
// Add
255+
// --------------------------------------------------------------------------------------------
256+
257+
public static long timestampAdd(
258+
String timeIntervalUnit, int interval, long timePoint, String timezone) {
259+
Calendar calendar = Calendar.getInstance();
260+
calendar.setTimeZone(TimeZone.getTimeZone(timezone));
261+
calendar.setTime(new Date(timePoint));
262+
int field;
263+
switch (timeIntervalUnit) {
264+
case "SECOND":
265+
field = Calendar.SECOND;
266+
break;
267+
case "MINUTE":
268+
field = Calendar.MINUTE;
269+
break;
270+
case "HOUR":
271+
field = Calendar.HOUR;
272+
break;
273+
case "DAY":
274+
field = Calendar.DATE;
275+
break;
276+
case "MONTH":
277+
field = Calendar.MONTH;
278+
break;
279+
case "YEAR":
280+
field = Calendar.YEAR;
281+
break;
282+
default:
283+
throw new RuntimeException(
284+
String.format(
285+
"Unsupported timestamp interval unit %s. Supported units are: SECOND, MINUTE, HOUR, DAY, MONTH, YEAR",
286+
timeIntervalUnit));
287+
}
288+
calendar.add(field, interval);
289+
return calendar.getTimeInMillis();
290+
}
201291
}

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java

Lines changed: 105 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@
3535
import java.time.LocalDateTime;
3636
import java.time.ZoneId;
3737
import java.util.Arrays;
38-
import java.util.Calendar;
39-
import java.util.Date;
4038
import java.util.TimeZone;
4139
import java.util.UUID;
4240
import java.util.regex.Matcher;
@@ -130,83 +128,127 @@ public static TimestampData toTimestamp(String str, String format, String timezo
130128
}
131129
}
132130

133-
public static int timestampDiff(
134-
String symbol,
131+
// Be compatible with the existing definition of Function TIMESTAMP_DIFF
132+
public static Integer timestampDiff(
133+
String timeIntervalUnit,
135134
LocalZonedTimestampData fromTimestamp,
136-
LocalZonedTimestampData toTimestamp) {
137-
return timestampDiff(
138-
symbol, fromTimestamp.getEpochMillisecond(), toTimestamp.getEpochMillisecond());
139-
}
140-
141-
public static int timestampDiff(
142-
String symbol, TimestampData fromTimestamp, TimestampData toTimestamp) {
143-
return timestampDiff(symbol, fromTimestamp.getMillisecond(), toTimestamp.getMillisecond());
144-
}
145-
146-
public static int timestampDiff(
147-
String symbol, TimestampData fromTimestamp, LocalZonedTimestampData toTimestamp) {
148-
return timestampDiff(
149-
symbol, fromTimestamp.getMillisecond(), toTimestamp.getEpochMillisecond());
135+
LocalZonedTimestampData toTimestamp,
136+
String timezone) {
137+
if (fromTimestamp == null || toTimestamp == null) {
138+
return null;
139+
}
140+
return DateTimeUtils.timestampDiff(
141+
timeIntervalUnit,
142+
fromTimestamp.getEpochMillisecond(),
143+
timezone,
144+
toTimestamp.getEpochMillisecond(),
145+
timezone);
146+
}
147+
148+
// Be compatible with the existing definition of Function TIMESTAMP_DIFF
149+
public static Integer timestampDiff(
150+
String timeIntervalUnit,
151+
TimestampData fromTimestamp,
152+
TimestampData toTimestamp,
153+
String timezone) {
154+
if (fromTimestamp == null || toTimestamp == null) {
155+
return null;
156+
}
157+
return DateTimeUtils.timestampDiff(
158+
timeIntervalUnit,
159+
fromTimestamp.getMillisecond(),
160+
"UTC",
161+
toTimestamp.getMillisecond(),
162+
"UTC");
163+
}
164+
165+
// Be compatible with the existing definition of Function TIMESTAMP_DIFF
166+
public static Integer timestampDiff(
167+
String timeIntervalUnit,
168+
TimestampData fromTimestamp,
169+
LocalZonedTimestampData toTimestamp,
170+
String timezone) {
171+
if (fromTimestamp == null || toTimestamp == null) {
172+
return null;
173+
}
174+
return DateTimeUtils.timestampDiff(
175+
timeIntervalUnit,
176+
fromTimestamp.getMillisecond(),
177+
"UTC",
178+
toTimestamp.getEpochMillisecond(),
179+
timezone);
150180
}
151181

152-
public static int timestampDiff(
153-
String symbol, LocalZonedTimestampData fromTimestamp, TimestampData toTimestamp) {
154-
return timestampDiff(
155-
symbol, fromTimestamp.getEpochMillisecond(), toTimestamp.getMillisecond());
182+
// Be compatible with the existing definition of Function TIMESTAMP_DIFF
183+
public static Integer timestampDiff(
184+
String timeIntervalUnit,
185+
LocalZonedTimestampData fromTimestamp,
186+
TimestampData toTimestamp,
187+
String timezone) {
188+
if (fromTimestamp == null || toTimestamp == null) {
189+
return null;
190+
}
191+
return DateTimeUtils.timestampDiff(
192+
timeIntervalUnit,
193+
fromTimestamp.getEpochMillisecond(),
194+
timezone,
195+
toTimestamp.getMillisecond(),
196+
"UTC");
156197
}
157198

158-
public static int timestampDiff(
159-
String symbol, ZonedTimestampData fromTimestamp, ZonedTimestampData toTimestamp) {
160-
return timestampDiff(symbol, fromTimestamp.getMillisecond(), toTimestamp.getMillisecond());
199+
public static Integer timestampdiff(
200+
String timeIntervalUnit,
201+
LocalZonedTimestampData fromTimestamp,
202+
LocalZonedTimestampData toTimestamp,
203+
String timezone) {
204+
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, timezone);
161205
}
162206

163-
public static int timestampDiff(
164-
String symbol, LocalZonedTimestampData fromTimestamp, ZonedTimestampData toTimestamp) {
165-
return timestampDiff(
166-
symbol, fromTimestamp.getEpochMillisecond(), toTimestamp.getMillisecond());
207+
public static Integer timestampdiff(
208+
String timeIntervalUnit,
209+
TimestampData fromTimestamp,
210+
TimestampData toTimestamp,
211+
String timezone) {
212+
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, timezone);
167213
}
168214

169-
public static int timestampDiff(
170-
String symbol, ZonedTimestampData fromTimestamp, LocalZonedTimestampData toTimestamp) {
171-
return timestampDiff(
172-
symbol, fromTimestamp.getMillisecond(), toTimestamp.getEpochMillisecond());
215+
public static Integer timestampdiff(
216+
String timeIntervalUnit,
217+
TimestampData fromTimestamp,
218+
LocalZonedTimestampData toTimestamp,
219+
String timezone) {
220+
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, timezone);
173221
}
174222

175-
public static int timestampDiff(
176-
String symbol, TimestampData fromTimestamp, ZonedTimestampData toTimestamp) {
177-
return timestampDiff(symbol, fromTimestamp.getMillisecond(), toTimestamp.getMillisecond());
223+
public static Integer timestampdiff(
224+
String timeIntervalUnit,
225+
LocalZonedTimestampData fromTimestamp,
226+
TimestampData toTimestamp,
227+
String timezone) {
228+
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, timezone);
178229
}
179230

180-
public static int timestampDiff(
181-
String symbol, ZonedTimestampData fromTimestamp, TimestampData toTimestamp) {
182-
return timestampDiff(symbol, fromTimestamp.getMillisecond(), toTimestamp.getMillisecond());
231+
public static LocalZonedTimestampData timestampadd(
232+
String timeIntervalUnit,
233+
Integer interval,
234+
LocalZonedTimestampData timePoint,
235+
String timezone) {
236+
if (interval == null || timePoint == null) {
237+
return null;
238+
}
239+
return LocalZonedTimestampData.fromEpochMillis(
240+
DateTimeUtils.timestampAdd(
241+
timeIntervalUnit, interval, timePoint.getEpochMillisecond(), timezone));
183242
}
184243

185-
public static int timestampDiff(String symbol, long fromDate, long toDate) {
186-
Calendar from = Calendar.getInstance();
187-
from.setTime(new Date(fromDate));
188-
Calendar to = Calendar.getInstance();
189-
to.setTime(new Date(toDate));
190-
Long second = (to.getTimeInMillis() - from.getTimeInMillis()) / 1000;
191-
switch (symbol) {
192-
case "SECOND":
193-
return second.intValue();
194-
case "MINUTE":
195-
return second.intValue() / 60;
196-
case "HOUR":
197-
return second.intValue() / 3600;
198-
case "DAY":
199-
return second.intValue() / (24 * 3600);
200-
case "MONTH":
201-
return to.get(Calendar.YEAR) * 12
202-
+ to.get(Calendar.MONDAY)
203-
- (from.get(Calendar.YEAR) * 12 + from.get(Calendar.MONDAY));
204-
case "YEAR":
205-
return to.get(Calendar.YEAR) - from.get(Calendar.YEAR);
206-
default:
207-
LOG.error("Unsupported timestamp diff: {}", symbol);
208-
throw new RuntimeException("Unsupported timestamp diff: " + symbol);
244+
public static TimestampData timestampadd(
245+
String timeIntervalUnit, Integer interval, TimestampData timePoint, String timezone) {
246+
if (interval == null || timePoint == null) {
247+
return null;
209248
}
249+
return TimestampData.fromMillis(
250+
DateTimeUtils.timestampAdd(
251+
timeIntervalUnit, interval, timePoint.getMillisecond(), "UTC"));
210252
}
211253

212254
public static boolean betweenAsymmetric(String value, String minValue, String maxValue) {

0 commit comments

Comments
 (0)