Skip to content

Commit dd0c353

Browse files
committed
Optimze code and add test
1 parent adab68e commit dd0c353

File tree

2 files changed

+78
-15
lines changed

2 files changed

+78
-15
lines changed

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -207,13 +207,13 @@ public static int timestampdiff(
207207
return timestampDiff(symbol, fromTimestamp, toTimestamp);
208208
}
209209

210-
public static int timestampDiff(String symbol, long fromDate, long toDate) {
210+
public static int timestampDiff(String timeIntervalUnit, long fromDate, long toDate) {
211211
Calendar from = Calendar.getInstance();
212212
from.setTime(new Date(fromDate));
213213
Calendar to = Calendar.getInstance();
214214
to.setTime(new Date(toDate));
215215
Long second = (to.getTimeInMillis() - from.getTimeInMillis()) / 1000;
216-
switch (symbol) {
216+
switch (timeIntervalUnit) {
217217
case "SECOND":
218218
return second.intValue();
219219
case "MINUTE":
@@ -229,32 +229,34 @@ public static int timestampDiff(String symbol, long fromDate, long toDate) {
229229
case "YEAR":
230230
return to.get(Calendar.YEAR) - from.get(Calendar.YEAR);
231231
default:
232-
LOG.error("Unsupported timestamp diff: {}", symbol);
233-
throw new RuntimeException("Unsupported timestamp diff: " + symbol);
232+
throw new RuntimeException(
233+
String.format(
234+
"Unsupported timestamp interval unit %s. Supported units are: SECOND, MINUTE, HOUR, DAY, MONTH, YEAR",
235+
timeIntervalUnit));
234236
}
235237
}
236238

237239
public static TimestampData timestampadd(
238-
String timeintervalunit, int interval, LocalZonedTimestampData timepoint) {
239-
return timestampadd(timeintervalunit, interval, timepoint.getEpochMillisecond());
240+
String timeIntervalUnit, int interval, LocalZonedTimestampData timePoint) {
241+
return timestampadd(timeIntervalUnit, interval, timePoint.getEpochMillisecond());
240242
}
241243

242244
public static TimestampData timestampadd(
243-
String timeintervalunit, int interval, ZonedTimestampData timepoint) {
244-
return timestampadd(timeintervalunit, interval, timepoint.getMillisecond());
245+
String timeIntervalUnit, int interval, ZonedTimestampData timePoint) {
246+
return timestampadd(timeIntervalUnit, interval, timePoint.getMillisecond());
245247
}
246248

247249
public static TimestampData timestampadd(
248-
String timeintervalunit, int interval, TimestampData timepoint) {
249-
return timestampadd(timeintervalunit, interval, timepoint.getMillisecond());
250+
String timeIntervalUnit, int interval, TimestampData timePoint) {
251+
return timestampadd(timeIntervalUnit, interval, timePoint.getMillisecond());
250252
}
251253

252254
public static TimestampData timestampadd(
253-
String timeintervalunit, int interval, long timepoint) {
255+
String timeIntervalUnit, int interval, long timePoint) {
254256
Calendar calendar = Calendar.getInstance();
255-
calendar.setTime(new Date(timepoint));
257+
calendar.setTime(new Date(timePoint));
256258
int field;
257-
switch (timeintervalunit) {
259+
switch (timeIntervalUnit) {
258260
case "SECOND":
259261
field = Calendar.SECOND;
260262
break;
@@ -274,8 +276,10 @@ public static TimestampData timestampadd(
274276
field = Calendar.YEAR;
275277
break;
276278
default:
277-
LOG.error("Unsupported timestamp add: {}", timeintervalunit);
278-
throw new RuntimeException("Unsupported timestamp add: " + timeintervalunit);
279+
throw new RuntimeException(
280+
String.format(
281+
"Unsupported timestamp interval unit %s. Supported units are: SECOND, MINUTE, HOUR, DAY, MONTH, YEAR",
282+
timeIntervalUnit));
279283
}
280284
calendar.add(field, interval);
281285
return TimestampData.fromMillis(calendar.getTimeInMillis());

flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,9 +251,32 @@ public void testTranslateFilterToJaninoExpression() {
251251
testFilterExpression(
252252
"TO_DATE(dt, 'yyyy-MM-dd')", "toDate(dt, \"yyyy-MM-dd\", __time_zone__)");
253253
testFilterExpression("TO_TIMESTAMP(dt)", "toTimestamp(dt, __time_zone__)");
254+
testFilterExpression(
255+
"TIMESTAMP_DIFF('SECOND', dt1, dt2)", "timestampDiff(\"SECOND\", dt1, dt2)");
256+
testFilterExpression(
257+
"TIMESTAMP_DIFF('MINUTE', dt1, dt2)", "timestampDiff(\"MINUTE\", dt1, dt2)");
258+
testFilterExpression(
259+
"TIMESTAMP_DIFF('HOUR', dt1, dt2)", "timestampDiff(\"HOUR\", dt1, dt2)");
254260
testFilterExpression("TIMESTAMP_DIFF('DAY', dt1, dt2)", "timestampDiff(\"DAY\", dt1, dt2)");
261+
testFilterExpression(
262+
"TIMESTAMP_DIFF('MONTH', dt1, dt2)", "timestampDiff(\"MONTH\", dt1, dt2)");
263+
testFilterExpression(
264+
"TIMESTAMP_DIFF('YEAR', dt1, dt2)", "timestampDiff(\"YEAR\", dt1, dt2)");
265+
testFilterExpression(
266+
"TIMESTAMPDIFF(SECOND, dt1, dt2)", "timestampdiff(\"SECOND\", dt1, dt2)");
267+
testFilterExpression(
268+
"TIMESTAMPDIFF(MINUTE, dt1, dt2)", "timestampdiff(\"MINUTE\", dt1, dt2)");
269+
testFilterExpression("TIMESTAMPDIFF(HOUR, dt1, dt2)", "timestampdiff(\"HOUR\", dt1, dt2)");
255270
testFilterExpression("TIMESTAMPDIFF(DAY, dt1, dt2)", "timestampdiff(\"DAY\", dt1, dt2)");
271+
testFilterExpression(
272+
"TIMESTAMPDIFF(MONTH, dt1, dt2)", "timestampdiff(\"MONTH\", dt1, dt2)");
273+
testFilterExpression("TIMESTAMPDIFF(YEAR, dt1, dt2)", "timestampdiff(\"YEAR\", dt1, dt2)");
274+
testFilterExpression("TIMESTAMPADD(SECOND, 1, dt)", "timestampadd(\"SECOND\", 1, dt)");
275+
testFilterExpression("TIMESTAMPADD(MINUTE, 1, dt)", "timestampadd(\"MINUTE\", 1, dt)");
276+
testFilterExpression("TIMESTAMPADD(HOUR, 1, dt)", "timestampadd(\"HOUR\", 1, dt)");
256277
testFilterExpression("TIMESTAMPADD(DAY, 1, dt)", "timestampadd(\"DAY\", 1, dt)");
278+
testFilterExpression("TIMESTAMPADD(MONTH, 1, dt)", "timestampadd(\"MONTH\", 1, dt)");
279+
testFilterExpression("TIMESTAMPADD(YEAR, 1, dt)", "timestampadd(\"YEAR\", 1, dt)");
257280
testFilterExpression("IF(a>b,a,b)", "a > b ? a : b");
258281
testFilterExpression("NULLIF(a,b)", "nullif(a, b)");
259282
testFilterExpression("COALESCE(a,b,c)", "coalesce(a, b, c)");
@@ -313,6 +336,42 @@ public void testTranslateFilterToJaninoExpression() {
313336
testFilterExpression("cast(dt as TIMESTAMP)", "castToTimestamp(dt, __time_zone__)");
314337
}
315338

339+
@Test
340+
public void testTranslateFilterToJaninoExpressionError() {
341+
Assertions.assertThatThrownBy(
342+
() -> {
343+
TransformParser.translateFilterExpressionToJaninoExpression(
344+
"TIMESTAMPDIFF(SECONDS, dt1, dt2)", Collections.emptyList());
345+
})
346+
.isExactlyInstanceOf(ParseException.class)
347+
.hasRootCauseInstanceOf(org.apache.calcite.sql.parser.impl.ParseException.class)
348+
.hasRootCauseMessage(
349+
"Encountered \"SECONDS\" at line 1, column 38.\n"
350+
+ "Was expecting one of:\n"
351+
+ " \"DAY\" ...\n"
352+
+ " \"FRAC_SECOND\" ...\n"
353+
+ " \"HOUR\" ...\n"
354+
+ " \"MICROSECOND\" ...\n"
355+
+ " \"MINUTE\" ...\n"
356+
+ " \"MONTH\" ...\n"
357+
+ " \"NANOSECOND\" ...\n"
358+
+ " \"QUARTER\" ...\n"
359+
+ " \"SECOND\" ...\n"
360+
+ " \"SQL_TSI_DAY\" ...\n"
361+
+ " \"SQL_TSI_FRAC_SECOND\" ...\n"
362+
+ " \"SQL_TSI_HOUR\" ...\n"
363+
+ " \"SQL_TSI_MICROSECOND\" ...\n"
364+
+ " \"SQL_TSI_MINUTE\" ...\n"
365+
+ " \"SQL_TSI_MONTH\" ...\n"
366+
+ " \"SQL_TSI_QUARTER\" ...\n"
367+
+ " \"SQL_TSI_SECOND\" ...\n"
368+
+ " \"SQL_TSI_WEEK\" ...\n"
369+
+ " \"SQL_TSI_YEAR\" ...\n"
370+
+ " \"WEEK\" ...\n"
371+
+ " \"YEAR\" ...\n"
372+
+ " ");
373+
}
374+
316375
@Test
317376
public void testGenerateProjectionColumns() {
318377
List<Column> testColumns =

0 commit comments

Comments
 (0)