Skip to content

Commit

Permalink
Merge pull request #792 from Altinity/791-upgrade-debezium-version-to…
Browse files Browse the repository at this point in the history
…-271final

Upgrade debezium to 2.7.1.Final
  • Loading branch information
subkanthi authored Oct 8, 2024
2 parents a4abaa9 + 980af39 commit 71dae9b
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 29 deletions.
2 changes: 1 addition & 1 deletion sink-connector-lightweight/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<maven.compiler.target>17</maven.compiler.target>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<version.debezium>2.7.0.Beta2</version.debezium>
<version.debezium>2.7.2.Final</version.debezium>
<version.junit>5.9.1</version.junit>
<version.testcontainers>1.19.1</version.testcontainers>
<version.checkstyle.plugin>3.1.1</version.checkstyle.plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ else if(parsedDataType.contains("(") && parsedDataType.contains(")") &&
}
}

chDataType = DataTypeConverter.convertToString(columnName,
chDataType = DataTypeConverter.convertToString(this.config, columnName,
scale, precision, dtc, this.userProvidedTimeZone);

return chDataType;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,63 +1,71 @@
package com.altinity.clickhouse.debezium.embedded.parser;

import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.converters.ClickHouseDataTypeMapper;
import com.clickhouse.data.ClickHouseDataType;
import io.debezium.antlr.DataTypeResolver;
import io.debezium.bean.DefaultBeanRegistry;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.binlog.BinlogConnectorConfig;
import io.debezium.connector.binlog.charset.BinlogCharsetRegistry;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.charset.MySqlCharsetRegistryServiceProvider;
import io.debezium.connector.mysql.jdbc.MySqlValueConverters;
import io.debezium.ddl.parser.mysql.generated.MySqlParser;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.Column;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.ddl.DataType;
import io.debezium.service.DefaultServiceRegistry;
import io.debezium.service.spi.ServiceRegistry;
import org.apache.kafka.connect.data.SchemaBuilder;

import java.sql.Types;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Map;

/**
*
*/
public class DataTypeConverter {

/**
* Function that takes the Antlr parsed column type
* and converts it into ClickHouse type.
*
* @param columnName
* @param columnDefChild
* @return
*/
public static ClickHouseDataType convert(String columnName, MySqlParser.DataTypeContext columnDefChild) {
MySqlValueConverters mysqlConverter = new MySqlValueConverters(
JdbcValueConverters.DecimalMode.PRECISE,
TemporalPrecisionMode.ADAPTIVE,
JdbcValueConverters.BigIntUnsignedMode.LONG,
CommonConnectorConfig.BinaryHandlingMode.BYTES,
x ->x, CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN);


DataType dataType = initializeDataTypeResolver().resolveDataType(columnDefChild);
//DataType dataType = MySqlParser.dataTypeResolver.resolveDataType(dataTypeContext);
Column column = Column.editor().name(columnName).type(dataType.name()).jdbcType(dataType.jdbcType()).length((int) dataType.length()).scale(dataType.scale()).create();
SchemaBuilder schemaBuilder = mysqlConverter.schemaBuilder(column);

return ClickHouseDataTypeMapper.getClickHouseDataType(schemaBuilder.schema().type(), schemaBuilder.schema().name());
}

public static DataType getDataType(MySqlParser.DataTypeContext columnDefChild) {
String convertedDataType = null;
return initializeDataTypeResolver().resolveDataType(columnDefChild);
}

public static String convertToString(String columnName, int scale, int precision, MySqlParser.DataTypeContext columnDefChild, ZoneId userProvidedTimeZone) {
public static String convertToString(ClickHouseSinkConnectorConfig config, String columnName, int scale, int precision, MySqlParser.DataTypeContext columnDefChild, ZoneId userProvidedTimeZone) {
new DefaultBeanRegistry();

// Convert ClickHouseConnectorConfig to configuration.
Configuration configuration = Configuration.create()
.with(BinlogConnectorConfig.DECIMAL_HANDLING_MODE, "decimalHandlingMode")
.with(BinlogConnectorConfig.TIME_PRECISION_MODE, "temporalPrecisionMode")
.with(BinlogConnectorConfig.BIGINT_UNSIGNED_HANDLING_MODE, "bigIntUnsignedHandlingMode")
.with(BinlogConnectorConfig.BINARY_HANDLING_MODE, "binaryHandlingMode")
.with(BinlogConnectorConfig.EVENT_CONVERTING_FAILURE_HANDLING_MODE, "eventConvertingFailureHandlingMode")
.build();

final MySqlConnectorConfig connectorConfig = new MySqlConnectorConfig(configuration);

// Convert Properties to Configuration.
// Configuration configuration = Configuration.create().build();
// // Iterate through properties and fill configuration.
// for (Map.Entry<Object, Object> entry : properties.entrySet()) {
// configuration = configuration.edit().with(entry.getKey().toString(), entry.getValue().toString()).build();
// }
ServiceRegistry serviceRegistry = new DefaultServiceRegistry( Configuration.create().build(), new DefaultBeanRegistry());
BinlogCharsetRegistry charsetRegistry = new MySqlCharsetRegistryServiceProvider().createService(Configuration.create().build(), serviceRegistry);
MySqlValueConverters mysqlConverter = new MySqlValueConverters(
JdbcValueConverters.DecimalMode.PRECISE,
TemporalPrecisionMode.ADAPTIVE,
JdbcValueConverters.BigIntUnsignedMode.LONG,
CommonConnectorConfig.BinaryHandlingMode.BYTES,
x ->x, CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN);
x ->x, CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN, connectorConfig.getServiceRegistry());


String convertedDataType = null;
Expand Down
2 changes: 1 addition & 1 deletion sink-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>2.7.0.Beta2</version>
<version>2.7.2.Final</version>
</dependency>

<dependency>
Expand Down

0 comments on commit 71dae9b

Please sign in to comment.