Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename is_deleted column if source tables have column with the same name #434

Merged
merged 4 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions sink-connector-lightweight/dependency-reduced-pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@
</includes>
<parallel>all</parallel>
<threadCount>10</threadCount>
<properties>
<property>
<name>listener</name>
<value>com.altinity.clickhouse.debezium.embedded.FailFastListener</value>
</property>
</properties>
<useUnlimitedThreads>true</useUnlimitedThreads>
<perCoreThreadCount>true</perCoreThreadCount>
<useSystemClassLoader>true</useSystemClassLoader>
Expand Down Expand Up @@ -97,7 +103,7 @@
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mongodb</artifactId>
<version>2.5.0.Alpha1</version>
<version>2.5.0.Beta1</version>
<scope>test</scope>
<exclusions>
<exclusion>
Expand Down Expand Up @@ -299,7 +305,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<version.checkstyle.plugin>3.1.1</version.checkstyle.plugin>
<maven.compiler.target>17</maven.compiler.target>
<version.debezium>2.5.0.Alpha1</version.debezium>
<version.debezium>2.5.0.Beta1</version.debezium>
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
</properties>
</project>
12 changes: 0 additions & 12 deletions sink-connector-lightweight/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -425,18 +425,6 @@
<version>${sink-connector-library-version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.altinity</groupId>
<artifactId>clickhouse-kafka-sink-connector</artifactId>
<version>0.0.6</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.altinity</groupId>
<artifactId>clickhouse-kafka-sink-connector</artifactId>
<version>0.0.6</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<extensions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
import org.slf4j.LoggerFactory;

import java.time.ZoneId;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

/**
* This class contains the only overridden functions from the generated parser.
Expand Down Expand Up @@ -92,19 +95,23 @@ public void enterCopyCreateTable(MySqlParser.CopyCreateTableContext copyCreateTa
public void enterColumnCreateTable(MySqlParser.ColumnCreateTableContext columnCreateTableContext) {
StringBuilder orderByColumns = new StringBuilder();
StringBuilder partitionByColumn = new StringBuilder();
parseCreateTable(columnCreateTableContext, orderByColumns, partitionByColumn);
Set<String> columnNames = parseCreateTable(columnCreateTableContext, orderByColumns, partitionByColumn);
//this.query.append(" Engine=")
String isDeletedColumn = IS_DELETED_COLUMN;
if(columnNames.contains(isDeletedColumn)) {
isDeletedColumn = "__" + IS_DELETED_COLUMN;
}
if(DebeziumChangeEventCapture.isNewReplacingMergeTreeEngine == true) {
this.query.append("`").append(VERSION_COLUMN).append("` ").append(VERSION_COLUMN_DATA_TYPE).append(",");
this.query.append("`").append(IS_DELETED_COLUMN).append("` ").append(IS_DELETED_COLUMN_DATA_TYPE);
this.query.append("`").append(isDeletedColumn).append("` ").append(IS_DELETED_COLUMN_DATA_TYPE);
} else {
this.query.append("`").append(SIGN_COLUMN).append("` ").append(SIGN_COLUMN_DATA_TYPE).append(",");
this.query.append("`").append(VERSION_COLUMN).append("` ").append(VERSION_COLUMN_DATA_TYPE);
}

this.query.append(")");
if(DebeziumChangeEventCapture.isNewReplacingMergeTreeEngine == true) {
this.query.append(" Engine=ReplacingMergeTree(").append(VERSION_COLUMN).append(",").append(IS_DELETED_COLUMN).append(")");
this.query.append(" Engine=ReplacingMergeTree(").append(VERSION_COLUMN).append(",").append(isDeletedColumn).append(")");
} else
this.query.append(" Engine=ReplacingMergeTree(").append(VERSION_COLUMN).append(")");

Expand All @@ -119,10 +126,10 @@ public void enterColumnCreateTable(MySqlParser.ColumnCreateTableContext columnCr

}

private void parseCreateTable(MySqlParser.CreateTableContext ctx, StringBuilder orderByColumns,
private Set<String> parseCreateTable(MySqlParser.CreateTableContext ctx, StringBuilder orderByColumns,
StringBuilder partitionByColumns) {
List<ParseTree> pt = ctx.children;

Set<String> columnNames = new HashSet<>();

this.query.append(Constants.CREATE_TABLE).append(" ");
for (ParseTree tree : pt) {
Expand All @@ -136,66 +143,7 @@ private void parseCreateTable(MySqlParser.CreateTableContext ctx, StringBuilder
if (subtree instanceof TerminalNodeImpl) {
// this.query.append(subtree.getText());
} else if (subtree instanceof MySqlParser.ColumnDeclarationContext) {
String columnName = null;
String colDataType = null;
boolean isNullColumn = true;
boolean isGeneratedColumn = false;
String generatedColumn = "";

for (ParseTree colDefTree : ((MySqlParser.ColumnDeclarationContext) subtree).children) {
if (colDefTree instanceof MySqlParser.FullColumnNameContext) {
columnName = colDefTree.getText();
this.query.append(columnName).append(" ");
} else if (colDefTree instanceof MySqlParser.ColumnDefinitionContext) {
String colDataTypeDefinition = colDefTree.getText();

colDataType = getClickHouseDataType(colDataTypeDefinition, colDefTree, columnName);
// Null Column and DimensionDataType are children of ColumnDefinition
for(ParseTree colDefinitionChildTree: ((MySqlParser.ColumnDefinitionContext) colDefTree).children) {
if (colDefinitionChildTree instanceof MySqlParser.NullColumnConstraintContext) {
if (colDefinitionChildTree.getText().equalsIgnoreCase(Constants.NOT_NULL))
isNullColumn = false;
} else if(colDefinitionChildTree instanceof MySqlParser.DimensionDataTypeContext) {
if (colDefinitionChildTree.getText() != null) {

}
} else if (colDefinitionChildTree instanceof MySqlParser.PrimaryKeyColumnConstraintContext) {
for(ParseTree primaryKeyTree: ((MySqlParser.PrimaryKeyColumnConstraintContext) colDefinitionChildTree).children) {
System.out.println(primaryKeyTree.getText());
orderByColumns.append(columnName);
break;
}
} else if (colDefinitionChildTree instanceof MySqlParser.GeneratedColumnConstraintContext) {
for(ParseTree generatedColumnTree: ((MySqlParser.GeneratedColumnConstraintContext) colDefinitionChildTree).children) {
if(generatedColumnTree instanceof MySqlParser.ExpressionContext) {
isGeneratedColumn = true;
generatedColumn = generatedColumnTree.getText();
//this.query.append(Constants.AS).append(" ").append(expression);
}
}

}
}
if(isGeneratedColumn) {
if(isNullColumn){
this.query.append(Constants.NULLABLE).append("(").append(colDataType)
.append(")");
} else
this.query.append(colDataType);

this.query.append(" ").append(Constants.ALIAS).append(" ").append(generatedColumn).append(",");
continue;
}

if(isNullColumn) {
this.query.append(Constants.NULLABLE).append("(").append(colDataType)
.append(")").append(",");
}
else {
this.query.append(colDataType).append(" ").append(Constants.NOT_NULLABLE).append(" ").append(",");
}
}
}
parseColumnDefinitions(subtree, orderByColumns, columnNames);
} else if(subtree instanceof MySqlParser.ConstraintDeclarationContext) {
for(ParseTree constraintTree: ((MySqlParser.ConstraintDeclarationContext) subtree).children) {
if(constraintTree instanceof MySqlParser.PrimaryKeyTableConstraintContext) {
Expand Down Expand Up @@ -234,6 +182,79 @@ private void parseCreateTable(MySqlParser.CreateTableContext ctx, StringBuilder
}
}
}

return columnNames;
}

/**
* Function to parse column definitions.
* @param subtree
* @param orderByColumns
*
* @return list of column names
*/
private void parseColumnDefinitions(ParseTree subtree, StringBuilder orderByColumns, Set<String> columnNames) {
String columnName = null;
String colDataType = null;
boolean isNullColumn = true;
boolean isGeneratedColumn = false;
String generatedColumn = "";

for (ParseTree colDefTree : ((MySqlParser.ColumnDeclarationContext) subtree).children) {
if (colDefTree instanceof MySqlParser.FullColumnNameContext) {
columnName = colDefTree.getText();
this.query.append(columnName).append(" ");
} else if (colDefTree instanceof MySqlParser.ColumnDefinitionContext) {
String colDataTypeDefinition = colDefTree.getText();

colDataType = getClickHouseDataType(colDataTypeDefinition, colDefTree, columnName);
// Null Column and DimensionDataType are children of ColumnDefinition
for(ParseTree colDefinitionChildTree: ((MySqlParser.ColumnDefinitionContext) colDefTree).children) {
if (colDefinitionChildTree instanceof MySqlParser.NullColumnConstraintContext) {
if (colDefinitionChildTree.getText().equalsIgnoreCase(Constants.NOT_NULL))
isNullColumn = false;
} else if(colDefinitionChildTree instanceof MySqlParser.DimensionDataTypeContext) {
if (colDefinitionChildTree.getText() != null) {

}
} else if (colDefinitionChildTree instanceof MySqlParser.PrimaryKeyColumnConstraintContext) {
for(ParseTree primaryKeyTree: ((MySqlParser.PrimaryKeyColumnConstraintContext) colDefinitionChildTree).children) {
System.out.println(primaryKeyTree.getText());
orderByColumns.append(columnName);
break;
}
} else if (colDefinitionChildTree instanceof MySqlParser.GeneratedColumnConstraintContext) {
for(ParseTree generatedColumnTree: ((MySqlParser.GeneratedColumnConstraintContext) colDefinitionChildTree).children) {
if(generatedColumnTree instanceof MySqlParser.ExpressionContext) {
isGeneratedColumn = true;
generatedColumn = generatedColumnTree.getText();
//this.query.append(Constants.AS).append(" ").append(expression);
}
}

}
}
if(isGeneratedColumn) {
if(isNullColumn){
this.query.append(Constants.NULLABLE).append("(").append(colDataType)
.append(")");
} else
this.query.append(colDataType);

this.query.append(" ").append(Constants.ALIAS).append(" ").append(generatedColumn).append(",");
continue;
}

if(isNullColumn) {
this.query.append(Constants.NULLABLE).append("(").append(colDataType)
.append(")").append(",");
}
else {
this.query.append(colDataType).append(" ").append(Constants.NOT_NULLABLE).append(" ").append(",");
}
columnNames.add(columnName);
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package com.altinity.clickhouse.debezium.embedded.ddl.parser;

import com.altinity.clickhouse.debezium.embedded.ITCommon;
import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture;
import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
import org.apache.log4j.BasicConfigurator;
import org.junit.Assert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import java.sql.Connection;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

@Testcontainers
@DisplayName("Integration Test that validates DDL Creation when there are source columns with the same name(is_deleted)")
public class IsDeletedColumnsIT {

protected MySQLContainer mySqlContainer;
static ClickHouseContainer clickHouseContainer;

@BeforeEach
public void startContainers() throws InterruptedException {
mySqlContainer = new MySQLContainer<>(DockerImageName.parse("docker.io/bitnami/mysql:latest")
.asCompatibleSubstituteFor("mysql"))
.withDatabaseName("employees").withUsername("root").withPassword("adminpass")
.withInitScript("data_types.sql")
.withExtraHost("mysql-server", "0.0.0.0")
.waitingFor(new HttpWaitStrategy().forPort(3306));

BasicConfigurator.configure();
mySqlContainer.start();
// clickHouseContainer.start();
Thread.sleep(15000);
}

static {
clickHouseContainer = new org.testcontainers.clickhouse.ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:latest")
.asCompatibleSubstituteFor("clickhouse"))
.withInitScript("init_clickhouse_it.sql")
.withUsername("ch_user")
.withPassword("password")
.withExposedPorts(8123);

clickHouseContainer.start();
}


@ParameterizedTest
@CsvSource({
"clickhouse/clickhouse-server:latest",
"clickhouse/clickhouse-server:22.3"
})
@DisplayName("Test that validates create table in CH when MySQL has is_deleted columns")
public void testIsDeleted(String clickHouseServerVersion) throws Exception {

AtomicReference<DebeziumChangeEventCapture> engine = new AtomicReference<>();

ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer), new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>())),false);
} catch (Exception e) {
throw new RuntimeException(e);
}
});


Thread.sleep(10000);
Connection conn = ITCommon.connectToMySQL(mySqlContainer);
conn.prepareStatement("create table new_table(col1 varchar(255), col2 int, is_deleted int, _sign int)").execute();

Thread.sleep(10000);

conn.prepareStatement("insert into new_table values('test', 1, 22, 1)").execute();
conn.close();
Thread.sleep(10000);

BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null);
ResultSet rs = writer.executeQueryWithResultSet("select * from new_table");
boolean recordFound = false;
while(rs.next()) {
recordFound = true;
Assert.assertTrue(rs.getString("col1").equalsIgnoreCase("test"));
Assert.assertTrue(rs.getInt("col2") == 1);
Assert.assertTrue(rs.getInt("is_deleted") == 22);
Assert.assertTrue(rs.getInt("_sign") == 1);
}
Assert.assertTrue(recordFound);

if(engine.get() != null) {
engine.get().stop();
}
executorService.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,16 @@ public void testGeneratedColumn() {
Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase("CREATE TABLE employees.contacts(fullname Nullable(String) ALIAS CONCAT(first_name,' ',last_name),email String NOT NULL ,`_version` UInt64,`is_deleted` UInt8) Engine=ReplacingMergeTree(_version,is_deleted) ORDER BY tuple()"));
}

@Test
public void testSourceWithIsDeletedColumn() {
StringBuffer clickHouseQuery = new StringBuffer();

String sql = "create table new_table(col1 varchar(255), col2 int, is_deleted int, _sign int);";
mySQLDDLParserService.parseSql(sql, "", clickHouseQuery);

Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase("CREATE TABLE new_table(col1 Nullable(String),col2 Nullable(Int32),is_deleted Nullable(Int32),_sign Nullable(Int32),`_version` UInt64,`__is_deleted` UInt8) Engine=ReplacingMergeTree(_version,__is_deleted) ORDER BY tuple()"));
}

@ParameterizedTest
@CsvSource({
"ALTER TABLE test_table rename to test_table_new, false",
Expand Down
Loading
Loading