Skip to content

Commit

Permalink
Merge branch 'refs/heads/2.2.0' into fix_runners_24_4
Browse files Browse the repository at this point in the history
  • Loading branch information
Selfeer committed May 31, 2024
2 parents 955c05b + 53ac6bc commit 4bb30e7
Show file tree
Hide file tree
Showing 12 changed files with 34 additions and 14 deletions.
3 changes: 2 additions & 1 deletion sink-connector-lightweight/dependency-reduced-pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,15 @@
</includes>
<properties>
<property>
<surefire.test.runOrder>filesystem</surefire.test.runOrder>
<name>listener</name>
<value>com.altinity.clickhouse.debezium.embedded.FailFastListener</value>
</property>
</properties>
<useUnlimitedThreads>true</useUnlimitedThreads>
<perCoreThreadCount>true</perCoreThreadCount>
<useSystemClassLoader>true</useSystemClassLoader>
<runOrder>${surefire.test.runOrder}</runOrder>
</configuration>
</plugin>
<plugin>
Expand Down Expand Up @@ -297,7 +299,6 @@
<quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
<version.testcontainers>1.19.1</version.testcontainers>
<surefire-plugin.version>3.0.0-M7</surefire-plugin.version>
<apache.httpclient.version>5.2.1</apache.httpclient.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<sink-connector-library-version>0.0.8</sink-connector-library-version>
<version.junit>5.9.1</version.junit>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,10 @@ public void run() {
start(injector.getInstance(DebeziumRecordParserService.class),
injector.getInstance(DDLParserService.class), props, true);
} catch (IOException e) {
log.error("**** ERROR: Restarting Event Loop ****", e);
throw new RuntimeException(e);
} catch (Exception e) {
log.error("**** ERROR: Restarting Event Loop ****", e);
throw new RuntimeException(e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -581,11 +581,13 @@ public void handle(boolean b, String s, Throwable throwable) {
try {
Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
log.error("Error sleeping", e);
throw new RuntimeException(e);
}
try {
setupDebeziumEventCapture(props, debeziumRecordParserService, config);
} catch (IOException | ClassNotFoundException e) {
log.error("Error setting up debezium event capture", e);
throw new RuntimeException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ public static ClickHouseDataType convert(String columnName, MySqlParser.DataType
JdbcValueConverters.DecimalMode.PRECISE,
TemporalPrecisionMode.ADAPTIVE,
JdbcValueConverters.BigIntUnsignedMode.LONG,
CommonConnectorConfig.BinaryHandlingMode.BYTES
);
CommonConnectorConfig.BinaryHandlingMode.BYTES);


DataType dataType = initializeDataTypeResolver().resolveDataType(columnDefChild);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ private ClickHouseStruct readBeforeOrAfterSection(Map<String, Object> convertedV
}
}
} catch (ParseException e) {
log.error("Error parsing JSON", e);
throw new RuntimeException(e);
}

Expand Down
4 changes: 4 additions & 0 deletions sink-connector-lightweight/src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
</Console>
</Appenders>
<Loggers>
<Logger name="com.clickhouse" level="ERROR"
additivity="false">
<AppenderRef ref="console"/>
</Logger>
<Root level="debug" additivity="false">
<AppenderRef ref="console" />
</Root>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ public DbWriter(
}
boolean useReplicatedReplacingMergeTree = this.config.getBoolean(
ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES_REPLICATED.toString());
String rmtDeleteColumn = this.config.getString(ClickHouseSinkConnectorConfigVariables.REPLACING_MERGE_TREE_DELETE_COLUMN.toString());
act.createNewTable(record.getPrimaryKey(), tableName, database, fields, this.conn,
isNewReplacingMergeTreeEngine, useReplicatedReplacingMergeTree);
isNewReplacingMergeTreeEngine, useReplicatedReplacingMergeTree, rmtDeleteColumn);
} catch (Exception e) {
log.error("**** Error creating table ***" + tableName, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,13 @@ private boolean executePreparedStatement(String insertQuery, String topicName,
try {
ps = conn.prepareStatement("TRUNCATE TABLE " + databaseName + "." + tableName);
} catch (SQLException e) {
log.error("*** Error: Truncate table statement error ****", e);
throw new RuntimeException(e);
}
try {
ps.execute();
} catch (SQLException e) {
log.error("*** Error: Truncate table statement execute error ****", e);
throw new RuntimeException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.altinity.clickhouse.sink.connector.db.operations;

import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables;
import com.clickhouse.data.ClickHouseDataType;
import com.clickhouse.jdbc.ClickHouseConnection;
import com.google.common.annotations.VisibleForTesting;
Expand All @@ -26,10 +27,10 @@ public class ClickHouseAutoCreateTable extends ClickHouseTableOperationsBase{

public void createNewTable(ArrayList<String> primaryKey, String tableName, String databaseName, Field[] fields,
ClickHouseConnection connection, boolean isNewReplacingMergeTree,
boolean useReplicatedReplacingMergeTree) throws SQLException {
boolean useReplicatedReplacingMergeTree, String rmtDeleteColumn) throws SQLException {
Map<String, String> colNameToDataTypeMap = this.getColumnNameToCHDataTypeMapping(fields);
String createTableQuery = this.createTableSyntax(primaryKey, tableName, databaseName, fields, colNameToDataTypeMap,
isNewReplacingMergeTree, useReplicatedReplacingMergeTree);
isNewReplacingMergeTree, useReplicatedReplacingMergeTree, rmtDeleteColumn);
log.info("**** AUTO CREATE TABLE " + createTableQuery);
// ToDO: need to run it before a session is created.
this.runQuery(createTableQuery, connection);
Expand All @@ -45,23 +46,24 @@ public void createNewTable(ArrayList<String> primaryKey, String tableName, Strin
public java.lang.String createTableSyntax(ArrayList<String> primaryKey, String tableName, String databaseName, Field[] fields,
Map<String, String> columnToDataTypesMap,
boolean isNewReplacingMergeTreeEngine,
boolean useReplicatedReplacingMergeTree) {
boolean useReplicatedReplacingMergeTree,
String rmtDeleteColumn) {

StringBuilder createTableSyntax = new StringBuilder();

createTableSyntax.append(CREATE_TABLE).append(" ").append(databaseName).append(".").append("`").append(tableName).append("`").append("(");

for(Field f: fields) {
for (Field f : fields) {
String colName = f.name();
String dataType = columnToDataTypesMap.get(colName);
boolean isNull = false;
if(f.schema().isOptional() == true) {
if (f.schema().isOptional() == true) {
isNull = true;
}
createTableSyntax.append("`").append(colName).append("`").append(" ").append(dataType);

// Ignore setting NULL OR not NULL for JSON and Array
if(dataType != null &&
if (dataType != null &&
(dataType.equalsIgnoreCase(ClickHouseDataType.JSON.name()) ||
dataType.contains(ClickHouseDataType.Array.name()))) {
// ignore adding nulls;
Expand All @@ -78,6 +80,10 @@ public java.lang.String createTableSyntax(ArrayList<String> primaryKey, String t

String isDeletedColumn = IS_DELETED_COLUMN;

if(rmtDeleteColumn != null && !rmtDeleteColumn.isEmpty()) {
isDeletedColumn = rmtDeleteColumn;
}

if(isNewReplacingMergeTreeEngine == true) {
createTableSyntax.append("`").append(VERSION_COLUMN).append("` ").append(VERSION_COLUMN_DATA_TYPE).append(",");
createTableSyntax.append("`").append(isDeletedColumn).append("` ").append(IS_DELETED_COLUMN_DATA_TYPE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException ex) {
log.error("******* ERROR **** Thread interrupted *********", ex);
throw new RuntimeException(ex);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ static synchronized public boolean checkIfBatchCanBeCommitted(List<ClickHouseStr
try {
acknowledgeRecords(v);
} catch (InterruptedException e) {
log.error("*** Error acknowlegeRecords ***", e);
throw new RuntimeException(e);
}
completedBatches.remove(k);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void testCreateTableSyntax() {
ClickHouseAutoCreateTable act = new ClickHouseAutoCreateTable();

String query = act.createTableSyntax(primaryKeys, "auto_create_table", "employees",
createFields(), this.columnToDataTypesMap, false, false);
createFields(), this.columnToDataTypesMap, false, false, null);
System.out.println("QUERY" + query);
Assert.assertTrue(query.equalsIgnoreCase("CREATE TABLE employees.`auto_create_table`(`customerName` String NOT NULL,`occupation` String NOT NULL,`quantity` Int32 NOT NULL,`amount_1` Float32 NOT NULL,`amount` Float64 NOT NULL,`employed` Bool NOT NULL,`blob_storage` String NOT NULL,`blob_storage_scale` Decimal NOT NULL,`json_output` JSON,`max_amount` Float64 NOT NULL,`_sign` Int8,`_version` UInt64) ENGINE = ReplacingMergeTree(_version) PRIMARY KEY(customerName) ORDER BY(customerName)"));
//Assert.assertTrue(query.equalsIgnoreCase("CREATE TABLE auto_create_table(`customerName` String NOT NULL,`occupation` String NOT NULL,`quantity` Int32 NOT NULL,`amount_1` Float32 NOT NULL,`amount` Float64 NOT NULL,`employed` Bool NOT NULL,`blob_storage` String NOT NULL,`blob_storage_scale` Decimal NOT NULL,`json_output` JSON,`max_amount` Float64 NOT NULL,`_sign` Int8,`_version` UInt64) ENGINE = ReplacingMergeTree(_version) PRIMARY KEY(customerName) ORDER BY (customerName)"));
Expand All @@ -137,7 +137,7 @@ public void testCreateTableEmptyPrimaryKey() {
ClickHouseAutoCreateTable act = new ClickHouseAutoCreateTable();

String query = act.createTableSyntax(null, "auto_create_table", "employees", createFields(),
this.columnToDataTypesMap, false, false);
this.columnToDataTypesMap, false, false, null);

String expectedQuery = "CREATE TABLE employees.`auto_create_table`(`customerName` String NOT NULL,`occupation` String NOT NULL,`quantity` Int32 NOT NULL,`amount_1` Float32 NOT NULL,`amount` Float64 NOT NULL,`employed` Bool NOT NULL,`blob_storage` String NOT NULL,`blob_storage_scale` Decimal NOT NULL,`json_output` JSON,`max_amount` Float64 NOT NULL,`_sign` Int8,`_version` UInt64) ENGINE = ReplacingMergeTree(_version) ORDER BY tuple()";
Assert.assertTrue(query.equalsIgnoreCase(expectedQuery));
Expand All @@ -151,7 +151,7 @@ public void testCreateTableMultiplePrimaryKeys() {
ClickHouseAutoCreateTable act = new ClickHouseAutoCreateTable();

String query = act.createTableSyntax(primaryKeys, "auto_create_table", "customers", createFields(),
this.columnToDataTypesMap, false, false);
this.columnToDataTypesMap, false, false, null);

String expectedQuery = "CREATE TABLE customers.`auto_create_table`(`customerName` String NOT NULL,`occupation` String NOT NULL,`quantity` Int32 NOT NULL,`amount_1` Float32 NOT NULL,`amount` Float64 NOT NULL,`employed` Bool NOT NULL,`blob_storage` String NOT NULL,`blob_storage_scale` Decimal NOT NULL,`json_output` JSON,`max_amount` Float64 NOT NULL,`_sign` Int8,`_version` UInt64) ENGINE = ReplacingMergeTree(_version) ORDER BY tuple()";
Assert.assertTrue(query.equalsIgnoreCase(expectedQuery));
Expand Down Expand Up @@ -181,7 +181,7 @@ public void testCreateNewTable() {

try {
act.createNewTable(primaryKeys, "auto_create_table", "default", this.createFields(), writer.getConnection(),
false, false);
false, false, null);
} catch(SQLException se) {
Assert.assertTrue(false);
}
Expand Down

0 comments on commit 4bb30e7

Please sign in to comment.