Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

861 renaming a table doesnt work for databases that are mapped to another name in clickhouse #862

Open
wants to merge 17 commits into
base: 2.4.1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
a5ec2af
Added test to cover RENAME table with database.override.map
subkanthi Oct 9, 2024
d4875b0
Upgrade debezium version to 3.0.0.Final
subkanthi Oct 13, 2024
f563469
Merge branch '2.4.0' of github.com:Altinity/clickhouse-sink-connector…
subkanthi Oct 14, 2024
281113c
Merge branch '2.5.0' of github.com:Altinity/clickhouse-sink-connector…
subkanthi Oct 14, 2024
e605355
Add ddl.retry to ddl tests
subkanthi Oct 14, 2024
92dc346
set ddl.retry to true for aall DDL tests
subkanthi Oct 14, 2024
58029c6
Merge pull request #798 from Altinity/2.4.0
subkanthi Oct 18, 2024
8f0b5da
Merge tag '2.4.0' of github.com:Altinity/clickhouse-sink-connector in…
subkanthi Oct 19, 2024
4864826
Merge branch 'develop' of github.com:Altinity/clickhouse-sink-connect…
subkanthi Oct 19, 2024
b29e5c0
Merge tag '2.4.0' of github.com:Altinity/clickhouse-sink-connector in…
subkanthi Oct 19, 2024
b6bad48
Merge branch 'develop' of github.com:Altinity/clickhouse-sink-connect…
subkanthi Oct 19, 2024
bbe062d
Merge branch '2.5.0' of github.com:Altinity/clickhouse-sink-connector…
subkanthi Oct 19, 2024
f1add2d
Add support for parameterized test to test db override functionality
subkanthi Oct 19, 2024
699ab46
Updated setup signature removed ddlservice dependency
subkanthi Oct 20, 2024
a490017
Fixed rename table for database.override.map
subkanthi Oct 20, 2024
a569354
Merge branch '2.4.1' of github.com:Altinity/clickhouse-sink-connector…
subkanthi Oct 20, 2024
2933464
Added assert for DatabaseOverrideIT
subkanthi Oct 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<maven.compiler.target>17</maven.compiler.target>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<version.debezium>2.5.0.Beta1</version.debezium>
<version.debezium>2.7.2.Final</version.debezium>
<version.junit>5.9.1</version.junit>
<version.checkstyle.plugin>3.1.1</version.checkstyle.plugin>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand Down
4 changes: 2 additions & 2 deletions sink-connector-lightweight/dependency-reduced-pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mongodb</artifactId>
<version>2.7.0.Beta2</version>
<version>2.7.2.Final</version>
<scope>test</scope>
<exclusions>
<exclusion>
Expand Down Expand Up @@ -326,7 +326,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<version.checkstyle.plugin>3.1.1</version.checkstyle.plugin>
<maven.compiler.target>17</maven.compiler.target>
<version.debezium>2.7.0.Beta2</version.debezium>
<version.debezium>2.7.2.Final</version.debezium>
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
</properties>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ public static void main(String[] args) throws Exception {

setupMonitoringThread(new ClickHouseSinkConnectorConfig(PropertiesHelper.toMap(props)), props);

embeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
injector.getInstance(DDLParserService.class), props, false);
embeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), props, false);

try {
DebeziumEmbeddedRestApi.startRestApi(props, injector, debeziumChangeEventCapture, userProperties);
Expand Down Expand Up @@ -141,8 +140,7 @@ public static CompletableFuture<String> startDebeziumEventLoop(Injector injector

Thread.sleep(500);
// embeddedApplication = new ClickHouseDebeziumEmbeddedApplication();
embeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
injector.getInstance(DDLParserService.class), props, true);
embeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), props, true);
return null;
});

Expand All @@ -151,15 +149,15 @@ public static CompletableFuture<String> startDebeziumEventLoop(Injector injector


public static void start(DebeziumRecordParserService recordParserService,
DDLParserService ddlParserService, Properties props, boolean forceStart) throws Exception {
Properties props, boolean forceStart) throws Exception {

if(forceStart == true) {
// Reload the configuration file.
log.info(String.format("******* Reloading configuration file (%s) from disk ******", configurationFile));
loadPropertiesFile(configurationFile);
}
debeziumChangeEventCapture = new DebeziumChangeEventCapture();
debeziumChangeEventCapture.setup(props, recordParserService, ddlParserService, forceStart);
debeziumChangeEventCapture.setup(props, recordParserService, forceStart);
}

public static void stop() throws IOException {
Expand Down Expand Up @@ -210,8 +208,7 @@ public void run() {
log.info("******* Restarting Event Loop ********");
debeziumChangeEventCapture.stop();
Thread.sleep(3000);
start(injector.getInstance(DebeziumRecordParserService.class),
injector.getInstance(DDLParserService.class), props, true);
start(injector.getInstance(DebeziumRecordParserService.class), props, true);
} catch (IOException e) {
log.error("**** ERROR: Restarting Event Loop ****", e);
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public static void startRestApi(Properties props, Injector injector,
try {
debeziumChangeEventCapture.deleteSchemaHistory(config, finalProps1);
} catch (Exception e) {
log.error("Client - Error deleting offsets", e);
log.error("Client - Error deleting schema history", e);
ctx.result(e.toString());
ctx.status(HttpStatus.INTERNAL_SERVER_ERROR);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.storage.jdbc.history.JdbcSchemaHistoryConfig;
import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.connect.data.Field;
Expand Down Expand Up @@ -164,10 +166,20 @@ private void performDDLOperation(String DDL, Properties props, SourceRecord sr,
updateMetrics(DDL, writer);
}

/**
* Function to get the database name from the SourceRecord.
* If the database name is not present in the SourceRecord, then
* the database name is set to "system".
* Also if a database is overridden in the configuration, then
* the database name is set to the overridden database name.
* @param sr
* @return
*/
private String getDatabaseName(SourceRecord sr) {
if (sr != null && sr.key() instanceof Struct) {
String recordDbName = (String) ((Struct) sr.key()).get("databaseName");
if (recordDbName != null && !recordDbName.isEmpty()) {

return recordDbName;
}
}
Expand Down Expand Up @@ -411,8 +423,8 @@ private Pair<String, String> getDebeziumOffsetStorageDatabaseName(Properties pro
* @return
*/
private Pair<String, String> getDebeziumSchemaHistoryDatabaseName(Properties props) {
String tableName = props.getProperty(JdbcOffsetBackingStoreConfig.OFFSET_STORAGE_PREFIX +
JdbcOffsetBackingStoreConfig.PROP_TABLE_NAME.name());
String tableName = props.getProperty(SchemaHistory.CONFIGURATION_FIELD_PREFIX_STRING +
JdbcSchemaHistoryConfig.PROP_TABLE_NAME.name());
return splitTableName(tableName);
}

Expand Down Expand Up @@ -605,10 +617,11 @@ public void deleteSchemaHistory(ClickHouseSinkConnectorConfig config, Properties
databaseName, dbCredentials.getUserName(),
dbCredentials.getPassword(), config, this.conn);

// Get topic.prefix from config
String topicPrefix = config.getString(CommonConnectorConfig.TOPIC_PREFIX.name());
new DebeziumOffsetStorage().deleteSchemaHistoryTable(topicPrefix, tableNameDatabaseName.getRight() + "."
+ tableNameDatabaseName.getLeft(),writer);
// Get topic.prefix from properies
String topicPrefix = props.getProperty(CommonConnectorConfig.TOPIC_PREFIX.name());
// String topicPrefix = config.getString(CommonConnectorConfig.TOPIC_PREFIX.name());
// Jdbc adds the database name to the table name, so we need to remove it
new DebeziumOffsetStorage().deleteSchemaHistoryTable(topicPrefix, tableName, writer);

}
/**
Expand Down Expand Up @@ -770,7 +783,7 @@ public void connectorStopped() {
* @param debeziumRecordParserService
*/
public void setup(Properties props, DebeziumRecordParserService debeziumRecordParserService,
DDLParserService ddlParserService, boolean forceStart) throws IOException, ClassNotFoundException {
boolean forceStart) throws IOException, ClassNotFoundException {

// Check if max queue size was defined by the user.
if(props.getProperty(ClickHouseSinkConnectorConfigVariables.MAX_QUEUE_SIZE.toString()) != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;

import com.clickhouse.logging.Logger;
import com.clickhouse.logging.LoggerFactory;
import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
Expand Down Expand Up @@ -32,6 +34,7 @@ public class DebeziumOffsetStorage {
public static final String SOURCE_PASSWORD = "source_password";


private static final Logger log = LoggerFactory.getLogger(DebeziumOffsetStorage.class);

public String getOffsetKey(Properties props) {
String connectorName = props.getProperty("name");
Expand Down Expand Up @@ -62,7 +65,8 @@ public void deleteSchemaHistoryTable(String offsetKey,
BaseDbWriter writer) throws SQLException {


String debeziumStorageStatusQuery = String.format("delete from %s where JSONExtractRaw(JSONExtractRaw(history_data,'source'), 'server')='\"%s\"" , tableName, offsetKey);
String debeziumStorageStatusQuery = String.format("delete from `%s` where JSONExtractRaw(JSONExtractRaw(history_data,'source'), 'server')='%s'" , tableName, offsetKey);
log.info("Deleting schema history table query: " + debeziumStorageStatusQuery);
writer.executeQuery(debeziumStorageStatusQuery);
}
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,8 @@ public MySqlDDLParserListenerImpl(BaseDbWriter writer, StringBuffer transformedQ
} catch(Exception e) {
log.error("enterCreateDatabase: Error parsing source to destination database map:" + e.toString());
}
// databaseName might contain backticks. Remove them.
if(databaseName.contains("`")) {
databaseName = databaseName.replace("`", "");
}

if(sourceToDestinationMap.containsKey(databaseName)) {
this.databaseName = sourceToDestinationMap.get(databaseName);
} else {
this.databaseName = databaseName;
}
this.databaseName = overrideDatabaseName(databaseName);

this.query = transformedQuery;
this.tableName = tableName;
Expand All @@ -74,6 +66,23 @@ public MySqlDDLParserListenerImpl(BaseDbWriter writer, StringBuffer transformedQ
this.userProvidedTimeZone = parseTimeZone();
}

/**
* Function to override the database name.
* @param databaseName
* @return
*/
private String overrideDatabaseName(String databaseName) {

// databaseName might contain backticks. Remove them.
if(databaseName.contains("`")) {
databaseName = databaseName.replace("`", "");
}

if(sourceToDestinationMap.containsKey(databaseName)) {
return sourceToDestinationMap.get(databaseName);
}
return databaseName;
}

public ZoneId parseTimeZone() {
String userProvidedTimeZone = config.getString(ClickHouseSinkConnectorConfigVariables
Expand Down Expand Up @@ -102,25 +111,9 @@ public void enterCreateDatabase(MySqlParser.CreateDatabaseContext createDatabase

String databaseName = tree.getText();
if(!databaseName.isEmpty()) {
// Check if the database is overridden
Map<String, String> sourceToDestinationMap = new HashMap<>();

try {
if (this.config.getString(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_DATABASE_OVERRIDE_MAP.toString()) != null)
sourceToDestinationMap = Utils.parseSourceToDestinationDatabaseMap(this.config.
getString(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_DATABASE_OVERRIDE_MAP.toString()));
} catch(Exception e) {
log.error("enterCreateDatabase: Error parsing source to destination database map:" + e.toString());
}
// databaseName might contain backticks. Remove them.
if(databaseName.contains("`")) {
databaseName = databaseName.replace("`", "");
}
if(sourceToDestinationMap.containsKey(databaseName)) {
this.query.append(String.format(Constants.CREATE_DATABASE, sourceToDestinationMap.get(databaseName)));
} else {
this.query.append(String.format(Constants.CREATE_DATABASE, databaseName));
}

String overrideDatabaseName = overrideDatabaseName(tree.getText());
this.query.append(String.format(Constants.CREATE_DATABASE, overrideDatabaseName));
}
}
}
Expand Down Expand Up @@ -736,8 +729,12 @@ public void enterRenameTable(MySqlParser.RenameTableContext renameTableContext)
originalTableName = renameTableContextChildren.get(0).getText();
newTableName = renameTableContextChildren.get(2).getText();
// If the table name already includes the database name dont include it in the query.
if(originalTableName.contains(".")) {
this.query.append(originalTableName).append(" to ").append(newTableName);
if(originalTableName.contains(".") && newTableName.contains(".")) {
// Split database and table name.
String[] databaseAndTableNameArray = originalTableName.split("\\.");
String[] newDatabaseAndTableNameArray = newTableName.split("\\.");
this.query.append(this.databaseName).append(".").append(databaseAndTableNameArray[1]).append(" to ").
append(this.databaseName).append(".").append(newDatabaseAndTableNameArray[1]);
} else
this.query.append(databaseName).append(".").append(originalTableName).append(" to ").
append(databaseName).append(".").append(newTableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,7 @@ public void testDecoderBufsPlugin() throws Exception {
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(getProperties(), new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()),
"employees"), false);
engine.get().setup(getProperties(), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ public void testPgOutputPlugin() throws Exception {
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(getProperties(), new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "system"), false);
engine.get().setup(getProperties(), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ static public Properties getDebeziumProperties(MySQLContainer mySqlContainer, Cl
defaultProps.setProperty("clickhouse.server.port", String.valueOf(clickHouseContainer.getFirstMappedPort()));
defaultProps.setProperty("clickhouse.server.user", clickHouseContainer.getUsername());
defaultProps.setProperty("clickhouse.server.password", clickHouseContainer.getPassword());
//defaultProps.setProperty("ddl.retry", "true");

defaultProps.setProperty("offset.storage.jdbc.url", String.format("jdbc:clickhouse://%s:%s",
clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(props, new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "test_db"),false);
engine.get().setup(props, new SourceRecordParserService(),false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -136,7 +135,7 @@
// query clickhouse connection and get data for test_table1 and test_table2


ResultSet rs = writer.executeQueryWithResultSet("SELECT * FROM employees.audience");

Check failure on line 138 in sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MariaDBIT.java

View workflow job for this annotation

GitHub Actions / JUnit Test Report

MariaDBIT.testMultipleDatabases

Code: 60. DB::Exception: Unknown table expression identifier 'employees.audience' in scope SELECT * FROM employees.audience. (UNKNOWN_TABLE) (version 24.9.2.42 (official build)) , server ClickHouseNode [uri=http://localhost:32851/system, options={custom_settings=allow_experimental_object_type=1,insert_allow_materialized_columns=1,client_name=Client_1}]@1044014396
Raw output
java.sql.BatchUpdateException: 
Code: 60. DB::Exception: Unknown table expression identifier 'employees.audience' in scope SELECT * FROM employees.audience. (UNKNOWN_TABLE) (version 24.9.2.42 (official build))
, server ClickHouseNode [uri=http://localhost:32851/system, options={custom_settings=allow_experimental_object_type=1,insert_allow_materialized_columns=1,client_name=Client_1}]@1044014396
	at com.altinity.clickhouse.debezium.embedded.MariaDBIT.testMultipleDatabases(MariaDBIT.java:138)
// Validate the data
boolean recordFound = false;
while(rs.next()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,7 @@ public void testMySQLGeneratedColumns() throws Exception {
Properties props = getDebeziumProperties(mySqlContainer, clickHouseContainer);

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(props, new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()),
"employees"), false);
engine.get().setup(props, new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(props, new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "test_db"),false);
engine.get().setup(props, new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -129,7 +128,7 @@
// query clickhouse connection and get data for test_table1 and test_table2


ResultSet rs = writer.executeQueryWithResultSet("SELECT * FROM employees.audience");

Check failure on line 131 in sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MySQLJsonIT.java

View workflow job for this annotation

GitHub Actions / JUnit Test Report

MySQLJsonIT.testMultipleDatabases

Code: 60. DB::Exception: Unknown table expression identifier 'employees.audience' in scope SELECT * FROM employees.audience. (UNKNOWN_TABLE) (version 24.9.2.42 (official build)) , server ClickHouseNode [uri=http://localhost:32849/system, options={custom_settings=allow_experimental_object_type=1,insert_allow_materialized_columns=1,client_name=Client_1}]@1437041406
Raw output
java.sql.BatchUpdateException: 
Code: 60. DB::Exception: Unknown table expression identifier 'employees.audience' in scope SELECT * FROM employees.audience. (UNKNOWN_TABLE) (version 24.9.2.42 (official build))
, server ClickHouseNode [uri=http://localhost:32849/system, options={custom_settings=allow_experimental_object_type=1,insert_allow_materialized_columns=1,client_name=Client_1}]@1437041406
	at com.altinity.clickhouse.debezium.embedded.MySQLJsonIT.testMultipleDatabases(MySQLJsonIT.java:131)
// Validate the data
boolean recordFound = false;
while(rs.next()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,7 @@ public void testAutoCreateTable(String clickHouseServerVersion) throws Exception
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(ITCommon.getDebeziumPropertiesForSchemaOnly(mySqlContainer, clickHouseContainer), new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "datatypes"),false);
engine.get().setup(ITCommon.getDebeziumPropertiesForSchemaOnly(mySqlContainer, clickHouseContainer), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ public void testDecoderBufsPlugin() throws Exception {
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(getProperties(), new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "employees"), false);
engine.get().setup(getProperties(), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,7 @@
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(getProperties(), new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "employees"), false);
engine.get().setup(getProperties(), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -141,7 +140,7 @@

Assert.assertTrue(reDataColumns.get("amount").equalsIgnoreCase("Decimal(64, 18)"));
Assert.assertTrue(reDataColumns.get("total_amount").equalsIgnoreCase("Decimal(21, 5)"));
Assert.assertTrue(tmCount == 2);

Check failure on line 143 in sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresInitialDockerWKeeperMapStorageIT.java

View workflow job for this annotation

GitHub Actions / JUnit Test Report

PostgresInitialDockerWKeeperMapStorageIT.testDecoderBufsPlugin

java.lang.AssertionError at com.altinity.clickhouse.debezium.embedded.PostgresInitialDockerWKeeperMapStorageIT.testDecoderBufsPlugin(PostgresInitialDockerWKeeperMapStorageIT.java:143)
Raw output
java.lang.AssertionError
	at com.altinity.clickhouse.debezium.embedded.PostgresInitialDockerWKeeperMapStorageIT.testDecoderBufsPlugin(PostgresInitialDockerWKeeperMapStorageIT.java:143)

String offsetValue = new DebeziumOffsetStorage().getDebeziumStorageStatusQuery(getProperties(), writer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(getProperties(), new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "system"), false);
engine.get().setup(getProperties(), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -160,7 +159,7 @@
while(chRs3.next()) {
peopleCount = chRs3.getInt(1);
}
Assert.assertTrue(peopleCount == 2);

Check failure on line 162 in sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresPgoutputMultipleSchemaIT.java

View workflow job for this annotation

GitHub Actions / JUnit Test Report

PostgresPgoutputMultipleSchemaIT.testMultipleSchemaReplication

java.lang.AssertionError at com.altinity.clickhouse.debezium.embedded.PostgresPgoutputMultipleSchemaIT.testMultipleSchemaReplication(PostgresPgoutputMultipleSchemaIT.java:162)
Raw output
java.lang.AssertionError
	at com.altinity.clickhouse.debezium.embedded.PostgresPgoutputMultipleSchemaIT.testMultipleSchemaReplication(PostgresPgoutputMultipleSchemaIT.java:162)

if(engine.get() != null) {
engine.get().stop();
Expand Down
Loading
Loading