Skip to content

Commit fb9a360

Browse files
committed
[FLINK-36865][cdc] Provide UNIX_TIMESTAMP series functions in YAML pipeline
1 parent fc71888 commit fb9a360

File tree

9 files changed

+470
-6
lines changed

9 files changed

+470
-6
lines changed

docs/content.zh/docs/core-concept/transform.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,9 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [
160160
| TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
161161
| TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. |
162162
| TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. |
163+
| FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | Returns a representation of the numeric argument as a value in string format (default is ‘yyyy-MM-dd HH:mm:ss’). numeric is an internal timestamp value representing seconds since ‘1970-01-01 00:00:00’ UTC, such as produced by the UNIX_TIMESTAMP() function. The return value is expressed in the session time zone (specified in TableConfig). E.g., FROM_UNIXTIME(44) returns ‘1970-01-01 00:00:44’ if in UTC time zone, but returns ‘1970-01-01 09:00:44’ if in ‘Asia/Tokyo’ time zone. |
164+
| UNIX_TIMESTAMP() | unixTimestamp() | Gets current Unix timestamp in seconds. This function is not deterministic which means the value would be recalculated for each record. |
165+
| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds), using the specified timezone in table config. If a time zone is specified in the date time string and parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”, this function will use the specified timezone in the date time string instead of the timezone in table config. If the date time string cannot be parsed, the default value Long.MIN_VALUE (-9223372036854775808) will be returned. |
163166
164167
## Conditional Functions
165168

docs/content/docs/core-concept/transform.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,9 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [
160160
| TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
161161
| TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. |
162162
| TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. |
163+
| FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | Returns a representation of the numeric argument as a value in string format (default is ‘yyyy-MM-dd HH:mm:ss’). numeric is an internal timestamp value representing seconds since ‘1970-01-01 00:00:00’ UTC, such as produced by the UNIX_TIMESTAMP() function. The return value is expressed in the session time zone (specified in TableConfig). E.g., FROM_UNIXTIME(44) returns ‘1970-01-01 00:00:44’ if in UTC time zone, but returns ‘1970-01-01 09:00:44’ if in ‘Asia/Tokyo’ time zone. |
164+
| UNIX_TIMESTAMP() | unixTimestamp() | Gets current Unix timestamp in seconds. This function is not deterministic which means the value would be recalculated for each record. |
165+
| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds), using the specified timezone in table config. If a time zone is specified in the date time string and parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”, this function will use the specified timezone in the date time string instead of the timezone in table config. If the date time string cannot be parsed, the default value Long.MIN_VALUE (-9223372036854775808) will be returned. |
163166
164167
## Conditional Functions
165168

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,16 @@ public class DateTimeUtils {
4343
*/
4444
public static final long MILLIS_PER_DAY = 86400000L; // = 24 * 60 * 60 * 1000
4545

46+
/** SimpleDateFormat pattern for ISO dates: "yyyy-MM-dd". */
47+
private static final String DATE_FORMAT_STRING = "yyyy-MM-dd";
48+
49+
/** SimpleDateFormat pattern for ISO times: "HH:mm:ss". */
50+
private static final String TIME_FORMAT_STRING = "HH:mm:ss";
51+
52+
/**
 * SimpleDateFormat pattern for ISO timestamps: "yyyy-MM-dd HH:mm:ss". Built from the date and
 * time patterns joined by a single space, so the three constants cannot drift apart.
 */
53+
private static final String TIMESTAMP_FORMAT_STRING =
54+
DATE_FORMAT_STRING + " " + TIME_FORMAT_STRING;
55+
4656
/**
4757
* A ThreadLocal cache map for SimpleDateFormat, because SimpleDateFormat is not thread-safe.
4858
* (string_format) => formatter
@@ -109,7 +119,7 @@ private static long internalParseTimestampMillis(String dateStr, String format,
109119
} catch (ParseException e) {
110120
LOG.error(
111121
String.format(
112-
"Exception when parsing datetime string '%s' in format '%s'",
122+
"Exception when parsing datetime string '%s' in format '%s', the default value Long.MIN_VALUE(-9223372036854775808) will be returned.",
113123
dateStr, format),
114124
e);
115125
return Long.MIN_VALUE;
@@ -128,6 +138,61 @@ private static int ymdToJulian(int year, int month, int day) {
128138
return day + (153 * m + 2) / 5 + 365 * y + y / 4 - y / 100 + y / 400 - 32045;
129139
}
130140

141+
// --------------------------------------------------------------------------------------------
142+
// UNIX TIME
143+
// --------------------------------------------------------------------------------------------
144+
145+
/**
146+
* Convert unix timestamp (seconds since '1970-01-01 00:00:00' UTC) to datetime string in the
147+
* "yyyy-MM-dd HH:mm:ss" format.
148+
*/
149+
public static String formatUnixTimestamp(long unixtime, TimeZone tz) {
150+
return formatUnixTimestamp(unixtime, TIMESTAMP_FORMAT_STRING, tz);
151+
}
152+
153+
/**
154+
* Convert unix timestamp (seconds since '1970-01-01 00:00:00' UTC) to datetime string in the
155+
* given format.
156+
*/
157+
public static String formatUnixTimestamp(long unixtime, String format, TimeZone tz) {
158+
SimpleDateFormat formatter = FORMATTER_CACHE.get(format);
159+
formatter.setTimeZone(tz);
160+
Date date = new Date(unixtime * 1000);
161+
try {
162+
return formatter.format(date);
163+
} catch (Exception e) {
164+
LOG.error("Exception when formatting.", e);
165+
return null;
166+
}
167+
}
168+
169+
/** Returns the value of the timestamp to seconds since '1970-01-01 00:00:00' UTC. */
170+
public static long unixTimestamp(long ts) {
171+
return ts / 1000;
172+
}
173+
174+
/**
175+
* Returns the value of the argument as an unsigned integer in seconds since '1970-01-01
176+
* 00:00:00' UTC.
177+
*/
178+
public static long unixTimestamp(String dateStr, TimeZone tz) {
179+
return unixTimestamp(dateStr, TIMESTAMP_FORMAT_STRING, tz);
180+
}
181+
182+
/**
183+
* Returns the value of the argument as an unsigned integer in seconds since '1970-01-01
184+
* 00:00:00' UTC.
185+
*/
186+
public static long unixTimestamp(String dateStr, String format, TimeZone tz) {
187+
long ts = internalParseTimestampMillis(dateStr, format, tz);
188+
if (ts == Long.MIN_VALUE) {
189+
return Long.MIN_VALUE;
190+
} else {
191+
// return the seconds
192+
return ts / 1000;
193+
}
194+
}
195+
131196
// --------------------------------------------------------------------------------------------
132197
// Format
133198
// --------------------------------------------------------------------------------------------
Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.runtime.functions;
19+
20+
import org.apache.flink.annotation.Internal;
21+
import org.apache.flink.table.functions.BuiltInFunctionDefinition;
22+
23+
import org.apache.calcite.sql.SqlFunction;
24+
import org.apache.calcite.sql.SqlFunctionCategory;
25+
import org.apache.calcite.sql.SqlKind;
26+
import org.apache.calcite.sql.SqlOperatorBinding;
27+
import org.apache.calcite.sql.type.SqlOperandTypeChecker;
28+
import org.apache.calcite.sql.type.SqlOperandTypeInference;
29+
import org.apache.calcite.sql.type.SqlReturnTypeInference;
30+
import org.apache.calcite.sql.validate.SqlMonotonicity;
31+
32+
import javax.annotation.Nullable;
33+
34+
import java.util.Optional;
35+
import java.util.function.Function;
36+
37+
import static org.apache.flink.table.functions.BuiltInFunctionDefinition.DEFAULT_VERSION;
38+
import static org.apache.flink.table.functions.BuiltInFunctionDefinition.qualifyFunctionName;
39+
import static org.apache.flink.table.functions.BuiltInFunctionDefinition.validateFunction;
40+
import static org.apache.flink.util.Preconditions.checkNotNull;
41+
42+
/**
43+
* SQL version of {@link BuiltInFunctionDefinition}. This is the case when the operator has a
44+
* special parsing syntax or uses other Calcite-specific features that are not exposed via {@link
45+
* BuiltInFunctionDefinition} yet.
46+
*
47+
* <p>Note: Try to keep usages of this class to a minimum and use Flink's {@link
48+
* BuiltInFunctionDefinition} stack instead.
49+
*
50+
* <p>For simple functions, use the provided builder. Otherwise, this class can also be extended.
51+
*/
52+
@Internal
53+
public class BuiltInSqlFunction extends SqlFunction {
54+
55+
private final @Nullable Integer version;
56+
57+
private final boolean isDeterministic;
58+
59+
private final boolean isInternal;
60+
61+
private final Function<SqlOperatorBinding, SqlMonotonicity> monotonicity;
62+
63+
protected BuiltInSqlFunction(
64+
String name,
65+
int version,
66+
SqlKind kind,
67+
@Nullable SqlReturnTypeInference returnTypeInference,
68+
@Nullable SqlOperandTypeInference operandTypeInference,
69+
@Nullable SqlOperandTypeChecker operandTypeChecker,
70+
SqlFunctionCategory category,
71+
boolean isDeterministic,
72+
boolean isInternal,
73+
Function<SqlOperatorBinding, SqlMonotonicity> monotonicity) {
74+
super(
75+
checkNotNull(name),
76+
checkNotNull(kind),
77+
returnTypeInference,
78+
operandTypeInference,
79+
operandTypeChecker,
80+
checkNotNull(category));
81+
this.version = isInternal ? null : version;
82+
this.isDeterministic = isDeterministic;
83+
this.isInternal = isInternal;
84+
this.monotonicity = monotonicity;
85+
validateFunction(name, version, isInternal);
86+
}
87+
88+
protected BuiltInSqlFunction(
89+
String name,
90+
SqlKind kind,
91+
SqlReturnTypeInference returnTypeInference,
92+
SqlOperandTypeInference operandTypeInference,
93+
@Nullable SqlOperandTypeChecker operandTypeChecker,
94+
SqlFunctionCategory category) {
95+
this(
96+
name,
97+
DEFAULT_VERSION,
98+
kind,
99+
returnTypeInference,
100+
operandTypeInference,
101+
operandTypeChecker,
102+
category,
103+
true,
104+
false,
105+
call -> SqlMonotonicity.NOT_MONOTONIC);
106+
}
107+
108+
/** Builder for configuring and creating instances of {@link BuiltInSqlFunction}. */
109+
public static Builder newBuilder() {
110+
return new Builder();
111+
}
112+
113+
public final Optional<Integer> getVersion() {
114+
return Optional.ofNullable(version);
115+
}
116+
117+
public String getQualifiedName() {
118+
if (isInternal) {
119+
return getName();
120+
}
121+
assert version != null;
122+
return qualifyFunctionName(getName(), version);
123+
}
124+
125+
@Override
126+
public boolean isDeterministic() {
127+
return isDeterministic;
128+
}
129+
130+
public final boolean isInternal() {
131+
return isInternal;
132+
}
133+
134+
@Override
135+
public SqlMonotonicity getMonotonicity(SqlOperatorBinding call) {
136+
return monotonicity.apply(call);
137+
}
138+
139+
// --------------------------------------------------------------------------------------------
140+
// Builder
141+
// --------------------------------------------------------------------------------------------
142+
143+
/** Builder for fluent definition of built-in functions. */
144+
public static class Builder {
145+
146+
private String name;
147+
148+
private int version = DEFAULT_VERSION;
149+
150+
private SqlKind kind = SqlKind.OTHER_FUNCTION;
151+
152+
private SqlReturnTypeInference returnTypeInference;
153+
154+
private SqlOperandTypeInference operandTypeInference;
155+
156+
private SqlOperandTypeChecker operandTypeChecker;
157+
158+
private SqlFunctionCategory category = SqlFunctionCategory.SYSTEM;
159+
160+
private boolean isInternal = false;
161+
162+
private boolean isDeterministic = true;
163+
164+
private Function<SqlOperatorBinding, SqlMonotonicity> monotonicity =
165+
call -> SqlMonotonicity.NOT_MONOTONIC;
166+
167+
/** @see BuiltInFunctionDefinition.Builder#name(String) */
168+
public Builder name(String name) {
169+
this.name = name;
170+
return this;
171+
}
172+
173+
/** @see BuiltInFunctionDefinition.Builder#version(int) */
174+
public Builder version(int version) {
175+
this.version = version;
176+
return this;
177+
}
178+
179+
public Builder kind(SqlKind kind) {
180+
this.kind = kind;
181+
return this;
182+
}
183+
184+
public Builder returnType(SqlReturnTypeInference returnTypeInference) {
185+
this.returnTypeInference = returnTypeInference;
186+
return this;
187+
}
188+
189+
public Builder operandTypeInference(SqlOperandTypeInference operandTypeInference) {
190+
this.operandTypeInference = operandTypeInference;
191+
return this;
192+
}
193+
194+
public Builder operandTypeChecker(SqlOperandTypeChecker operandTypeChecker) {
195+
this.operandTypeChecker = operandTypeChecker;
196+
return this;
197+
}
198+
199+
public Builder category(SqlFunctionCategory category) {
200+
this.category = category;
201+
return this;
202+
}
203+
204+
public Builder notDeterministic() {
205+
this.isDeterministic = false;
206+
return this;
207+
}
208+
209+
/** @see BuiltInFunctionDefinition.Builder#internal() */
210+
public Builder internal() {
211+
this.isInternal = true;
212+
return this;
213+
}
214+
215+
public Builder monotonicity(SqlMonotonicity staticMonotonicity) {
216+
this.monotonicity = call -> staticMonotonicity;
217+
return this;
218+
}
219+
220+
public Builder monotonicity(Function<SqlOperatorBinding, SqlMonotonicity> monotonicity) {
221+
this.monotonicity = monotonicity;
222+
return this;
223+
}
224+
225+
public BuiltInSqlFunction build() {
226+
return new BuiltInSqlFunction(
227+
name,
228+
version,
229+
kind,
230+
returnTypeInference,
231+
operandTypeInference,
232+
operandTypeChecker,
233+
category,
234+
isDeterministic,
235+
isInternal,
236+
monotonicity);
237+
}
238+
}
239+
}

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,27 @@ public static int currentDate(long epochTime, String timezone) {
8080
return timestampMillisToDate(localtimestamp(epochTime, timezone).getMillisecond());
8181
}
8282

83+
public static String fromUnixtime(long seconds, String timezone) {
84+
return DateTimeUtils.formatUnixTimestamp(seconds, TimeZone.getTimeZone(timezone));
85+
}
86+
87+
public static String fromUnixtime(long seconds, String format, String timezone) {
88+
return DateTimeUtils.formatUnixTimestamp(seconds, format, TimeZone.getTimeZone(timezone));
89+
}
90+
91+
public static long unixTimestamp(long epochTime, String timezone) {
92+
return DateTimeUtils.unixTimestamp(epochTime);
93+
}
94+
95+
public static long unixTimestamp(String dateTimeStr, long epochTime, String timezone) {
96+
return DateTimeUtils.unixTimestamp(dateTimeStr, TimeZone.getTimeZone(timezone));
97+
}
98+
99+
public static long unixTimestamp(
100+
String dateTimeStr, String format, long epochTime, String timezone) {
101+
return DateTimeUtils.unixTimestamp(dateTimeStr, format, TimeZone.getTimeZone(timezone));
102+
}
103+
83104
public static String dateFormat(TimestampData timestamp, String format) {
84105
return DateTimeUtils.formatTimestampMillis(
85106
timestamp.getMillisecond(), format, TimeZone.getTimeZone("UTC"));

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,18 @@ public class JaninoCompiler {
5757
Arrays.asList("CURRENT_TIMESTAMP", "NOW");
5858

5959
private static final List<String> TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS =
60-
Arrays.asList("LOCALTIME", "LOCALTIMESTAMP", "CURRENT_TIME", "CURRENT_DATE");
60+
Arrays.asList(
61+
"LOCALTIME",
62+
"LOCALTIMESTAMP",
63+
"CURRENT_TIME",
64+
"CURRENT_DATE",
65+
"UNIX_TIMESTAMP");
6166

6267
private static final List<String> TIMEZONE_FREE_TEMPORAL_CONVERSION_FUNCTIONS =
6368
Arrays.asList("DATE_FORMAT");
6469

6570
private static final List<String> TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS =
66-
Arrays.asList("TO_DATE", "TO_TIMESTAMP");
71+
Arrays.asList("TO_DATE", "TO_TIMESTAMP", "FROM_UNIXTIME");
6772

6873
public static final String DEFAULT_EPOCH_TIME = "__epoch_time__";
6974
public static final String DEFAULT_TIME_ZONE = "__time_zone__";

0 commit comments

Comments
 (0)