Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added logic to OVERRIDE CREATE DATABASE SQL #653

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sink-connector-lightweight/docker/config_local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ offset.storage.jdbc.user: "root"
# offset.storage.jdbc.password: The password of the database user to be used when connecting to the database where connector offsets are to be stored.
offset.storage.jdbc.password: "root"

clickhouse.database.override.map: "sbtest:chtest"

# offset.storage.jdbc.offset.table.ddl: The DDL statement used to create the database table where connector offsets are to be stored.(Advanced)
offset.storage.jdbc.offset.table.ddl: "CREATE TABLE if not exists %s
(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class Constants {
public static final String DROP_TABLE = "DROP TABLE";


public static final String CREATE_DATABASE = "CREATE DATABASE %s";
public static final String CREATE_DATABASE = "CREATE DATABASE IF NOT EXISTS %s";

public static final String DROP_COLUMN = "DROP COLUMN %s";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables;
import com.altinity.clickhouse.sink.connector.common.Utils;
import io.debezium.ddl.parser.mysql.generated.MySqlParser;
import io.debezium.ddl.parser.mysql.generated.MySqlParser.AlterByAddColumnContext;
import io.debezium.ddl.parser.mysql.generated.MySqlParser.TableNameContext;
Expand All @@ -17,10 +18,7 @@
import org.apache.logging.log4j.Logger;

import java.time.ZoneId;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Set;
import java.util.*;


/**
Expand Down Expand Up @@ -71,7 +69,26 @@ public ZoneId parseTimeZone() {
public void enterCreateDatabase(MySqlParser.CreateDatabaseContext createDatabaseContext) {
for (ParseTree tree : createDatabaseContext.children) {
if (tree instanceof MySqlParser.UidContext) {
this.query.append(String.format(Constants.CREATE_DATABASE, tree.getText()));

String databaseName = tree.getText();
if(!databaseName.isEmpty()) {
// Check if the database is overridden
Map<String, String> sourceToDestinationMap = new HashMap<>();

try {
if (this.config.getString(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_DATABASE_OVERRIDE_MAP.toString()) != null)
sourceToDestinationMap = Utils.parseSourceToDestinationDatabaseMap(this.config.
getString(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_DATABASE_OVERRIDE_MAP.toString()));
} catch(Exception e) {
log.error("enterCreateDatabase: Error parsing source to destination database map:" + e.toString());
}

if(sourceToDestinationMap.containsKey(databaseName)) {
this.query.append(String.format(Constants.CREATE_DATABASE, sourceToDestinationMap.get(databaseName)));
} else {
this.query.append(String.format(Constants.CREATE_DATABASE, databaseName));
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ public void testCreateDatabase() {
String sql = "create database test_ddl";
mySQLDDLParserService.parseSql(sql, "table1", clickHouseQuery);

Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase(sql));
Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase("create database if not exists test_ddl"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,17 +138,17 @@ private boolean executePreparedStatement(String insertQuery, String topicName,

if (CdcRecordState.CDC_RECORD_STATE_BEFORE == getCdcSectionBasedOnOperation(record.getCdcOperation())) {
insertPreparedStatement(entry.getKey().right, ps, record.getBeforeModifiedFields(), record, record.getBeforeStruct(),
true, config, columnToDataTypeMap, engine);
true, config, columnToDataTypeMap, engine, tableName);
} else if (CdcRecordState.CDC_RECORD_STATE_AFTER == getCdcSectionBasedOnOperation(record.getCdcOperation())) {
insertPreparedStatement(entry.getKey().right, ps, record.getAfterModifiedFields(), record, record.getAfterStruct(),
false, config, columnToDataTypeMap, engine);
false, config, columnToDataTypeMap, engine, tableName);
} else if (CdcRecordState.CDC_RECORD_STATE_BOTH == getCdcSectionBasedOnOperation(record.getCdcOperation())) {
if (engine != null && engine.getEngine().equalsIgnoreCase(DBMetadata.TABLE_ENGINE.COLLAPSING_MERGE_TREE.getEngine())) {
insertPreparedStatement(entry.getKey().right, ps, record.getBeforeModifiedFields(), record, record.getBeforeStruct(),
true, config, columnToDataTypeMap, engine);
true, config, columnToDataTypeMap, engine, tableName);
}
insertPreparedStatement(entry.getKey().right, ps, record.getAfterModifiedFields(), record, record.getAfterStruct(),
false, config, columnToDataTypeMap, engine);
false, config, columnToDataTypeMap, engine, tableName);
} else {
log.error("INVALID CDC RECORD STATE");
}
Expand Down Expand Up @@ -205,7 +205,7 @@ public void insertPreparedStatement(Map<String, Integer> columnNameToIndexMap, P
ClickHouseStruct record, Struct struct, boolean beforeSection,
ClickHouseSinkConnectorConfig config,
Map<String, String> columnNameToDataTypeMap,
DBMetadata.TABLE_ENGINE engine) throws Exception {
DBMetadata.TABLE_ENGINE engine, String tableName) throws Exception {


// int index = 1;
Expand Down Expand Up @@ -247,11 +247,12 @@ public void insertPreparedStatement(Map<String, Integer> columnNameToIndexMap, P
// if the field is not present.
// If the record was not supplied, we need to set it as null.
// Ignore version and sign columns.
if(colName.equalsIgnoreCase(versionColumn) || colName.equalsIgnoreCase(signColumn)) {
if(colName.equalsIgnoreCase(versionColumn) || colName.equalsIgnoreCase(signColumn) ||
colName.equalsIgnoreCase(replacingMergeTreeDeleteColumn)) {

} else {
log.error(String.format("********** ERROR: Database(%s), ClickHouse column %s not present in source ************", databaseName, colName));
log.error(String.format("********** ERROR: Database(%s), Setting column %s to NULL might fail for non-nullable columns ************", databaseName, colName));
log.error(String.format("********** ERROR: Database(%s), Table(%s), ClickHouse column %s not present in source ************", databaseName, tableName, colName));
log.error(String.format("********** ERROR: Database(%s), Table(%s), Setting column %s to NULL might fail for non-nullable columns ************", databaseName, tableName, colName));
}
ps.setNull(index, Types.OTHER);
continue;
Expand Down
Loading