Skip to content

Commit

Permalink
Added logic to limit DateTime/DateTime32, DateTime64 based on clickho…
Browse files Browse the repository at this point in the history
…use column values
  • Loading branch information
subkanthi committed Nov 23, 2023
2 parents aedf70d + 5883bf4 commit 492f007
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 65 deletions.
9 changes: 8 additions & 1 deletion .github/workflows/docker-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,16 @@ jobs:
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
if: ${{ env.username != '' }}
- name: Build the Docker image
env:
username: ${{ secrets.DOCKERHUB_USERNAME }}
if: ${{ env.username != '' }}
working-directory: sink-connector
run: docker build . --file docker/Dockerfile-sink-on-debezium-base-image --tag altinity/clickhouse-sink-connector:$(date +%F)
- name: Push docker image to dockerhub
env:
username: ${{ secrets.DOCKERHUB_USERNAME }}
if: ${{ env.username != '' }}
working-directory: sink-connector
run: docker push altinity/clickhouse-sink-connector:$(date +%F)
run: docker push altinity/clickhouse-sink-connector:$(date +%F)
14 changes: 4 additions & 10 deletions .github/workflows/testflow-sink-connector-kafka.yml
Original file line number Diff line number Diff line change
@@ -1,22 +1,16 @@
name: TestFlows Tests - Sink connector(Kafka)
name: Kafka - TestFlows Tests

on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
workflow_dispatch:
inputs:
sink_version:
description: "sink connector version"
description: "Kafka version"
required: true
type: string
default: "2023-10-30Update "
default: "latest"

env:
# SINK_CONNECTOR_VERSION: "${{ inputs.sink_version }}"
# SINK_CONNECTOR_VERSION: "2023-08-28"
SINK_CONNECTOR_VERSION: "latest"
SINK_CONNECTOR_VERSION: "${{ inputs.sink_version }}"

jobs:
testflows:
Expand Down
17 changes: 5 additions & 12 deletions .github/workflows/testflow-sink-connector-lightweight.yml
Original file line number Diff line number Diff line change
@@ -1,23 +1,16 @@
name: TestFlows Tests - Sink Connector(Light-weight)
name: Lightweight - TestFlows Tests

on:
push:
branches: [ main, develop]
pull_request:
branches: [ main , develop]
on:
workflow_dispatch:
inputs:
sink_version:
description: "sink connector version"
description: "Lightweight version"
required: true
type: string
default: 2023-11-16
default: latest

env:
# SINK_CONNECTOR_VERSION: "${{ inputs.sink_version }}"
# SINK_CONNECTOR_VERSION: "2023-08-11"
# If inputs.sink_version is defined, then use the sink_version, else use $(date + %F)
SINK_CONNECTOR_VERSION: $(date +%F)
SINK_CONNECTOR_VERSION: "${{ inputs.sink_version }}"

jobs:
testflows_lightweight:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,12 @@ public static boolean convert(Schema.Type type, String schemaName,
// Calendar.getInstance(TimeZone.getTimeZone(serverTimeZone)));
}
else if (value instanceof Long) {
// DATETIME(0), DATETIME(1), DATETIME(2), DATETIME(3)
boolean isColumnDateTime64 = false;
if(schemaName.equalsIgnoreCase(Timestamp.SCHEMA_NAME) && type == Schema.INT64_SCHEMA.type()){
isColumnDateTime64 = true;
}
ps.setString(index, DebeziumConverter.TimestampConverter.convert(value, isColumnDateTime64, serverTimeZone));
ps.setString(index, DebeziumConverter.TimestampConverter.convert(value, clickHouseDataType, serverTimeZone));
}
} else if (isFieldTime) {
ps.setString(index, DebeziumConverter.MicroTimeConverter.convert(value));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,28 +84,30 @@ public static class TimestampConverter {
* @param value
* @return
*/
public static String convert(Object value, boolean isDateTime64, ZoneId serverTimezone) {
DateTimeFormatter destFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
public static String convert(Object value, ClickHouseDataType clickHouseDataType, ZoneId serverTimezone) {
DateTimeFormatter destFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
// Input is a long.
Instant i = Instant.ofEpochMilli((long) value);
LocalDateTime dt = LocalDateTime.ofInstant(i,serverTimezone);

// if(result < BinaryStreamUtils.DATETIME64_MIN) {
// return BinaryStreamUtils.DATETIME64_MIN;
// } else if(result > BinaryStreamUtils.DATETIME64_MAX) {
// return BinaryStreamUtils.DATETIME64_MAX;
// }
Instant modifiedDT = checkIfDateTimeExceedsSupportedRange(i, isDateTime64);
Instant modifiedDT = checkIfDateTimeExceedsSupportedRange(i, clickHouseDataType);
return modifiedDT.atZone(serverTimezone).format(destFormatter).toString();
}
}

public static Instant checkIfDateTimeExceedsSupportedRange(Instant providedDateTime, boolean isDateTime64) {
public static Instant checkIfDateTimeExceedsSupportedRange(Instant providedDateTime, ClickHouseDataType clickHouseDataType) {

if(providedDateTime.isBefore(DataTypeRange.CLICKHOUSE_MIN_SUPPORTED_DATETIME)) {
return DataTypeRange.CLICKHOUSE_MIN_SUPPORTED_DATETIME;
} else if (providedDateTime.isAfter(DataTypeRange.CLICKHOUSE_MAX_SUPPORTED_DATETIME)){
return DataTypeRange.CLICKHOUSE_MAX_SUPPORTED_DATETIME;
if(clickHouseDataType == ClickHouseDataType.DateTime ||
clickHouseDataType == ClickHouseDataType.DateTime32) {
if(providedDateTime.getEpochSecond() < DataTypeRange.DATETIME32_MIN) {
return Instant.ofEpochSecond(DataTypeRange.DATETIME32_MIN);
} else if(providedDateTime.getEpochSecond() > DataTypeRange.DATETIME32_MAX) {
return Instant.ofEpochSecond(DataTypeRange.DATETIME32_MAX);
}
} else if(clickHouseDataType == ClickHouseDataType.DateTime64) {
if (providedDateTime.isBefore(DataTypeRange.CLICKHOUSE_MIN_SUPPORTED_DATETIME64)) {
return DataTypeRange.CLICKHOUSE_MIN_SUPPORTED_DATETIME64;
} else if (providedDateTime.isAfter(DataTypeRange.CLICKHOUSE_MAX_SUPPORTED_DATETIME64)) {
return DataTypeRange.CLICKHOUSE_MAX_SUPPORTED_DATETIME64;
}
}

return providedDateTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ public class DataTypeRange


// DateTime
public static final Instant CLICKHOUSE_MIN_SUPPORTED_DATETIME = from(ofEpochMilli
public static final Instant CLICKHOUSE_MIN_SUPPORTED_DATETIME64 = from(ofEpochMilli
(DATETIME64_MIN * 1000).atZone(ZoneId.of("UTC"))).plusNanos(DATETIME64_MIN * 1000 % 1_000);
public static final Instant CLICKHOUSE_MAX_SUPPORTED_DATETIME = from(ofEpochMilli
public static final Instant CLICKHOUSE_MAX_SUPPORTED_DATETIME64 = from(ofEpochMilli
(DATETIME64_MAX * 1000).atZone(ZoneId.of("UTC")).withHour(23).withMinute(59).withSecond(59).withNano(999999999));


// DateTime and DateTime32
public static final long DATETIME32_MIN = 0L;
public static final long DATETIME32_MAX = BinaryStreamUtils.DATETIME_MAX;
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,31 +31,11 @@ public void testMicroTimeConverter() {
Assert.assertTrue(formattedTime.equalsIgnoreCase("01:02:03.000000"));
}

@Test
public void testMicroTimestampConverter() {


// With microseconds
// String resultWMicroSeconds = DebeziumConverter.MicroTimestampConverter.convert(1665076675000000L);
// // Assert.assertTrue(resultWMicroSeconds == 1665076675000L);
//
// // With milliseconds
// String resultWMilliSeconds = DebeziumConverter.MicroTimestampConverter.convert(1665076675000L);
//Assert.assertTrue(resultWMilliSeconds == 1665076675L);

//
// Timestamp result = DebeziumConverter.MicroTimestampConverter.convert(1664416228000000L, ZoneId.of("UTC"));
// System.out.println("");
//
// Timestamp result2 = DebeziumConverter.MicroTimestampConverter.convert(253402300799999990L, ZoneId.of("UTC"));
// System.out.println("");
}

@Test
public void testTimestampConverter() {

Object timestampEpoch = 1640995260000L;
String formattedTimestamp = String.valueOf(DebeziumConverter.TimestampConverter.convert(timestampEpoch, false, ZoneId.of("UTC")));
String formattedTimestamp = String.valueOf(DebeziumConverter.TimestampConverter.convert(timestampEpoch, ClickHouseDataType.DateTime64, ZoneId.of("UTC")));

Assert.assertTrue(formattedTimestamp.equalsIgnoreCase("1640995260000"));
}
Expand All @@ -64,7 +44,7 @@ public void testTimestampConverter() {
public void testTimestampConverterMinRange() {

Object timestampEpoch = -2166681362000L;
String formattedTimestamp = String.valueOf(DebeziumConverter.TimestampConverter.convert(timestampEpoch, false, ZoneId.of("UTC")));
String formattedTimestamp = String.valueOf(DebeziumConverter.TimestampConverter.convert(timestampEpoch, ClickHouseDataType.DateTime64, ZoneId.of("UTC")));

Assert.assertTrue(formattedTimestamp.equalsIgnoreCase("-1420070400000"));
}
Expand All @@ -73,7 +53,7 @@ public void testTimestampConverterMinRange() {
public void testTimestampConverterMaxRange() {

Object timestampEpoch = 4807440238000L;
String formattedTimestamp = String.valueOf(DebeziumConverter.TimestampConverter.convert(timestampEpoch, false, ZoneId.of("UTC")));
String formattedTimestamp = String.valueOf(DebeziumConverter.TimestampConverter.convert(timestampEpoch, ClickHouseDataType.DateTime64, ZoneId.of("UTC")));

Assert.assertTrue(formattedTimestamp.equalsIgnoreCase("4807440238000"));
}
Expand Down Expand Up @@ -130,7 +110,7 @@ public void testZonedTimestampConverter() {

@Test
public void testCheckIfDateTimeExceedsSupportedRange() {
DebeziumConverter.TimestampConverter.convert(1665076675000L, false, ZoneId.of("UTC"));
DebeziumConverter.TimestampConverter.convert(1665076675000L, ClickHouseDataType.DateTime64, ZoneId.of("UTC"));
}


Expand Down

0 comments on commit 492f007

Please sign in to comment.