Skip to content

Commit

Permalink
Added logic to override DateTime/DateTime64 insert syntax to String
Browse files Browse the repository at this point in the history
  • Loading branch information
subkanthi committed Nov 22, 2023
1 parent 1d5dbb5 commit 44ab85d
Show file tree
Hide file tree
Showing 4 changed files with 238 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.MountableFile;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -31,7 +34,7 @@ public class DateTimeWithTimeZoneSchemaOnlyIT {
@Container
public static ClickHouseContainer clickHouseContainer = new ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:latest")
.asCompatibleSubstituteFor("clickhouse"))
.withInitScript("init_clickhouse_it.sql")
.withInitScript("init_clickhouse_schema_only.sql")
.withCopyFileToContainer(MountableFile.forClasspathResource("config.xml"), "/etc/clickhouse-server/config.d/config.xml")
.withUsername("ch_user")
.withPassword("password")
Expand Down Expand Up @@ -74,14 +77,21 @@ public void testCreateTable() throws Exception {
});

Thread.sleep(30000);
Connection conn = connectToMySQL();
// alter table ship_class change column class_name class_name_new int;
// alter table ship_class change column tonange tonange_new decimal(10,10);

conn.prepareStatement("INSERT INTO `temporal_types_DATETIME` VALUES ('DATETIME-INSERT','1000-01-01 00:00:00','2022-09-29 01:47:46','9999-12-31 23:59:59','9999-12-31 23:59:59');\n").execute();
conn.prepareStatement("INSERT INTO `temporal_types_DATETIME1` VALUES ('DATETIME(1)-INSERT','1000-01-01 00:00:00.0','2022-09-29 01:48:25.1','9999-12-31 23:59:59.9','9999-12-31 23:59:59');").execute();
conn.prepareStatement("INSERT INTO `temporal_types_DATETIME2` VALUES ('DATETIME(2)-INSERT','1000-01-01 00:00:00.00','2022-09-29 01:49:05.12','9999-12-31 23:59:59.99','9999-12-31 23:59:59');\n").execute();
//conn.prepareStatement("INSERT INTO `temporal_types_DATETIME` VALUES ('DATETIME-INSERT','1000-01-01 00:00:00','2022-09-29 01:47:46','9999-12-31 23:59:59',NULL);\n").execute();

BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null);


writer.getConnection().close();
//Thread.sleep(10000);
Thread.sleep(10000);

writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null);
Expand Down Expand Up @@ -211,6 +221,7 @@ protected Properties getDebeziumProperties() throws Exception {
defaultProps.setProperty("snapshot.mode", "schema_only");
defaultProps.setProperty("disable.drop.truncate", "true");
defaultProps.setProperty("auto.create.tables", "false");
defaultProps.setProperty("enable.snapshot.ddl", "false");

defaultProps.setProperty("database.hostname", mySqlContainer.getHost());
defaultProps.setProperty("database.port", String.valueOf(mySqlContainer.getFirstMappedPort()));
Expand Down Expand Up @@ -239,4 +250,22 @@ protected Properties getDebeziumProperties() throws Exception {
return defaultProps;

}

Connection connectToMySQL() {
Connection conn = null;
try {

String connectionUrl = String.format("jdbc:mysql://%s:%s/%s?user=%s&password=%s", mySqlContainer.getHost(), mySqlContainer.getFirstMappedPort(),
mySqlContainer.getDatabaseName(), mySqlContainer.getUsername(), mySqlContainer.getPassword());
conn = DriverManager.getConnection(connectionUrl);


} catch (SQLException ex) {
// handle any errors

}

return conn;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
--CREATE USER 'ch_user' IDENTIFIED WITH plaintext_password BY 'root';
--SET allow_introspection_functions=1;
--GRANT ALL ON . TO 'ch_user' WITH GRANT OPTION
--
--
-- CREATE USER ch_user IDENTIFIED WITH plaintext_password BY 'password';
CREATE database datatypes;
CREATE database employees;
CREATE database public;
CREATE database project;

CREATE TABLE project.items
(
`price` Int64,
`name` String,
`_id` String,
`uuid` String,
`_sign` Int8,
`_version` UInt64
)
ENGINE = ReplacingMergeTree(_version)
ORDER BY _id;


CREATE TABLE public.protocol_test
(
`id` Int64,
`consultation_id` Int64,
`recomendation` Nullable(String),
`create_date` DateTime64(6),
`_sign` Int8,
`_version` UInt64
)
ENGINE = ReplacingMergeTree(_version)
ORDER BY id;

CREATE DATABASE altinity_sink_connector;

CREATE TABLE altinity_sink_connector.replica_source_info
(
`id` String,
`offset_key` String,
`offset_val` String,
`record_insert_ts` DateTime,
`record_insert_seq` UInt64,
`_version` UInt64 MATERIALIZED toUnixTimestamp64Nano(now64(9))
)
ENGINE = ReplacingMergeTree(_version)
ORDER BY id
SETTINGS index_granularity = 8198;

USE employees;
CREATE TABLE employees.dt
(
`timestamp` DateTime('Asia/Istanbul'),
`json` String,
`event_id` UInt8,
`sign` Int8,
`_version` UInt64 MATERIALIZED toUnixTimestamp64Nano(now64(9)),
`_updated` DateTime MATERIALIZED now()
)
ENGINE = ReplacingMergeTree(_version) ORDER by event_id;

CREATE TABLE employees.temporal_types_DATETIME
(
`Type` String,
`Minimum_Value` DateTime,
`Mid_Value` DateTime64,
`Maximum_Value` DateTime,
`Null_Value` Nullable(DateTime),
`_version` UInt64,
`is_deleted` UInt8
) ENGINE = ReplacingMergeTree(_version, is_deleted)
ORDER BY Type
SETTINGS index_granularity = 8192;

CREATE TABLE employees.temporal_types_DATETIME1
(
`Type` String,
`Minimum_Value` DateTime,
`Mid_Value` DateTime,
`Maximum_Value` DateTime,
`Null_Value` Nullable(DateTime),
`_version` UInt64,
`is_deleted` UInt8
) ENGINE = ReplacingMergeTree(_version, is_deleted)
ORDER BY Type
SETTINGS index_granularity = 8192;

CREATE TABLE employees.temporal_types_DATETIME2
(
`Type` String,
`Minimum_Value` DateTime,
`Mid_Value` DateTime,
`Maximum_Value` DateTime,
`Null_Value` Nullable(DateTime),
`_version` UInt64,
`is_deleted` UInt8
)
ENGINE = ReplacingMergeTree(_version, is_deleted)
ORDER BY Type
SETTINGS index_granularity = 8192;
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
--CREATE USER 'ch_user' IDENTIFIED WITH plaintext_password BY 'root';
--SET allow_introspection_functions=1;
--GRANT ALL ON . TO 'ch_user' WITH GRANT OPTION
--
--
-- CREATE USER ch_user IDENTIFIED WITH plaintext_password BY 'password';
CREATE database datatypes;
CREATE database employees;
CREATE database public;
CREATE database project;

CREATE TABLE project.items
(
`price` Int64,
`name` String,
`_id` String,
`uuid` String,
`_sign` Int8,
`_version` UInt64
)
ENGINE = ReplacingMergeTree(_version)
ORDER BY _id;


CREATE TABLE public.protocol_test
(
`id` Int64,
`consultation_id` Int64,
`recomendation` Nullable(String),
`create_date` DateTime64(6),
`_sign` Int8,
`_version` UInt64
)
ENGINE = ReplacingMergeTree(_version)
ORDER BY id;

CREATE DATABASE altinity_sink_connector;

CREATE TABLE altinity_sink_connector.replica_source_info
(
`id` String,
`offset_key` String,
`offset_val` String,
`record_insert_ts` DateTime,
`record_insert_seq` UInt64,
`_version` UInt64 MATERIALIZED toUnixTimestamp64Nano(now64(9))
)
ENGINE = ReplacingMergeTree(_version)
ORDER BY id
SETTINGS index_granularity = 8198;

USE employees;
CREATE TABLE employees.dt
(
`timestamp` DateTime('Asia/Istanbul'),
`json` String,
`event_id` UInt8,
`sign` Int8,
`_version` UInt64 MATERIALIZED toUnixTimestamp64Nano(now64(9)),
`_updated` DateTime MATERIALIZED now()
)
ENGINE = ReplacingMergeTree(_version) ORDER by event_id;

CREATE TABLE employees.temporal_types_DATETIME
(
`Type` String,
`Minimum_Value` DateTime64('Asia/Istanbul'),
`Mid_Value` DateTime64('Asia/Istanbul'),
`Maximum_Value` DateTime64('Asia/Istanbul'),
`Null_Value` Nullable(DateTime64('Asia/Istanbul')),
`_version` UInt64,
`is_deleted` UInt8
) ENGINE = ReplacingMergeTree(_version, is_deleted)
ORDER BY Type
SETTINGS index_granularity = 8192;

CREATE TABLE employees.temporal_types_DATETIME1
(
`Type` String,
`Minimum_Value` DateTime64('Asia/Istanbul'),
`Mid_Value` DateTime64('Asia/Istanbul'),
`Maximum_Value` DateTime64('Asia/Istanbul'),
`Null_Value` Nullable(DateTime64('Asia/Istanbul')),
`_version` UInt64,
`is_deleted` UInt8
) ENGINE = ReplacingMergeTree(_version, is_deleted)
ORDER BY Type
SETTINGS index_granularity = 8192;

CREATE TABLE employees.temporal_types_DATETIME2
(
`Type` String,
`Minimum_Value` DateTime64('Asia/Istanbul'),
`Mid_Value` DateTime64('Asia/Istanbul'),
`Maximum_Value` DateTime64('Asia/Istanbul'),
`Null_Value` Nullable(DateTime64('Asia/Istanbul')),
`_version` UInt64,
`is_deleted` UInt8
)
ENGINE = ReplacingMergeTree(_version, is_deleted)
ORDER BY Type
SETTINGS index_granularity = 8192;
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ public MutablePair<String, Map<String, Integer>> getInsertQueryUsingInputFunctio
// Get Field Name and lookup in the Clickhouse column to datatype map.
String dataType = ClickHouseUtils.escape(columnNameToDataTypeMap.get(entry.getKey()), '\'');

if(dataType.contains("DateTime")) {
dataType = "String";
}
if(dataType != null) {
// Is the column a kafka metadata column.
if(isKafkaMetaDataColumn(sourceColumnName)) {
Expand Down

0 comments on commit 44ab85d

Please sign in to comment.