Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added logic in auto.create.table to support table names with dash(hyphen) #439

Merged
merged 5 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,15 @@ static public Properties getDebeziumProperties(MySQLContainer mySqlContainer, Cl
return defaultProps;

}
}

static public Properties getDebeziumPropertiesForSchemaOnly(MySQLContainer mySqlContainer, ClickHouseContainer clickHouseContainer) throws Exception {

Properties props = getDebeziumProperties(mySqlContainer, clickHouseContainer);

props.replace("snapshot.mode", "schema_only");
props.replace("disable.drop.truncate", "true");
props.setProperty("disable.ddl", "true");

return props;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package com.altinity.clickhouse.debezium.embedded.ddl.parser;

import com.altinity.clickhouse.debezium.embedded.ITCommon;
import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture;
import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
import org.apache.log4j.BasicConfigurator;
import org.junit.Assert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import java.sql.Connection;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

@Testcontainers
@DisplayName("Integration Test that validates auto create tables feature which creates tables when a CDC record(Insert) is received")
public class AutoCreateTableIT {
protected MySQLContainer mySqlContainer;
static ClickHouseContainer clickHouseContainer;

@BeforeEach
public void startContainers() throws InterruptedException {
mySqlContainer = new MySQLContainer<>(DockerImageName.parse("docker.io/bitnami/mysql:latest")
.asCompatibleSubstituteFor("mysql"))
.withDatabaseName("employees").withUsername("root").withPassword("adminpass")
// .withInitScript("data_types.sql")
.withExtraHost("mysql-server", "0.0.0.0")
.waitingFor(new HttpWaitStrategy().forPort(3306));

BasicConfigurator.configure();
mySqlContainer.start();
// clickHouseContainer.start();
Thread.sleep(15000);
}

static {
clickHouseContainer = new org.testcontainers.clickhouse.ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:latest")
.asCompatibleSubstituteFor("clickhouse"))
.withInitScript("init_clickhouse_it.sql")
.withUsername("ch_user")
.withPassword("password")
.withExposedPorts(8123);

clickHouseContainer.start();
}
@ParameterizedTest
@CsvSource({
"clickhouse/clickhouse-server:latest",
"clickhouse/clickhouse-server:22.3"
})
@DisplayName("Test that validates auto create table when table name has dashes")
public void testAutoCreateTable(String clickHouseServerVersion) throws Exception {

Thread.sleep(5000);

Connection conn = ITCommon.connectToMySQL(mySqlContainer);
conn.prepareStatement("create table `new-table`(col1 varchar(255), col2 int, col3 int)").execute();

Thread.sleep(10000);


AtomicReference<DebeziumChangeEventCapture> engine = new AtomicReference<>();
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(ITCommon.getDebeziumPropertiesForSchemaOnly(mySqlContainer, clickHouseContainer), new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>())),false);
} catch (Exception e) {
throw new RuntimeException(e);
}
});

Thread.sleep(10000);
conn.prepareStatement("insert into `new-table` values('test', 1, 2)").execute();
conn.close();

Thread.sleep(10000);

BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null);

ResultSet dateTimeResult = writer.executeQueryWithResultSet("select count(*) from `new-table`");
boolean resultReceived = false;

while(dateTimeResult.next()) {
resultReceived = true;
Assert.assertEquals(1, dateTimeResult.getInt(1));
}
Assert.assertTrue(resultReceived);

if(engine.get() != null) {
engine.get().stop();
}
// Files.deleteIfExists(tmpFilePath);
executorService.shutdown();

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import com.altinity.clickhouse.debezium.embedded.ITCommon;
import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture;
import com.altinity.clickhouse.debezium.embedded.common.PropertiesHelper;
import com.altinity.clickhouse.debezium.embedded.config.ConfigLoader;
import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
Expand All @@ -20,12 +18,8 @@
import org.testcontainers.utility.DockerImageName;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public java.lang.String createTableSyntax(ArrayList<String> primaryKey, String t

StringBuilder createTableSyntax = new StringBuilder();

createTableSyntax.append(CREATE_TABLE).append(" ").append(tableName).append("(");
createTableSyntax.append(CREATE_TABLE).append(" ").append("`").append(tableName).append("`").append("(");

for(Field f: fields) {
String colName = f.name();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,9 @@ protected Field[] createFields() {
fields.add(new Field("customerName", 0, Schema.STRING_SCHEMA));
fields.add(new Field("occupation", 1, Schema.STRING_SCHEMA));
fields.add(new Field("quantity", 2, Schema.INT32_SCHEMA));

fields.add(new Field("amount_1", 3, Schema.FLOAT32_SCHEMA));

fields.add(new Field("amount", 4, Schema.FLOAT64_SCHEMA));
fields.add(new Field("employed", 5, Schema.BOOLEAN_SCHEMA));

fields.add(new Field("blob_storage", 6, SchemaBuilder.type(Schema.BYTES_SCHEMA.type()).
name(Decimal.LOGICAL_NAME).build()));

Expand All @@ -76,9 +73,7 @@ protected Field[] createFields() {
.name(Decimal.LOGICAL_NAME).build();

fields.add(new Field("blob_storage_scale", 7, decimalSchema));

fields.add(new Field("json_output", 8, Json.schema()));

fields.add(new Field("max_amount", 9, Schema.FLOAT64_SCHEMA));

Field[] result = new Field[fields.size()];
Expand All @@ -92,17 +87,11 @@ protected static Map<String, String> getExpectedColumnToDataTypesMap() {
columnToDataTypesMap.put("customerName", ClickHouseDataType.String.name());
columnToDataTypesMap.put("occupation", ClickHouseDataType.String.name());
columnToDataTypesMap.put("quantity", ClickHouseDataType.Int32.name());

columnToDataTypesMap.put("amount_1", ClickHouseDataType.Float32.name());

columnToDataTypesMap.put("amount", ClickHouseDataType.Float64.name());

columnToDataTypesMap.put("employed", ClickHouseDataType.Bool.name());

columnToDataTypesMap.put("blob_storage", ClickHouseDataType.String.name());

columnToDataTypesMap.put("blob_storage_scale", ClickHouseDataType.Decimal.name());

columnToDataTypesMap.put("json_output", ClickHouseDataType.JSON.name());

columnToDataTypesMap.put("max_amount", ClickHouseDataType.Float64.name());
Expand Down Expand Up @@ -132,7 +121,7 @@ public void testCreateTableSyntax() {

String query = act.createTableSyntax(primaryKeys, "auto_create_table", createFields(), this.columnToDataTypesMap);
System.out.println("QUERY" + query);
Assert.assertTrue(query.equalsIgnoreCase("CREATE TABLE auto_create_table(`customerName` String NOT NULL,`occupation` String NOT NULL,`quantity` Int32 NOT NULL,`amount_1` Float32 NOT NULL,`amount` Float64 NOT NULL,`employed` Bool NOT NULL,`blob_storage` String NOT NULL,`blob_storage_scale` Decimal NOT NULL,`json_output` JSON,`max_amount` Float64 NOT NULL,`_sign` Int8,`_version` UInt64) ENGINE = ReplacingMergeTree(_version) PRIMARY KEY(customerName) ORDER BY(customerName)"));
Assert.assertTrue(query.equalsIgnoreCase("CREATE TABLE `auto_create_table`(`customerName` String NOT NULL,`occupation` String NOT NULL,`quantity` Int32 NOT NULL,`amount_1` Float32 NOT NULL,`amount` Float64 NOT NULL,`employed` Bool NOT NULL,`blob_storage` String NOT NULL,`blob_storage_scale` Decimal NOT NULL,`json_output` JSON,`max_amount` Float64 NOT NULL,`_sign` Int8,`_version` UInt64) ENGINE = ReplacingMergeTree(_version) PRIMARY KEY(customerName) ORDER BY(customerName)"));
//Assert.assertTrue(query.equalsIgnoreCase("CREATE TABLE auto_create_table(`customerName` String NOT NULL,`occupation` String NOT NULL,`quantity` Int32 NOT NULL,`amount_1` Float32 NOT NULL,`amount` Float64 NOT NULL,`employed` Bool NOT NULL,`blob_storage` String NOT NULL,`blob_storage_scale` Decimal NOT NULL,`json_output` JSON,`max_amount` Float64 NOT NULL,`_sign` Int8,`_version` UInt64) ENGINE = ReplacingMergeTree(_version) PRIMARY KEY(customerName) ORDER BY (customerName)"));
}

Expand All @@ -143,7 +132,7 @@ public void testCreateTableEmptyPrimaryKey() {

String query = act.createTableSyntax(null, "auto_create_table", createFields(), this.columnToDataTypesMap);

String expectedQuery = "CREATE TABLE auto_create_table(`customerName` String NOT NULL,`occupation` String NOT NULL,`quantity` Int32 NOT NULL,`amount_1` Float32 NOT NULL,`amount` Float64 NOT NULL,`employed` Bool NOT NULL,`blob_storage` String NOT NULL,`blob_storage_scale` Decimal NOT NULL,`json_output` JSON,`max_amount` Float64 NOT NULL,`_sign` Int8,`_version` UInt64) ENGINE = ReplacingMergeTree(_version) ORDER BY tuple()";
String expectedQuery = "CREATE TABLE `auto_create_table`(`customerName` String NOT NULL,`occupation` String NOT NULL,`quantity` Int32 NOT NULL,`amount_1` Float32 NOT NULL,`amount` Float64 NOT NULL,`employed` Bool NOT NULL,`blob_storage` String NOT NULL,`blob_storage_scale` Decimal NOT NULL,`json_output` JSON,`max_amount` Float64 NOT NULL,`_sign` Int8,`_version` UInt64) ENGINE = ReplacingMergeTree(_version) ORDER BY tuple()";
Assert.assertTrue(query.equalsIgnoreCase(expectedQuery));
}
@Test
Expand All @@ -156,7 +145,7 @@ public void testCreateTableMultiplePrimaryKeys() {

String query = act.createTableSyntax(primaryKeys, "auto_create_table", createFields(), this.columnToDataTypesMap);

String expectedQuery = "CREATE TABLE auto_create_table(`customerName` String NOT NULL,`occupation` String NOT NULL,`quantity` Int32 NOT NULL,`amount_1` Float32 NOT NULL,`amount` Float64 NOT NULL,`employed` Bool NOT NULL,`blob_storage` String NOT NULL,`blob_storage_scale` Decimal NOT NULL,`json_output` JSON,`max_amount` Float64 NOT NULL,`_sign` Int8,`_version` UInt64) ENGINE = ReplacingMergeTree(_version) ORDER BY tuple()";
String expectedQuery = "CREATE TABLE `auto_create_table`(`customerName` String NOT NULL,`occupation` String NOT NULL,`quantity` Int32 NOT NULL,`amount_1` Float32 NOT NULL,`amount` Float64 NOT NULL,`employed` Bool NOT NULL,`blob_storage` String NOT NULL,`blob_storage_scale` Decimal NOT NULL,`json_output` JSON,`max_amount` Float64 NOT NULL,`_sign` Int8,`_version` UInt64) ENGINE = ReplacingMergeTree(_version) ORDER BY tuple()";
Assert.assertTrue(query.equalsIgnoreCase(expectedQuery));
System.out.println(query);
}
Expand Down
Loading