From bdb551ffbc1b47b938ad6784b88d613130165430 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Fri, 10 May 2024 15:58:03 -0400 Subject: [PATCH 1/7] Upgraded to debezium 2.7.0 and update new function definition for MySQLValueConverter and updated interface for MySQLDDLBaseListener --- sink-connector-lightweight/pom.xml | 2 +- .../parser/MySQLDDLParserBaseListener.java | 30 +++++++++++++++++++ .../embedded/parser/DataTypeConverter.java | 9 +++--- sink-connector/pom.xml | 2 +- 4 files changed, 37 insertions(+), 6 deletions(-) diff --git a/sink-connector-lightweight/pom.xml b/sink-connector-lightweight/pom.xml index 136498017..48ccc9a2a 100644 --- a/sink-connector-lightweight/pom.xml +++ b/sink-connector-lightweight/pom.xml @@ -13,7 +13,7 @@ 17 UTF-8 UTF-8 - 2.5.0.Beta1 + 2.7.0.Alpha1 5.9.1 1.19.1 3.1.1 diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySQLDDLParserBaseListener.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySQLDDLParserBaseListener.java index bbc266578..1fe8dfaf9 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySQLDDLParserBaseListener.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySQLDDLParserBaseListener.java @@ -1288,6 +1288,36 @@ public void exitPartitionFunctionList(MySqlParser.PartitionFunctionListContext p } + @Override + public void enterPartitionSystemVersion(MySqlParser.PartitionSystemVersionContext partitionSystemVersionContext) { + + } + + @Override + public void exitPartitionSystemVersion(MySqlParser.PartitionSystemVersionContext partitionSystemVersionContext) { + + } + + @Override + public void enterPartitionSystemVersionDefinitions(MySqlParser.PartitionSystemVersionDefinitionsContext partitionSystemVersionDefinitionsContext) { + + } + + @Override + public void exitPartitionSystemVersionDefinitions(MySqlParser.PartitionSystemVersionDefinitionsContext partitionSystemVersionDefinitionsContext) { + + } + + @Override + public void enterPartitionSystemVersionDefinition(MySqlParser.PartitionSystemVersionDefinitionContext partitionSystemVersionDefinitionContext) { + + } + + @Override + public void exitPartitionSystemVersionDefinition(MySqlParser.PartitionSystemVersionDefinitionContext partitionSystemVersionDefinitionContext) { + + } + @Override public void enterSubPartitionFunctionHash(MySqlParser.SubPartitionFunctionHashContext subPartitionFunctionHashContext) { diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/parser/DataTypeConverter.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/parser/DataTypeConverter.java index 69b6558c1..d6e5f39af 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/parser/DataTypeConverter.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/parser/DataTypeConverter.java @@ -4,7 +4,7 @@ import com.clickhouse.data.ClickHouseDataType; import io.debezium.antlr.DataTypeResolver; import io.debezium.config.CommonConnectorConfig; -import io.debezium.connector.mysql.MySqlValueConverters; +import io.debezium.connector.mysql.jdbc.MySqlValueConverters; import io.debezium.ddl.parser.mysql.generated.MySqlParser; import io.debezium.jdbc.JdbcValueConverters; import io.debezium.jdbc.TemporalPrecisionMode; @@ -34,8 +34,8 @@ public static ClickHouseDataType convert(String columnName, MySqlParser.DataType JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE, JdbcValueConverters.BigIntUnsignedMode.LONG, - CommonConnectorConfig.BinaryHandlingMode.BYTES - ); + CommonConnectorConfig.BinaryHandlingMode.BYTES, + x ->x, CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN); DataType dataType = initializeDataTypeResolver().resolveDataType(columnDefChild); @@ -51,7 +51,8 @@ public static String convertToString(String columnName, int scale, int precision JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE, JdbcValueConverters.BigIntUnsignedMode.LONG, - CommonConnectorConfig.BinaryHandlingMode.BYTES + CommonConnectorConfig.BinaryHandlingMode.BYTES, + x ->x, CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN ); diff --git a/sink-connector/pom.xml b/sink-connector/pom.xml index c4ab7edfa..fbaf25d8e 100644 --- a/sink-connector/pom.xml +++ b/sink-connector/pom.xml @@ -308,7 +308,7 @@ io.debezium debezium-core - 2.5.0.Beta1 + 2.7.0.Alpha1 From 4ec2f33d6e391b4488c7250cab4d7ae83f207139 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Sat, 11 May 2024 22:29:35 -0400 Subject: [PATCH 2/7] Added log.error statements for all RunTimeException --- .../embedded/ClickHouseDebeziumEmbeddedApplication.java | 2 ++ .../debezium/embedded/cdc/DebeziumChangeEventCapture.java | 2 ++ .../debezium/embedded/parser/SourceRecordParserService.java | 1 + sink-connector-lightweight/src/main/resources/log4j2.xml | 4 ++++ .../sink/connector/db/batch/PreparedStatementExecutor.java | 2 ++ .../sink/connector/executor/ClickHouseBatchRunnable.java | 1 + .../sink/connector/executor/DebeziumOffsetManagement.java | 1 + 7 files changed, 13 insertions(+) diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedApplication.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedApplication.java index 168f7fcd2..f98dd278c 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedApplication.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedApplication.java @@ -181,8 +181,10 @@ public void run() { start(injector.getInstance(DebeziumRecordParserService.class), injector.getInstance(DDLParserService.class), props, true); } catch (IOException e) { + log.error("**** ERROR: Restarting Event Loop ****", e); throw new RuntimeException(e); } catch (Exception e) { + log.error("**** ERROR: Restarting Event Loop ****", e); throw new RuntimeException(e); } diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java index bfc7d38f0..e316d784d 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java @@ -581,11 +581,13 @@ public void handle(boolean b, String s, Throwable throwable) { try { Thread.sleep(SLEEP_TIME); } catch (InterruptedException e) { + log.error("Error sleeping", e); throw new RuntimeException(e); } try { setupDebeziumEventCapture(props, debeziumRecordParserService, config); } catch (IOException | ClassNotFoundException e) { + log.error("Error setting up debezium event capture", e); throw new RuntimeException(e); } } diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/parser/SourceRecordParserService.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/parser/SourceRecordParserService.java index b9cf35c67..750eaf058 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/parser/SourceRecordParserService.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/parser/SourceRecordParserService.java @@ -137,6 +137,7 @@ private ClickHouseStruct readBeforeOrAfterSection(Map convertedV } } } catch (ParseException e) { + log.error("Error parsing JSON", e); throw new RuntimeException(e); } diff --git a/sink-connector-lightweight/src/main/resources/log4j2.xml b/sink-connector-lightweight/src/main/resources/log4j2.xml index 9321838cb..a770c9dce 100644 --- a/sink-connector-lightweight/src/main/resources/log4j2.xml +++ b/sink-connector-lightweight/src/main/resources/log4j2.xml @@ -6,6 +6,10 @@ + + + diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/batch/PreparedStatementExecutor.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/batch/PreparedStatementExecutor.java index 35ee1cdc0..4e1bef2e0 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/batch/PreparedStatementExecutor.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/batch/PreparedStatementExecutor.java @@ -177,11 +177,13 @@ private boolean executePreparedStatement(String insertQuery, String topicName, try { ps = conn.prepareStatement("TRUNCATE TABLE " + databaseName + "." + tableName); } catch (SQLException e) { + log.error("*** Error: Truncate table statement error ****", e); throw new RuntimeException(e); } try { ps.execute(); } catch (SQLException e) { + log.error("*** Error: Truncate table statement execute error ****", e); throw new RuntimeException(e); } } diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java index 0a5df9349..4dc88934c 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java @@ -192,6 +192,7 @@ public void run() { try { Thread.sleep(10000); } catch (InterruptedException ex) { + log.error("******* ERROR **** Thread interrupted *********", ex); throw new RuntimeException(ex); } } diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/DebeziumOffsetManagement.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/DebeziumOffsetManagement.java index bcc6b7942..060b16d03 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/DebeziumOffsetManagement.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/DebeziumOffsetManagement.java @@ -116,6 +116,7 @@ static synchronized public boolean checkIfBatchCanBeCommitted(List Date: Tue, 14 May 2024 12:21:52 -0400 Subject: [PATCH 3/7] Added support to use user provided rmt delete column --- .../clickhouse/sink/connector/db/DbWriter.java | 3 ++- .../operations/ClickHouseAutoCreateTable.java | 18 ++++++++++++------ 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java index bae2efd2a..3fa7e3e6b 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java @@ -119,8 +119,9 @@ public DbWriter( } boolean useReplicatedReplacingMergeTree = this.config.getBoolean( ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES_REPLICATED.toString()); + String rmtDeleteColumn = this.config.getString(ClickHouseSinkConnectorConfigVariables.REPLACING_MERGE_TREE_DELETE_COLUMN.toString()); act.createNewTable(record.getPrimaryKey(), tableName, database, fields, this.conn, - isNewReplacingMergeTreeEngine, useReplicatedReplacingMergeTree); + isNewReplacingMergeTreeEngine, useReplicatedReplacingMergeTree, rmtDeleteColumn); } catch (Exception e) { log.error("**** Error creating table ***" + tableName, e); } diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java index 41d3f63a6..45ba44922 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java @@ -1,5 +1,6 @@ package com.altinity.clickhouse.sink.connector.db.operations; +import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables; import com.clickhouse.data.ClickHouseDataType; import com.clickhouse.jdbc.ClickHouseConnection; import com.google.common.annotations.VisibleForTesting; @@ -26,10 +27,10 @@ public class ClickHouseAutoCreateTable extends ClickHouseTableOperationsBase{ public void createNewTable(ArrayList primaryKey, String tableName, String databaseName, Field[] fields, ClickHouseConnection connection, boolean isNewReplacingMergeTree, - boolean useReplicatedReplacingMergeTree) throws SQLException { + boolean useReplicatedReplacingMergeTree, String rmtDeleteColumn) throws SQLException { Map colNameToDataTypeMap = this.getColumnNameToCHDataTypeMapping(fields); String createTableQuery = this.createTableSyntax(primaryKey, tableName, databaseName, fields, colNameToDataTypeMap, - isNewReplacingMergeTree, useReplicatedReplacingMergeTree); + isNewReplacingMergeTree, useReplicatedReplacingMergeTree, rmtDeleteColumn); log.info("**** AUTO CREATE TABLE " + createTableQuery); // ToDO: need to run it before a session is created. this.runQuery(createTableQuery, connection); @@ -45,23 +46,24 @@ public void createNewTable(ArrayList primaryKey, String tableName, Strin public java.lang.String createTableSyntax(ArrayList primaryKey, String tableName, String databaseName, Field[] fields, Map columnToDataTypesMap, boolean isNewReplacingMergeTreeEngine, - boolean useReplicatedReplacingMergeTree) { + boolean useReplicatedReplacingMergeTree, + String rmtDeleteColumn) { StringBuilder createTableSyntax = new StringBuilder(); createTableSyntax.append(CREATE_TABLE).append(" ").append(databaseName).append(".").append("`").append(tableName).append("`").append("("); - for(Field f: fields) { + for (Field f : fields) { String colName = f.name(); String dataType = columnToDataTypesMap.get(colName); boolean isNull = false; - if(f.schema().isOptional() == true) { + if (f.schema().isOptional() == true) { isNull = true; } createTableSyntax.append("`").append(colName).append("`").append(" ").append(dataType); // Ignore setting NULL OR not NULL for JSON and Array - if(dataType != null && + if (dataType != null && (dataType.equalsIgnoreCase(ClickHouseDataType.JSON.name()) || dataType.contains(ClickHouseDataType.Array.name()))) { // ignore adding nulls; @@ -78,6 +80,10 @@ public java.lang.String createTableSyntax(ArrayList primaryKey, String t String isDeletedColumn = IS_DELETED_COLUMN; + if(rmtDeleteColumn != null && !rmtDeleteColumn.isEmpty()) { + isDeletedColumn = rmtDeleteColumn; + } + if(isNewReplacingMergeTreeEngine == true) { createTableSyntax.append("`").append(VERSION_COLUMN).append("` ").append(VERSION_COLUMN_DATA_TYPE).append(","); createTableSyntax.append("`").append(isDeletedColumn).append("` ").append(IS_DELETED_COLUMN_DATA_TYPE); From 97eb648759019291508d14b9bb37b8c5b061cab6 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Tue, 14 May 2024 13:39:42 -0400 Subject: [PATCH 4/7] Modified Integration tests for AutoCreateTable --- .../db/operations/ClickHouseAutoCreateTableTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTableTest.java b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTableTest.java index 060741219..e8bbe1e9c 100644 --- a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTableTest.java +++ b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTableTest.java @@ -125,7 +125,7 @@ public void testCreateTableSyntax() { ClickHouseAutoCreateTable act = new ClickHouseAutoCreateTable(); String query = act.createTableSyntax(primaryKeys, "auto_create_table", "employees", - createFields(), this.columnToDataTypesMap, false, false); + createFields(), this.columnToDataTypesMap, false, false, null); System.out.println("QUERY" + query); Assert.assertTrue(query.equalsIgnoreCase("CREATE TABLE employees.`auto_create_table`(`customerName` String NOT NULL,`occupation` String NOT NULL,`quantity` Int32 NOT NULL,`amount_1` Float32 NOT NULL,`amount` Float64 NOT NULL,`employed` Bool NOT NULL,`blob_storage` String NOT NULL,`blob_storage_scale` Decimal NOT NULL,`json_output` JSON,`max_amount` Float64 NOT NULL,`_sign` Int8,`_version` UInt64) ENGINE = ReplacingMergeTree(_version) PRIMARY KEY(customerName) ORDER BY(customerName)")); //Assert.assertTrue(query.equalsIgnoreCase("CREATE TABLE auto_create_table(`customerName` String NOT NULL,`occupation` String NOT NULL,`quantity` Int32 NOT NULL,`amount_1` Float32 NOT NULL,`amount` Float64 NOT NULL,`employed` Bool NOT NULL,`blob_storage` String NOT NULL,`blob_storage_scale` Decimal NOT NULL,`json_output` JSON,`max_amount` Float64 NOT NULL,`_sign` Int8,`_version` UInt64) ENGINE = ReplacingMergeTree(_version) PRIMARY KEY(customerName) ORDER BY (customerName)")); @@ -137,7 +137,7 @@ public void testCreateTableEmptyPrimaryKey() { ClickHouseAutoCreateTable act = new ClickHouseAutoCreateTable(); String query = act.createTableSyntax(null, "auto_create_table", "employees", createFields(), - this.columnToDataTypesMap, false, false); + this.columnToDataTypesMap, false, false, null); String expectedQuery = "CREATE TABLE employees.`auto_create_table`(`customerName` String NOT NULL,`occupation` String NOT NULL,`quantity` Int32 NOT NULL,`amount_1` Float32 NOT NULL,`amount` Float64 NOT NULL,`employed` Bool NOT NULL,`blob_storage` String NOT NULL,`blob_storage_scale` Decimal NOT NULL,`json_output` JSON,`max_amount` Float64 NOT NULL,`_sign` Int8,`_version` UInt64) ENGINE = ReplacingMergeTree(_version) ORDER BY tuple()"; Assert.assertTrue(query.equalsIgnoreCase(expectedQuery)); @@ -151,7 +151,7 @@ public void testCreateTableMultiplePrimaryKeys() { ClickHouseAutoCreateTable act = new ClickHouseAutoCreateTable(); String query = act.createTableSyntax(primaryKeys, "auto_create_table", "customers", createFields(), - this.columnToDataTypesMap, false, false); + this.columnToDataTypesMap, false, false, null); String expectedQuery = "CREATE TABLE customers.`auto_create_table`(`customerName` String NOT NULL,`occupation` String NOT NULL,`quantity` Int32 NOT NULL,`amount_1` Float32 NOT NULL,`amount` Float64 NOT NULL,`employed` Bool NOT NULL,`blob_storage` String NOT NULL,`blob_storage_scale` Decimal NOT NULL,`json_output` JSON,`max_amount` Float64 NOT NULL,`_sign` Int8,`_version` UInt64) ENGINE = ReplacingMergeTree(_version) ORDER BY tuple()"; Assert.assertTrue(query.equalsIgnoreCase(expectedQuery)); @@ -181,7 +181,7 @@ public void testCreateNewTable() { try { act.createNewTable(primaryKeys, "auto_create_table", "default", this.createFields(), writer.getConnection(), - false, false); + false, false, null); } catch(SQLException se) { Assert.assertTrue(false); } From 481ddbe0e43be8fd9c8b21e356989f9724fe9bbd Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Mon, 27 May 2024 11:38:36 -0400 Subject: [PATCH 5/7] Revert back debezium version to 2.5.0.Beta1 --- sink-connector-lightweight/pom.xml | 2 +- sink-connector/pom.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sink-connector-lightweight/pom.xml b/sink-connector-lightweight/pom.xml index 48ccc9a2a..136498017 100644 --- a/sink-connector-lightweight/pom.xml +++ b/sink-connector-lightweight/pom.xml @@ -13,7 +13,7 @@ 17 UTF-8 UTF-8 - 2.7.0.Alpha1 + 2.5.0.Beta1 5.9.1 1.19.1 3.1.1 diff --git a/sink-connector/pom.xml b/sink-connector/pom.xml index fbaf25d8e..c4ab7edfa 100644 --- a/sink-connector/pom.xml +++ b/sink-connector/pom.xml @@ -308,7 +308,7 @@ io.debezium debezium-core - 2.7.0.Alpha1 + 2.5.0.Beta1 From 8976dbaa28ab3fa139c8516144636d78d70ff323 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Mon, 27 May 2024 13:49:44 -0400 Subject: [PATCH 6/7] Reverted back change to MySQLValueConverter to support debezium 2.5 --- .../debezium/embedded/parser/DataTypeConverter.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/parser/DataTypeConverter.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/parser/DataTypeConverter.java index d6e5f39af..ef319be68 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/parser/DataTypeConverter.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/parser/DataTypeConverter.java @@ -4,7 +4,7 @@ import com.clickhouse.data.ClickHouseDataType; import io.debezium.antlr.DataTypeResolver; import io.debezium.config.CommonConnectorConfig; -import io.debezium.connector.mysql.jdbc.MySqlValueConverters; +import io.debezium.connector.mysql.MySqlValueConverters; import io.debezium.ddl.parser.mysql.generated.MySqlParser; import io.debezium.jdbc.JdbcValueConverters; import io.debezium.jdbc.TemporalPrecisionMode; @@ -34,8 +34,7 @@ public static ClickHouseDataType convert(String columnName, MySqlParser.DataType JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE, JdbcValueConverters.BigIntUnsignedMode.LONG, - CommonConnectorConfig.BinaryHandlingMode.BYTES, - x ->x, CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN); + CommonConnectorConfig.BinaryHandlingMode.BYTES); DataType dataType = initializeDataTypeResolver().resolveDataType(columnDefChild); @@ -51,8 +50,7 @@ public static String convertToString(String columnName, int scale, int precision JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE, JdbcValueConverters.BigIntUnsignedMode.LONG, - CommonConnectorConfig.BinaryHandlingMode.BYTES, - x ->x, CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN + CommonConnectorConfig.BinaryHandlingMode.BYTES ); From 13bbf92293baf9a67617667066ecea22eb17c9ca Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Mon, 27 May 2024 13:53:29 -0400 Subject: [PATCH 7/7] Reverted back MySQL DDL changes to support debezium 2.5 --- .../dependency-reduced-pom.xml | 3 +- .../parser/MySQLDDLParserBaseListener.java | 30 ------------------- 2 files changed, 2 insertions(+), 31 deletions(-) diff --git a/sink-connector-lightweight/dependency-reduced-pom.xml b/sink-connector-lightweight/dependency-reduced-pom.xml index e2e59b558..5c3b1f68f 100644 --- a/sink-connector-lightweight/dependency-reduced-pom.xml +++ b/sink-connector-lightweight/dependency-reduced-pom.xml @@ -72,6 +72,7 @@ + filesystem listener com.altinity.clickhouse.debezium.embedded.FailFastListener @@ -79,6 +80,7 @@ true true true + ${surefire.test.runOrder} @@ -297,7 +299,6 @@ quarkus-bom 1.19.1 3.0.0-M7 - 5.2.1 UTF-8 0.0.8 5.9.1 diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySQLDDLParserBaseListener.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySQLDDLParserBaseListener.java index 1fe8dfaf9..bbc266578 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySQLDDLParserBaseListener.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySQLDDLParserBaseListener.java @@ -1288,36 +1288,6 @@ public void exitPartitionFunctionList(MySqlParser.PartitionFunctionListContext p } - @Override - public void enterPartitionSystemVersion(MySqlParser.PartitionSystemVersionContext partitionSystemVersionContext) { - - } - - @Override - public void exitPartitionSystemVersion(MySqlParser.PartitionSystemVersionContext partitionSystemVersionContext) { - - } - - @Override - public void enterPartitionSystemVersionDefinitions(MySqlParser.PartitionSystemVersionDefinitionsContext partitionSystemVersionDefinitionsContext) { - - } - - @Override - public void exitPartitionSystemVersionDefinitions(MySqlParser.PartitionSystemVersionDefinitionsContext partitionSystemVersionDefinitionsContext) { - - } - - @Override - public void enterPartitionSystemVersionDefinition(MySqlParser.PartitionSystemVersionDefinitionContext partitionSystemVersionDefinitionContext) { - - } - - @Override - public void exitPartitionSystemVersionDefinition(MySqlParser.PartitionSystemVersionDefinitionContext partitionSystemVersionDefinitionContext) { - - } - @Override public void enterSubPartitionFunctionHash(MySqlParser.SubPartitionFunctionHashContext subPartitionFunctionHashContext) {