diff --git a/sink-connector-lightweight/docker/config_local.yml b/sink-connector-lightweight/docker/config_local.yml index a3413c50c..ba18ebf44 100644 --- a/sink-connector-lightweight/docker/config_local.yml +++ b/sink-connector-lightweight/docker/config_local.yml @@ -66,6 +66,8 @@ offset.storage.jdbc.user: "root" # offset.storage.jdbc.password: The password of the database user to be used when connecting to the database where connector offsets are to be stored. offset.storage.jdbc.password: "root" +clickhouse.database.override.map: "sbtest:chtest" + # offset.storage.jdbc.offset.table.ddl: The DDL statement used to create the database table where connector offsets are to be stored.(Advanced) offset.storage.jdbc.offset.table.ddl: "CREATE TABLE if not exists %s ( diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/Constants.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/Constants.java index 3972a7b9e..a76f04cb4 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/Constants.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/Constants.java @@ -45,7 +45,7 @@ public class Constants { public static final String DROP_TABLE = "DROP TABLE"; - public static final String CREATE_DATABASE = "CREATE DATABASE %s"; + public static final String CREATE_DATABASE = "CREATE DATABASE IF NOT EXISTS %s"; public static final String DROP_COLUMN = "DROP COLUMN %s"; diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java index 6257e27b0..e6b297a84 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java @@ -7,6 +7,7 @@ import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables; +import com.altinity.clickhouse.sink.connector.common.Utils; import io.debezium.ddl.parser.mysql.generated.MySqlParser; import io.debezium.ddl.parser.mysql.generated.MySqlParser.AlterByAddColumnContext; import io.debezium.ddl.parser.mysql.generated.MySqlParser.TableNameContext; @@ -17,10 +18,7 @@ import org.apache.logging.log4j.Logger; import java.time.ZoneId; -import java.util.HashSet; -import java.util.List; -import java.util.ListIterator; -import java.util.Set; +import java.util.*; /** @@ -71,7 +69,26 @@ public ZoneId parseTimeZone() { public void enterCreateDatabase(MySqlParser.CreateDatabaseContext createDatabaseContext) { for (ParseTree tree : createDatabaseContext.children) { if (tree instanceof MySqlParser.UidContext) { - this.query.append(String.format(Constants.CREATE_DATABASE, tree.getText())); + + String databaseName = tree.getText(); + if(!databaseName.isEmpty()) { + // Check if the database is overridden + Map sourceToDestinationMap = new HashMap<>(); + + try { + if (this.config.getString(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_DATABASE_OVERRIDE_MAP.toString()) != null) + sourceToDestinationMap = Utils.parseSourceToDestinationDatabaseMap(this.config. + getString(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_DATABASE_OVERRIDE_MAP.toString())); + } catch(Exception e) { + log.error("enterCreateDatabase: Error parsing source to destination database map:" + e.toString()); + } + + if(sourceToDestinationMap.containsKey(databaseName)) { + this.query.append(String.format(Constants.CREATE_DATABASE, sourceToDestinationMap.get(databaseName))); + } else { + this.query.append(String.format(Constants.CREATE_DATABASE, databaseName)); + } + } } } } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java index 25cf6ce0a..0e790698d 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java @@ -606,7 +606,7 @@ public void testCreateDatabase() { String sql = "create database test_ddl"; mySQLDDLParserService.parseSql(sql, "table1", clickHouseQuery); - Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase(sql)); + Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase("create database if not exists test_ddl")); } @Test diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/batch/PreparedStatementExecutor.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/batch/PreparedStatementExecutor.java index eb66bb609..a7b321d08 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/batch/PreparedStatementExecutor.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/batch/PreparedStatementExecutor.java @@ -138,17 +138,17 @@ private boolean executePreparedStatement(String insertQuery, String topicName, if (CdcRecordState.CDC_RECORD_STATE_BEFORE == getCdcSectionBasedOnOperation(record.getCdcOperation())) { insertPreparedStatement(entry.getKey().right, ps, record.getBeforeModifiedFields(), record, record.getBeforeStruct(), - true, config, columnToDataTypeMap, engine); + true, config, columnToDataTypeMap, engine, tableName); } else if (CdcRecordState.CDC_RECORD_STATE_AFTER == getCdcSectionBasedOnOperation(record.getCdcOperation())) { insertPreparedStatement(entry.getKey().right, ps, record.getAfterModifiedFields(), record, record.getAfterStruct(), - false, config, columnToDataTypeMap, engine); + false, config, columnToDataTypeMap, engine, tableName); } else if (CdcRecordState.CDC_RECORD_STATE_BOTH == getCdcSectionBasedOnOperation(record.getCdcOperation())) { if (engine != null && engine.getEngine().equalsIgnoreCase(DBMetadata.TABLE_ENGINE.COLLAPSING_MERGE_TREE.getEngine())) { insertPreparedStatement(entry.getKey().right, ps, record.getBeforeModifiedFields(), record, record.getBeforeStruct(), - true, config, columnToDataTypeMap, engine); + true, config, columnToDataTypeMap, engine, tableName); } insertPreparedStatement(entry.getKey().right, ps, record.getAfterModifiedFields(), record, record.getAfterStruct(), - false, config, columnToDataTypeMap, engine); + false, config, columnToDataTypeMap, engine, tableName); } else { log.error("INVALID CDC RECORD STATE"); } @@ -205,7 +205,7 @@ public void insertPreparedStatement(Map columnNameToIndexMap, P ClickHouseStruct record, Struct struct, boolean beforeSection, ClickHouseSinkConnectorConfig config, Map columnNameToDataTypeMap, - DBMetadata.TABLE_ENGINE engine) throws Exception { + DBMetadata.TABLE_ENGINE engine, String tableName) throws Exception { // int index = 1; @@ -247,11 +247,12 @@ public void insertPreparedStatement(Map columnNameToIndexMap, P // if the field is not present. // If the record was not supplied, we need to set it as null. // Ignore version and sign columns. - if(colName.equalsIgnoreCase(versionColumn) || colName.equalsIgnoreCase(signColumn)) { + if(colName.equalsIgnoreCase(versionColumn) || colName.equalsIgnoreCase(signColumn) || + colName.equalsIgnoreCase(replacingMergeTreeDeleteColumn)) { } else { - log.error(String.format("********** ERROR: Database(%s), ClickHouse column %s not present in source ************", databaseName, colName)); - log.error(String.format("********** ERROR: Database(%s), Setting column %s to NULL might fail for non-nullable columns ************", databaseName, colName)); + log.error(String.format("********** ERROR: Database(%s), Table(%s), ClickHouse column %s not present in source ************", databaseName, tableName, colName)); + log.error(String.format("********** ERROR: Database(%s), Table(%s), Setting column %s to NULL might fail for non-nullable columns ************", databaseName, tableName, colName)); } ps.setNull(index, Types.OTHER); continue;