Skip to content

Commit

Permalink
Merge pull request #433 from Altinity/429-support-for-calculated-columns
Browse files Browse the repository at this point in the history
429 support for calculated columns(mysql generate)
  • Loading branch information
subkanthi authored Jan 11, 2024
2 parents fafd144 + b82320a commit 90325d8
Show file tree
Hide file tree
Showing 15 changed files with 282 additions and 249 deletions.
12 changes: 12 additions & 0 deletions sink-connector-lightweight/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,18 @@
<version>${sink-connector-library-version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.altinity</groupId>
<artifactId>clickhouse-kafka-sink-connector</artifactId>
<version>0.0.6</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.altinity</groupId>
<artifactId>clickhouse-kafka-sink-connector</artifactId>
<version>0.0.6</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<extensions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

public class Constants {

public static final String ALIAS = "ALIAS";
public static final String PARTITION_BY = " PARTITION BY ";
public static final String ORDER_BY = " ORDER BY ";
public static final String ORDER_BY_TUPLE = " ORDER BY tuple()";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ private void parseCreateTable(MySqlParser.CreateTableContext ctx, StringBuilder
String columnName = null;
String colDataType = null;
boolean isNullColumn = true;
boolean isGeneratedColumn = false;
String generatedColumn = "";

for (ParseTree colDefTree : ((MySqlParser.ColumnDeclarationContext) subtree).children) {
if (colDefTree instanceof MySqlParser.FullColumnNameContext) {
columnName = colDefTree.getText();
Expand All @@ -162,12 +165,33 @@ private void parseCreateTable(MySqlParser.CreateTableContext ctx, StringBuilder
orderByColumns.append(columnName);
break;
}
} else if (colDefinitionChildTree instanceof MySqlParser.GeneratedColumnConstraintContext) {
for(ParseTree generatedColumnTree: ((MySqlParser.GeneratedColumnConstraintContext) colDefinitionChildTree).children) {
if(generatedColumnTree instanceof MySqlParser.ExpressionContext) {
isGeneratedColumn = true;
generatedColumn = generatedColumnTree.getText();
//this.query.append(Constants.AS).append(" ").append(expression);
}
}

}
}
if(isGeneratedColumn) {
if(isNullColumn){
this.query.append(Constants.NULLABLE).append("(").append(colDataType)
.append(")");
} else
this.query.append(colDataType);

this.query.append(" ").append(Constants.ALIAS).append(" ").append(generatedColumn).append(",");
continue;
}

if(isNullColumn) {
this.query.append(Constants.NULLABLE).append("(").append(colDataType)
.append(")").append(",");
} else {
}
else {
this.query.append(colDataType).append(" ").append(Constants.NOT_NULLABLE).append(" ").append(",");
}
}
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.altinity.clickhouse.debezium.embedded;

import com.altinity.clickhouse.debezium.embedded.common.PropertiesHelper;
import com.altinity.clickhouse.debezium.embedded.config.ConfigLoader;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.containers.MySQLContainer;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

public class ITCommon {
static public Connection connectToMySQL(MySQLContainer mySqlContainer) {
Connection conn = null;
try {

String connectionUrl = String.format("jdbc:mysql://%s:%s/%s?user=%s&password=%s", mySqlContainer.getHost(), mySqlContainer.getFirstMappedPort(),
mySqlContainer.getDatabaseName(), mySqlContainer.getUsername(), mySqlContainer.getPassword());
conn = DriverManager.getConnection(connectionUrl);


} catch (SQLException ex) {
// handle any errors

}

return conn;
}

static public Properties getDebeziumProperties(MySQLContainer mySqlContainer, ClickHouseContainer clickHouseContainer) throws Exception {

// Start the debezium embedded application.

Properties defaultProps = new Properties();
Properties defaultProperties = PropertiesHelper.getProperties("config.properties");

defaultProps.putAll(defaultProperties);
Properties fileProps = new ConfigLoader().load("config.yml");
defaultProps.putAll(fileProps);

defaultProps.setProperty("database.hostname", mySqlContainer.getHost());
defaultProps.setProperty("database.port", String.valueOf(mySqlContainer.getFirstMappedPort()));
defaultProps.setProperty("database.user", "root");
defaultProps.setProperty("database.password", "adminpass");

defaultProps.setProperty("clickhouse.server.url", clickHouseContainer.getHost());
defaultProps.setProperty("clickhouse.server.port", String.valueOf(clickHouseContainer.getFirstMappedPort()));
defaultProps.setProperty("clickhouse.server.user", clickHouseContainer.getUsername());
defaultProps.setProperty("clickhouse.server.password", clickHouseContainer.getPassword());
defaultProps.setProperty("clickhouse.server.database", "employees");

defaultProps.setProperty("offset.storage.jdbc.url", String.format("jdbc:clickhouse://%s:%s",
clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort()));

defaultProps.setProperty("schema.history.internal.jdbc.url", String.format("jdbc:clickhouse://%s:%s",
clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort()));

defaultProps.setProperty("offset.storage.jdbc.url", String.format("jdbc:clickhouse://%s:%s",
clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort()));

defaultProps.setProperty("schema.history.internal.jdbc.url", String.format("jdbc:clickhouse://%s:%s",
clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort()));


return defaultProps;

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package com.altinity.clickhouse.debezium.embedded;

import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture;
import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLBaseIT;
import com.altinity.clickhouse.debezium.embedded.ddl.parser.MySQLDDLParserService;
import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
import org.apache.log4j.BasicConfigurator;
import org.junit.Assert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.MountableFile;

import java.sql.Connection;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

import static com.altinity.clickhouse.debezium.embedded.ITCommon.connectToMySQL;
import static com.altinity.clickhouse.debezium.embedded.ITCommon.getDebeziumProperties;

@Testcontainers
@DisplayName("Integration Test that validates replication of Create DDL with Generated columns")
public class MySQLGenerateColumnsTest {

protected MySQLContainer mySqlContainer;

@Container
public static ClickHouseContainer clickHouseContainer = new ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:latest")
.asCompatibleSubstituteFor("clickhouse"))
.withInitScript("init_clickhouse_it.sql")
.withCopyFileToContainer(MountableFile.forClasspathResource("config.xml"), "/etc/clickhouse-server/config.d/config.xml")
.withUsername("ch_user")
.withPassword("password")
.withExposedPorts(8123);


@BeforeEach
public void startContainers() throws InterruptedException {
mySqlContainer = new MySQLContainer<>(DockerImageName.parse("docker.io/bitnami/mysql:latest")
.asCompatibleSubstituteFor("mysql"))
.withDatabaseName("employees").withUsername("root").withPassword("adminpass")
// .withInitScript("data_types.sql")
.withExtraHost("mysql-server", "0.0.0.0")
.waitingFor(new HttpWaitStrategy().forPort(3306));

BasicConfigurator.configure();
mySqlContainer.start();
clickHouseContainer.start();
Thread.sleep(25000);
}

@Test
public void testMySQLGeneratedColumns() throws Exception {
AtomicReference<DebeziumChangeEventCapture> engine = new AtomicReference<>();

ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {

Properties props = getDebeziumProperties(mySqlContainer, clickHouseContainer);

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(props, new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>())), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
});

Thread.sleep(30000);

Connection conn = connectToMySQL(mySqlContainer);

conn.prepareStatement("\n" +
"CREATE TABLE employees.contacts (id INT AUTO_INCREMENT PRIMARY KEY NOT NULL,\n" +
"first_name VARCHAR(50) NOT NULL,\n" +
"last_name VARCHAR(50) NOT NULL,\n" +
"fullname varchar(101) GENERATED ALWAYS AS (CONCAT(first_name,' ',last_name)),\n" +
"email VARCHAR(100) NOT NULL);\n").execute();

Thread.sleep(30000);

BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null);
Map<String, String> columnsToDataTypeMap = writer.getColumnsDataTypesForTable("contacts");

Assert.assertTrue(columnsToDataTypeMap.get("id").equalsIgnoreCase("Int32"));
Assert.assertTrue(columnsToDataTypeMap.get("first_name").equalsIgnoreCase("String"));
Assert.assertTrue(columnsToDataTypeMap.get("last_name").equalsIgnoreCase("String"));
Assert.assertTrue(columnsToDataTypeMap.get("fullname").equalsIgnoreCase("Nullable(String)"));
Assert.assertTrue(columnsToDataTypeMap.get("email").equalsIgnoreCase("String"));

writer.getConnection().close();
Thread.sleep(10000);


}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.altinity.clickhouse.debezium.embedded.ddl.parser;

import com.altinity.clickhouse.debezium.embedded.ITCommon;
import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture;
import com.altinity.clickhouse.debezium.embedded.common.PropertiesHelper;
import com.altinity.clickhouse.debezium.embedded.config.ConfigLoader;
Expand Down Expand Up @@ -65,12 +66,12 @@ public void testCreateTable() throws Exception {
executorService.execute(() -> {
try {

Properties props = getDebeziumProperties();
Properties props = ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer);
props.setProperty("database.include.list", "datatypes");
props.setProperty("clickhouse.server.database", "datatypes");

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(),
engine.get().setup(ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer), new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>())), false);
} catch (Exception e) {
throw new RuntimeException(e);
Expand Down Expand Up @@ -288,42 +289,4 @@ public void testCreateTable() throws Exception {
writer.getConnection().close();
}

protected Properties getDebeziumProperties() throws Exception {

// Start the debezium embedded application.

Properties defaultProps = new Properties();
Properties defaultProperties = PropertiesHelper.getProperties("config.properties");

defaultProps.putAll(defaultProperties);
Properties fileProps = new ConfigLoader().load("config.yml");
defaultProps.putAll(fileProps);

defaultProps.setProperty("database.hostname", mySqlContainer.getHost());
defaultProps.setProperty("database.port", String.valueOf(mySqlContainer.getFirstMappedPort()));
defaultProps.setProperty("database.user", "root");
defaultProps.setProperty("database.password", "adminpass");

defaultProps.setProperty("clickhouse.server.url", clickHouseContainer.getHost());
defaultProps.setProperty("clickhouse.server.port", String.valueOf(clickHouseContainer.getFirstMappedPort()));
defaultProps.setProperty("clickhouse.server.user", clickHouseContainer.getUsername());
defaultProps.setProperty("clickhouse.server.password", clickHouseContainer.getPassword());
defaultProps.setProperty("clickhouse.server.database", "employees");

defaultProps.setProperty("offset.storage.jdbc.url", String.format("jdbc:clickhouse://%s:%s",
clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort()));

defaultProps.setProperty("schema.history.internal.jdbc.url", String.format("jdbc:clickhouse://%s:%s",
clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort()));

defaultProps.setProperty("offset.storage.jdbc.url", String.format("jdbc:clickhouse://%s:%s",
clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort()));

defaultProps.setProperty("schema.history.internal.jdbc.url", String.format("jdbc:clickhouse://%s:%s",
clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort()));


return defaultProps;

}
}
Loading

0 comments on commit 90325d8

Please sign in to comment.