Skip to content

Commit

Permalink
Merge pull request #653 from Altinity/648-when-overriding-source-dest…
Browse files Browse the repository at this point in the history
…ination-database-names-the-databases-from-the-source-are-autocreated-with-the-same-name

Added logic to OVERRIDE CREATE DATABASE SQL
  • Loading branch information
subkanthi authored Jun 28, 2024
2 parents 346ca12 + 6cb7c2d commit a75f98a
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 15 deletions.
2 changes: 2 additions & 0 deletions sink-connector-lightweight/docker/config_local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ offset.storage.jdbc.user: "root"
# offset.storage.jdbc.password: The password of the database user to be used when connecting to the database where connector offsets are to be stored.
offset.storage.jdbc.password: "root"

clickhouse.database.override.map: "sbtest:chtest"

# offset.storage.jdbc.offset.table.ddl: The DDL statement used to create the database table where connector offsets are to be stored.(Advanced)
offset.storage.jdbc.offset.table.ddl: "CREATE TABLE if not exists %s
(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class Constants {
public static final String DROP_TABLE = "DROP TABLE";


public static final String CREATE_DATABASE = "CREATE DATABASE %s";
public static final String CREATE_DATABASE = "CREATE DATABASE IF NOT EXISTS %s";

public static final String DROP_COLUMN = "DROP COLUMN %s";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables;
import com.altinity.clickhouse.sink.connector.common.Utils;
import io.debezium.ddl.parser.mysql.generated.MySqlParser;
import io.debezium.ddl.parser.mysql.generated.MySqlParser.AlterByAddColumnContext;
import io.debezium.ddl.parser.mysql.generated.MySqlParser.TableNameContext;
Expand All @@ -17,10 +18,7 @@
import org.apache.logging.log4j.Logger;

import java.time.ZoneId;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Set;
import java.util.*;


/**
Expand Down Expand Up @@ -71,7 +69,26 @@ public ZoneId parseTimeZone() {
public void enterCreateDatabase(MySqlParser.CreateDatabaseContext createDatabaseContext) {
for (ParseTree tree : createDatabaseContext.children) {
if (tree instanceof MySqlParser.UidContext) {
this.query.append(String.format(Constants.CREATE_DATABASE, tree.getText()));

String databaseName = tree.getText();
if(!databaseName.isEmpty()) {
// Check if the database is overridden
Map<String, String> sourceToDestinationMap = new HashMap<>();

try {
if (this.config.getString(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_DATABASE_OVERRIDE_MAP.toString()) != null)
sourceToDestinationMap = Utils.parseSourceToDestinationDatabaseMap(this.config.
getString(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_DATABASE_OVERRIDE_MAP.toString()));
} catch(Exception e) {
log.error("enterCreateDatabase: Error parsing source to destination database map:" + e.toString());
}

if(sourceToDestinationMap.containsKey(databaseName)) {
this.query.append(String.format(Constants.CREATE_DATABASE, sourceToDestinationMap.get(databaseName)));
} else {
this.query.append(String.format(Constants.CREATE_DATABASE, databaseName));
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ public void testCreateDatabase() {
String sql = "create database test_ddl";
mySQLDDLParserService.parseSql(sql, "table1", clickHouseQuery);

Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase(sql));
Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase("create database if not exists test_ddl"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,17 +138,17 @@ private boolean executePreparedStatement(String insertQuery, String topicName,

if (CdcRecordState.CDC_RECORD_STATE_BEFORE == getCdcSectionBasedOnOperation(record.getCdcOperation())) {
insertPreparedStatement(entry.getKey().right, ps, record.getBeforeModifiedFields(), record, record.getBeforeStruct(),
true, config, columnToDataTypeMap, engine);
true, config, columnToDataTypeMap, engine, tableName);
} else if (CdcRecordState.CDC_RECORD_STATE_AFTER == getCdcSectionBasedOnOperation(record.getCdcOperation())) {
insertPreparedStatement(entry.getKey().right, ps, record.getAfterModifiedFields(), record, record.getAfterStruct(),
false, config, columnToDataTypeMap, engine);
false, config, columnToDataTypeMap, engine, tableName);
} else if (CdcRecordState.CDC_RECORD_STATE_BOTH == getCdcSectionBasedOnOperation(record.getCdcOperation())) {
if (engine != null && engine.getEngine().equalsIgnoreCase(DBMetadata.TABLE_ENGINE.COLLAPSING_MERGE_TREE.getEngine())) {
insertPreparedStatement(entry.getKey().right, ps, record.getBeforeModifiedFields(), record, record.getBeforeStruct(),
true, config, columnToDataTypeMap, engine);
true, config, columnToDataTypeMap, engine, tableName);
}
insertPreparedStatement(entry.getKey().right, ps, record.getAfterModifiedFields(), record, record.getAfterStruct(),
false, config, columnToDataTypeMap, engine);
false, config, columnToDataTypeMap, engine, tableName);
} else {
log.error("INVALID CDC RECORD STATE");
}
Expand Down Expand Up @@ -205,7 +205,7 @@ public void insertPreparedStatement(Map<String, Integer> columnNameToIndexMap, P
ClickHouseStruct record, Struct struct, boolean beforeSection,
ClickHouseSinkConnectorConfig config,
Map<String, String> columnNameToDataTypeMap,
DBMetadata.TABLE_ENGINE engine) throws Exception {
DBMetadata.TABLE_ENGINE engine, String tableName) throws Exception {


// int index = 1;
Expand Down Expand Up @@ -247,11 +247,12 @@ public void insertPreparedStatement(Map<String, Integer> columnNameToIndexMap, P
// if the field is not present.
// If the record was not supplied, we need to set it as null.
// Ignore version and sign columns.
if(colName.equalsIgnoreCase(versionColumn) || colName.equalsIgnoreCase(signColumn)) {
if(colName.equalsIgnoreCase(versionColumn) || colName.equalsIgnoreCase(signColumn) ||
colName.equalsIgnoreCase(replacingMergeTreeDeleteColumn)) {

} else {
log.error(String.format("********** ERROR: Database(%s), ClickHouse column %s not present in source ************", databaseName, colName));
log.error(String.format("********** ERROR: Database(%s), Setting column %s to NULL might fail for non-nullable columns ************", databaseName, colName));
log.error(String.format("********** ERROR: Database(%s), Table(%s), ClickHouse column %s not present in source ************", databaseName, tableName, colName));
log.error(String.format("********** ERROR: Database(%s), Table(%s), Setting column %s to NULL might fail for non-nullable columns ************", databaseName, tableName, colName));
}
ps.setNull(index, Types.OTHER);
continue;
Expand Down

0 comments on commit a75f98a

Please sign in to comment.