Skip to content

Commit 75e69cb

Browse files
committed
add support for OffsetDateTime & ZonedDateTime handling
1 parent 406210b commit 75e69cb

File tree

42 files changed

+4931
-46
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+4931
-46
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@
22
*.iml
33
target/
44
dependency-reduced-pom.xml
5+
docker/

README.md

Lines changed: 337 additions & 41 deletions
Large diffs are not rendered by default.

pom.xml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,14 @@
2222

2323
<groupId>com.github.hpgrahsl.ksqldb.functions</groupId>
2424
<artifactId>datetime-functions</artifactId>
25-
<version>0.1.0</version>
25+
<version>0.2.0</version>
2626
<packaging>jar</packaging>
2727

2828
<name>Utility functions for date time operations within ksqlDB</name>
2929

3030
<properties>
3131
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
32-
<ksql.version>5.4.2</ksql.version>
32+
<ksqldb.version>5.5.1</ksqldb.version>
3333
<kafka.connect.version>5.5.1-ce</kafka.connect.version>
3434
<junit.jupiter.version>5.6.1</junit.jupiter.version>
3535
<javax.json.version>1.1.4</javax.json.version>
@@ -98,8 +98,8 @@
9898
<!-- KSQL dependencies -->
9999
<dependency>
100100
<groupId>io.confluent.ksql</groupId>
101-
<artifactId>ksql-udf</artifactId>
102-
<version>${ksql.version}</version>
101+
<artifactId>ksqldb-udf</artifactId>
102+
<version>${ksqldb.version}</version>
103103
</dependency>
104104
<dependency>
105105
<groupId>org.apache.kafka</groupId>

src/main/java/com/github/hpgrahsl/ksqldb/functions/UdfLocalTimeChronology.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import io.confluent.ksql.function.udf.Udf;
2323
import io.confluent.ksql.function.udf.UdfDescription;
2424
import io.confluent.ksql.function.udf.UdfParameter;
25-
import java.util.Objects;
2625
import org.apache.kafka.connect.data.Struct;
2726
import org.slf4j.Logger;
2827
import org.slf4j.LoggerFactory;
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Copyright (c) 2020. Hans-Peter Grahsl (grahslhp@gmail.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package com.github.hpgrahsl.ksqldb.functions;
19+
20+
import com.github.hpgrahsl.ksqldb.functions.schemas.DateTimeSchemas;
21+
import com.github.hpgrahsl.ksqldb.functions.structs.StructsConverter;
22+
import io.confluent.ksql.function.udf.Udf;
23+
import io.confluent.ksql.function.udf.UdfDescription;
24+
import io.confluent.ksql.function.udf.UdfParameter;
25+
import java.time.OffsetDateTime;
26+
import java.time.format.DateTimeFormatter;
27+
import java.util.Locale;
28+
import org.apache.kafka.connect.data.Struct;
29+
30+
@UdfDescription(
31+
name = "dt_offsetdatetime",
32+
description = "Factory functions for OffsetDateTime struct creation",
33+
author = "Hans-Peter Grahsl (follow @hpgrahsl)",
34+
version = "0.1.0"
35+
)
36+
public class UdfOffsetDateTime {
37+
38+
@Udf(description = "Create an OffsetDateTime struct based on the system clock's current date-time and default time-zone",
39+
schema = DateTimeSchemas.OFFSETDATETIME_SCHEMA_DESCRIPTOR)
40+
public Struct createOffsetDateTime() {
41+
return StructsConverter.toOffsetDateTimeStruct(OffsetDateTime.now());
42+
}
43+
44+
@Udf(description = "Create an OffsetDateTime struct based on the system clock's date-time in the specified time-zone",
45+
schema = DateTimeSchemas.OFFSETDATETIME_SCHEMA_DESCRIPTOR)
46+
public Struct createOffsetDateTime(
47+
@UdfParameter(
48+
value = "zoneId",
49+
description = "the ZoneId struct for the OffsetDateTime",
50+
schema = DateTimeSchemas.ZONEID_SCHEMA_DESCRIPTOR)
51+
final Struct zoneId) {
52+
return zoneId != null ? StructsConverter.toOffsetDateTimeStruct(OffsetDateTime.now(StructsConverter.fromZoneIdStruct(zoneId))) : null;
53+
}
54+
55+
@Udf(description = "Create an OffsetDateTime struct based on given LocalDateTime and ZoneOffset structs",
56+
schema = DateTimeSchemas.OFFSETDATETIME_SCHEMA_DESCRIPTOR)
57+
public Struct createOffsetDateTime(
58+
@UdfParameter(
59+
value = "localDateTime",
60+
description = "the LocalDateTime struct for the OffsetDateTime",
61+
schema = DateTimeSchemas.LOCALDATETIME_SCHEMA_DESCRIPTOR)
62+
final Struct localDateTime,
63+
@UdfParameter(
64+
value = "zoneOffset",
65+
description = "the ZoneOffset struct for the OffsetDateTime",
66+
schema = DateTimeSchemas.ZONEOFFSET_SCHEMA_DESCRIPTOR)
67+
final Struct zoneOffset) {
68+
if (localDateTime == null || zoneOffset == null)
69+
return null;
70+
return StructsConverter.toOffsetDateTimeStruct(OffsetDateTime.of(
71+
StructsConverter.fromLocalDateTimeStruct(localDateTime),
72+
StructsConverter.fromZoneOffsetStruct(zoneOffset)
73+
));
74+
}
75+
76+
@Udf(description = "Create an OffsetDateTime struct based on given LocalDate, LocalTime and ZoneOffset structs",
77+
schema = DateTimeSchemas.OFFSETDATETIME_SCHEMA_DESCRIPTOR)
78+
public Struct createOffsetDateTime(
79+
@UdfParameter(
80+
value = "localDate",
81+
description = "the LocalDate struct for the OffsetDateTime",
82+
schema = DateTimeSchemas.LOCALDATE_SCHEMA_DESCRIPTOR)
83+
final Struct localDate,
84+
@UdfParameter(
85+
value = "localTime",
86+
description = "the LocalTime struct for the OffsetDateTime",
87+
schema = DateTimeSchemas.LOCALTIME_SCHEMA_DESCRIPTOR)
88+
final Struct localTime,
89+
@UdfParameter(
90+
value = "zoneOffset",
91+
description = "the ZoneOffset struct for the OffsetDateTime",
92+
schema = DateTimeSchemas.ZONEOFFSET_SCHEMA_DESCRIPTOR)
93+
final Struct zoneOffset) {
94+
if (localDate == null || localTime == null || zoneOffset == null)
95+
return null;
96+
return StructsConverter.toOffsetDateTimeStruct(
97+
OffsetDateTime.of(
98+
StructsConverter.fromLocalDateStruct(localDate),
99+
StructsConverter.fromLocalTimeStruct(localTime),
100+
StructsConverter.fromZoneOffsetStruct(zoneOffset))
101+
);
102+
}
103+
104+
@Udf(description = "Create an OffsetDateTime struct from its string representation using java.time.format.DateTimeFormatter#ISO_OFFSET_DATE_TIME",
105+
schema = DateTimeSchemas.OFFSETDATETIME_SCHEMA_DESCRIPTOR)
106+
public Struct createOffsetDateTime(
107+
@UdfParameter(
108+
value = "text",
109+
description = "the string representation of the OffsetDateTime")
110+
final String text) {
111+
return text != null ? StructsConverter.toOffsetDateTimeStruct(OffsetDateTime.parse(text)) : null;
112+
}
113+
114+
@Udf(description = "Create an OffsetDateTime struct from its string representation using the specified java.time.format.DateTimeFormatter format string",
115+
schema = DateTimeSchemas.OFFSETDATETIME_SCHEMA_DESCRIPTOR)
116+
public Struct createOffsetDateTime(
117+
@UdfParameter(
118+
value = "text",
119+
description = "the string representation of the OffsetDateTime")
120+
final String text,
121+
@UdfParameter(
122+
value = "format",
123+
description = "the specified java.time.format.DateTimeFormatter format string")
124+
final String format) {
125+
if (text == null || format == null)
126+
return null;
127+
return StructsConverter.toOffsetDateTimeStruct(OffsetDateTime.parse(text, DateTimeFormatter.ofPattern(format,
128+
Locale.ENGLISH)));
129+
}
130+
131+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright (c) 2020. Hans-Peter Grahsl (grahslhp@gmail.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package com.github.hpgrahsl.ksqldb.functions;
19+
20+
import com.github.hpgrahsl.ksqldb.functions.schemas.DateTimeSchemas;
21+
import com.github.hpgrahsl.ksqldb.functions.structs.StructsConverter;
22+
import io.confluent.ksql.function.udf.Udf;
23+
import io.confluent.ksql.function.udf.UdfDescription;
24+
import io.confluent.ksql.function.udf.UdfParameter;
25+
import org.apache.kafka.connect.data.Struct;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
@UdfDescription(
30+
name = "dt_offsetdatetime_chronology",
31+
description = "Chronology check of offset datetimes",
32+
author = "Hans-Peter Grahsl (follow @hpgrahsl)",
33+
version = "0.1.0"
34+
)
35+
public class UdfOffsetDateTimeChronology {
36+
37+
private static final Logger LOGGER = LoggerFactory.getLogger(UdfOffsetDateTimeChronology.class);
38+
39+
@Udf(description = "Check if a offset datetime is either before, after or equal to another offset datetime")
40+
public Boolean check(
41+
@UdfParameter(
42+
value = "baseOffsetDateTime",
43+
description = "the offset datetime to check against",
44+
schema = DateTimeSchemas.OFFSETDATETIME_SCHEMA_DESCRIPTOR)
45+
final Struct baseOffsetDateTime,
46+
@UdfParameter(
47+
value = "offsetDateTime",
48+
description = "the offset datetime to check whether it's before, after or equal",
49+
schema = DateTimeSchemas.OFFSETDATETIME_SCHEMA_DESCRIPTOR)
50+
final Struct offsetDateTime,
51+
@UdfParameter(
52+
value = "chronologyMode",
53+
description = "the chronologyMode being either: 'IS_BEFORE','IS_AFTER','IS_EQUAL'")
54+
final String chronologyMode
55+
) {
56+
if (baseOffsetDateTime == null || offsetDateTime == null || chronologyMode == null)
57+
return null;
58+
try {
59+
ChronologyMode cm = ChronologyMode.valueOf(chronologyMode);
60+
switch (cm) {
61+
case IS_BEFORE:
62+
return StructsConverter.fromOffsetDateTimeStruct(offsetDateTime)
63+
.isBefore(StructsConverter.fromOffsetDateTimeStruct(baseOffsetDateTime));
64+
case IS_AFTER:
65+
return StructsConverter.fromOffsetDateTimeStruct(offsetDateTime)
66+
.isAfter(StructsConverter.fromOffsetDateTimeStruct(baseOffsetDateTime));
67+
case IS_EQUAL:
68+
return StructsConverter.fromOffsetDateTimeStruct(offsetDateTime)
69+
.isEqual(StructsConverter.fromOffsetDateTimeStruct(baseOffsetDateTime));
70+
}
71+
} catch(IllegalArgumentException e) {
72+
LOGGER.error("chronologyMode '" + chronologyMode +
73+
"' is invalid - must be one of: 'IS_BEFORE','IS_AFTER','IS_EQUAL'",e);
74+
}
75+
return null;
76+
}
77+
78+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright (c) 2020. Hans-Peter Grahsl (grahslhp@gmail.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package com.github.hpgrahsl.ksqldb.functions;
19+
20+
import com.github.hpgrahsl.ksqldb.functions.schemas.DateTimeSchemas;
21+
import com.github.hpgrahsl.ksqldb.functions.structs.StructsConverter;
22+
import io.confluent.ksql.function.udf.Udf;
23+
import io.confluent.ksql.function.udf.UdfDescription;
24+
import io.confluent.ksql.function.udf.UdfParameter;
25+
import java.time.format.DateTimeFormatter;
26+
import java.util.Locale;
27+
import org.apache.kafka.connect.data.Struct;
28+
29+
@UdfDescription(
30+
name = "dt_offsetdatetime_format",
31+
description = "Create a string representation of the OffsetDateTime struct",
32+
author = "Hans-Peter Grahsl (follow @hpgrahsl)",
33+
version = "0.1.0"
34+
)
35+
public class UdfOffsetDateTimeFormat {
36+
37+
@Udf(description = "Create a string representation of the OffsetDateTime struct using the java.time.format.DateTimeFormatter#ISO_OFFSET_DATE_TIME format")
38+
public String format(
39+
@UdfParameter(
40+
value = "offsetDateTime",
41+
description = "the OffsetDateTime struct to create a string representation for",
42+
schema = DateTimeSchemas.OFFSETDATETIME_SCHEMA_DESCRIPTOR)
43+
final Struct offsetDateTime
44+
) {
45+
return offsetDateTime != null ? StructsConverter.fromOffsetDateTimeStruct(offsetDateTime).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME) : null;
46+
}
47+
48+
@Udf(description = "Create a string representation of the OffsetDateTime struct using the specified java.time.format.DateTimeFormatter format string")
49+
public String format(
50+
@UdfParameter(
51+
value = "offsetDateTime",
52+
description = "the OffsetDateTime struct to create a string representation for",
53+
schema = DateTimeSchemas.OFFSETDATETIME_SCHEMA_DESCRIPTOR)
54+
final Struct offsetDateTime,
55+
@UdfParameter(
56+
value = "format",
57+
description = "the java.time.format.DateTimeFormatter format string")
58+
final String format
59+
) {
60+
if (offsetDateTime == null || format == null )
61+
return null;
62+
return StructsConverter.fromOffsetDateTimeStruct(offsetDateTime).format(DateTimeFormatter.ofPattern(format,
63+
Locale.ENGLISH));
64+
}
65+
66+
}

0 commit comments

Comments
 (0)