From d0d4a7bfc54099c370836b4c5b4b49f6d350c553 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Fri, 26 Apr 2024 16:42:48 -0400 Subject: [PATCH 1/3] Fixed logic of creating sequence number based on debezium timestamp, separated out integration tests --- ...ClickHouseDebeziumEmbeddedApplication.java | 2 + .../embedded/api/DebeziumEmbeddedRestApi.java | 8 +- .../cdc/DebeziumChangeEventCapture.java | 35 ++-- .../embedded/cdc/BatchRetryOnFailureIT.java | 3 +- .../cdc/DebeziumChangeEventCaptureIT.java | 96 ++--------- .../cdc/DebeziumChangeEventCaptureTest.java | 12 +- .../embedded/cdc/DebeziumStorageViewIT.java | 111 +++++++++++++ .../cdc/MultipleUpdatesWSameTimestampIT.java | 153 ++++++++++++++++++ .../db/batch/PreparedStatementExecutor.java | 2 +- .../executor/DebeziumOffsetManagement.java | 6 +- 10 files changed, 322 insertions(+), 106 deletions(-) create mode 100644 sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumStorageViewIT.java create mode 100644 sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/MultipleUpdatesWSameTimestampIT.java diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedApplication.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedApplication.java index acf7e0276..507170b86 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedApplication.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedApplication.java @@ -133,6 +133,8 @@ public static void start(DebeziumRecordParserService recordParserService, public static void stop() throws IOException { debeziumChangeEventCapture.stop(); + //Stop Rest API + DebeziumEmbeddedRestApi.stop(); } /** diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/api/DebeziumEmbeddedRestApi.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/api/DebeziumEmbeddedRestApi.java index e62a3f77e..4dc42b405 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/api/DebeziumEmbeddedRestApi.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/api/DebeziumEmbeddedRestApi.java @@ -23,6 +23,7 @@ public class DebeziumEmbeddedRestApi { private static final Logger log = LogManager.getLogger(DebeziumEmbeddedRestApi.class); + static Javalin app; public static void startRestApi(Properties props, Injector injector, DebeziumChangeEventCapture debeziumChangeEventCapture, Properties userProperties) { @@ -31,7 +32,7 @@ public static void startRestApi(Properties props, Injector injector, cliPort = "7000"; } - Javalin app = Javalin.create().start(Integer.parseInt(cliPort)); + app = Javalin.create().start(Integer.parseInt(cliPort)); app.get("/", ctx -> { ctx.result("Hello World"); }); @@ -109,4 +110,9 @@ public static void startRestApi(Properties props, Injector injector, }); } + // Stop the javalin server + public static void stop() { + if(app != null) + app.stop(); + } } diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java index 803205558..388ff73c6 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java @@ -1,5 +1,6 @@ package com.altinity.clickhouse.debezium.embedded.cdc; +import com.altinity.clickhouse.debezium.embedded.api.DebeziumEmbeddedRestApi; import com.altinity.clickhouse.debezium.embedded.common.PropertiesHelper; import com.altinity.clickhouse.debezium.embedded.config.SinkConnectorLightWeightConfig; import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLParserService; @@ -656,6 +657,7 @@ public void setup(Properties props, DebeziumRecordParserService debeziumRecordPa } public void stop() throws IOException { + try { if (this.engine != null) { this.engine.close(); @@ -749,6 +751,7 @@ private void appendToRecords(List convertedRecords) { } public static final long SEQUENCE_START = 1000000000; + public static final long SEQUENCE_START_INITIAL = 500000000; /** * Function to add version to every record. * @param chStructs @@ -757,27 +760,33 @@ public static void addVersion(List chStructs, boolean initialS // Start the sequence from 1 million and increment for every record // and reset the sequence back to 1 million in the next second - long sequenceStartTime = System.currentTimeMillis(); + if(chStructs.isEmpty()) { + return; + } + long sequenceStartTime = chStructs.get(0).getTs_ms(); long sequence = SEQUENCE_START; if(initialSeed) { // Add 500 million to the sequence - sequence += 500000000; + // sequence += 500000000; + // Add 000 to the debezium timestamp. + sequence = SEQUENCE_START_INITIAL; } for(ClickHouseStruct chStruct: chStructs) { - - // if the current time is greater than the next second, reset the sequence - // If current time moved to the next second, reset the sequence. - // else increment the sequence. - // Get diff in seconds from current time and last time. - long currentTime = System.currentTimeMillis(); - long diff = (currentTime - sequenceStartTime) / 1000; - if(diff >= 1) { + // Get the first ts_ms from chStruct + // Subsequent records add 1 to sequence. + // If its been more than a second from the first + // ts_ms then reset the sequence. + // Get diff in seconds + int diff = (int) (chStruct.getTs_ms() - sequenceStartTime) / 1000; + if(diff > 1) { sequence = SEQUENCE_START; - sequenceStartTime = currentTime; - } else { + sequenceStartTime = chStruct.getTs_ms(); + } else { sequence++; } - chStruct.setSequenceNumber(sequence); + // Pad the sequence number with 0s + chStruct.setSequenceNumber(chStruct.getTs_ms() * 1000 + sequence); + } } } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/BatchRetryOnFailureIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/BatchRetryOnFailureIT.java index dccc3e3cb..27f3c3325 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/BatchRetryOnFailureIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/BatchRetryOnFailureIT.java @@ -118,6 +118,7 @@ public void testBatchRetryOnCHFailure() throws Exception { // Close connection. - clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture().stop(); + ClickHouseDebeziumEmbeddedApplication.stop(); + executorService.shutdown(); } } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureIT.java index 56585a360..f60e5707c 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureIT.java @@ -147,7 +147,7 @@ public void testIncrementingSequenceNumbers() throws Exception { conn.prepareStatement("insert into newtable values('c', 3, 3)").execute(); conn.prepareStatement("insert into newtable values('d', 4, 4)").execute(); - Thread.sleep(10000); + Thread.sleep(20000); // Create connection to ClickHouse and get the version numbers. String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), @@ -163,25 +163,30 @@ public void testIncrementingSequenceNumbers() throws Exception { long version3 = 1L; long version4 = 1L; - ResultSet version1Result = writer.executeQueryWithResultSet("select _version from newtable where col1 = 'a'"); + ResultSet version1Result = writer.executeQueryWithResultSet("select _version from newtable final where col1 = 'a'"); while(version1Result.next()) { version1 = version1Result.getLong("_version"); } - ResultSet version2Result = writer.executeQueryWithResultSet("select _version from newtable where col1 = 'b'"); + ResultSet version2Result = writer.executeQueryWithResultSet("select _version from newtable final where col1 = 'b'"); while(version2Result.next()) { version2 = version2Result.getLong("_version"); } - ResultSet version3Result = writer.executeQueryWithResultSet("select _version from newtable where col1 = 'c'"); + ResultSet version3Result = writer.executeQueryWithResultSet("select _version from newtable final where col1 = 'c'"); while(version3Result.next()) { version3 = version3Result.getLong("_version"); } - ResultSet version4Result = writer.executeQueryWithResultSet("select _version from newtable where col1 = 'd'"); + ResultSet version4Result = writer.executeQueryWithResultSet("select _version from newtable final where col1 = 'd'"); while(version4Result.next()) { version4 = version4Result.getLong("_version"); } + System.out.println("Version 1" + version1); + System.out.println("Version 2" + version2); + System.out.println("Version 3" + version3); + System.out.println("Version 4" + version4); + // Check if version 4 is greater than version 3 assertTrue(version4 > version3); @@ -190,88 +195,9 @@ public void testIncrementingSequenceNumbers() throws Exception { // Check if version 2 is greater than version 1 assertTrue(version2 > version1); -// if(engine.get() != null) { -// engine.get().stop(); -// } + clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture().engine.close(); conn.close(); - // Files.deleteIfExists(tmpFilePath); executorService.shutdown(); } - @Test - @DisplayName("Test that validates that the sequence number that is created in non-gtid mode is incremented correctly," - + "by performing a lot of updates on the primary key.") - public void testIncrementingSequenceNumberWithUpdates() throws Exception { - - Injector injector = Guice.createInjector(new AppInjector()); - - Properties props = getDebeziumProperties(mySqlContainer, clickHouseContainer); - props.setProperty("snapshot.mode", "schema_only"); - props.setProperty("schema.history.internal.store.only.captured.tables.ddl", "true"); - props.setProperty("schema.history.internal.store.only.captured.databases.ddl", "true"); - - // Override clickhouse server timezone. - ClickHouseDebeziumEmbeddedApplication clickHouseDebeziumEmbeddedApplication = new ClickHouseDebeziumEmbeddedApplication(); - - - ExecutorService executorService = Executors.newFixedThreadPool(1); - executorService.execute(() -> { - try { - clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), - injector.getInstance(DDLParserService.class), props, false); - DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture() - , new Properties()); - } catch (Exception e) { - throw new RuntimeException(e); - } - - }); - - Thread.sleep(25000); - - Connection conn = ITCommon.connectToMySQL(mySqlContainer); - conn.prepareStatement("create table `newtable`(col1 varchar(255) not null, col2 int, col3 int, primary key(col1))").execute(); - - // Insert a new row in the table - conn.prepareStatement("insert into newtable values('a', 1, 1)").execute(); - - // Generate and execute the update workload - String updateStatement = "UPDATE newtable SET col2 = ? WHERE col1 = ?"; - try (PreparedStatement pstmt = conn.prepareStatement(updateStatement)) { - conn.setAutoCommit(false); - for (int i = 0; i < 20000; i++) { - // Set parameters for the update statement - pstmt.setInt(1, 10000 + i); - pstmt.setString(2, "a"); - - // Execute the update statement - pstmt.executeUpdate(); - } - conn.commit(); - } - - - Thread.sleep(10000); - - // Validate in Clickhouse the last record written is 29999 - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees"); - ClickHouseConnection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", - clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); - - long col2 = 1L; - ResultSet version1Result = writer.executeQueryWithResultSet("select col2 from newtable final where col1 = 'a'"); - while(version1Result.next()) { - col2 = version1Result.getLong("col2"); - } - Thread.sleep(10000); - - - assertTrue(col2 == 29999); - conn.close(); - // Files.deleteIfExists(tmpFilePath); - executorService.shutdown(); - } } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureTest.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureTest.java index a637fbf79..181bbd77c 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureTest.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureTest.java @@ -67,24 +67,33 @@ public void shouldAssignUniqueSequenceNumbersWithinSameSecond() throws Interrupt ClickHouseStruct ch1 = new ClickHouseStruct(10, "topic_1", getKafkaStruct(), 2, currentTimestamp, null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE); + ch1.setTs_ms(currentTimestamp); + ClickHouseStruct ch2 = new ClickHouseStruct(10, "topic_1", getKafkaStruct(), 2, currentTimestamp + 100, null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE); + ch2.setTs_ms(currentTimestamp); + ClickHouseStruct ch3 = new ClickHouseStruct(10, "topic_1", getKafkaStruct(), 2, currentTimestamp + 200, null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE); + ch3.setTs_ms(currentTimestamp); + ClickHouseStruct ch4 = new ClickHouseStruct(10, "topic_1", getKafkaStruct(), 2, currentTimestamp + 300, null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE); + ch4.setTs_ms(currentTimestamp); ClickHouseStruct ch5 = new ClickHouseStruct(10, "topic_1", getKafkaStruct(), 2, currentTimestamp + 500, null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE); + ch5.setTs_ms(currentTimestamp); Thread.sleep(1000); ClickHouseStruct ch6 = new ClickHouseStruct(10, "topic_1", getKafkaStruct(), 2, currentTimestamp + 1000, null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE); + ch6.setTs_ms(currentTimestamp); // Make a list of ch1, ch2, ch3 and ch4 List clickHouseStructs = Arrays.asList(ch1, ch2, ch3, ch4, ch5); @@ -104,9 +113,6 @@ public void shouldAssignUniqueSequenceNumbersWithinSameSecond() throws Interrupt // Validate ch5 and ch6 assertTrue(clickHouseStructs2.get(0).getSequenceNumber() != clickHouseStructs2.get(1).getSequenceNumber()); - // Reset works. - // assertTrue(clickHouseStructs2.get(0).getSequenceNumber() == 1000001); - // DebeziumChangeEventCapture.addVersion(); } @Test diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumStorageViewIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumStorageViewIT.java new file mode 100644 index 000000000..3207b011d --- /dev/null +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumStorageViewIT.java @@ -0,0 +1,111 @@ +package com.altinity.clickhouse.debezium.embedded.cdc; + +import com.altinity.clickhouse.debezium.embedded.AppInjector; +import com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication; +import com.altinity.clickhouse.debezium.embedded.ITCommon; +import com.altinity.clickhouse.debezium.embedded.api.DebeziumEmbeddedRestApi; +import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLParserService; +import com.altinity.clickhouse.debezium.embedded.parser.DebeziumRecordParserService; +import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; +import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; +import com.clickhouse.jdbc.ClickHouseConnection; +import com.google.inject.Guice; +import com.google.inject.Injector; +import org.apache.log4j.BasicConfigurator; +import org.junit.Assert; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.util.HashMap; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static com.altinity.clickhouse.debezium.embedded.ITCommon.getDebeziumPropertiesForSchemaOnly; + +@Testcontainers +@Disabled +@DisplayName("Test that validates if the debezium storage view is created successfully") +public class DebeziumStorageViewIT { + protected MySQLContainer mySqlContainer; + + @Container + public static ClickHouseContainer clickHouseContainer = new ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:latest") + .asCompatibleSubstituteFor("clickhouse")) + .withInitScript("init_clickhouse_schema_only_column_timezone.sql") + // .withCopyFileToContainer(MountableFile.forClasspathResource("config.xml"), "/etc/clickhouse-server/config.d/config.xml") + .withUsername("ch_user") + .withPassword("password") + .withExposedPorts(8123); + @BeforeEach + public void startContainers() throws InterruptedException { + mySqlContainer = new MySQLContainer<>(DockerImageName.parse("docker.io/bitnami/mysql:latest") + .asCompatibleSubstituteFor("mysql")) + .withDatabaseName("employees").withUsername("root").withPassword("adminpass") + .withInitScript("datetime.sql") + .withExtraHost("mysql-server", "0.0.0.0") + .waitingFor(new HttpWaitStrategy().forPort(3306)); + + BasicConfigurator.configure(); + mySqlContainer.start(); + clickHouseContainer.start(); + Thread.sleep(15000); + } + + @Test + @DisplayName("Validates that the debezium storage view is created successfully") + public void debeziumStorageView() throws Exception { + + Injector injector = Guice.createInjector(new AppInjector()); + + Properties props = getDebeziumPropertiesForSchemaOnly(mySqlContainer, clickHouseContainer); + ClickHouseDebeziumEmbeddedApplication clickHouseDebeziumEmbeddedApplication = new ClickHouseDebeziumEmbeddedApplication(); + + ExecutorService executorService = Executors.newFixedThreadPool(1); + executorService.execute(() -> { + try { + clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), + injector.getInstance(DDLParserService.class), props, false); + DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture() + , new Properties()); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + Thread.sleep(10000); + + // Connect to clickhouse and validate that the view was created successfully. + String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), + "altinity_sink_connector"); + ClickHouseConnection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", + clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); + BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), + "altinity_sink_connector", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); + + + // Check if the view altinity_sink_connector.show_replica_status was created successfully. + ResultSet resultSet = writer.executeQueryWithResultSet("show create view altinity_sink_connector.show_replica_status"); + + boolean viewCheck = false; + while (resultSet.next()) { + viewCheck = true; + Assert.assertTrue(resultSet.getString("statement").contains("CREATE VIEW altinity_sink_connector.show_replica_status")); + break; + } + Assert.assertTrue(viewCheck); + + clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture().stop(); + executorService.shutdown(); + } +} diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/MultipleUpdatesWSameTimestampIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/MultipleUpdatesWSameTimestampIT.java new file mode 100644 index 000000000..1a08d2626 --- /dev/null +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/MultipleUpdatesWSameTimestampIT.java @@ -0,0 +1,153 @@ +package com.altinity.clickhouse.debezium.embedded.cdc; + +import com.altinity.clickhouse.debezium.embedded.AppInjector; +import com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication; +import com.altinity.clickhouse.debezium.embedded.ITCommon; +import com.altinity.clickhouse.debezium.embedded.api.DebeziumEmbeddedRestApi; +import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLParserService; +import com.altinity.clickhouse.debezium.embedded.parser.DebeziumRecordParserService; +import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; +import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; +import com.altinity.clickhouse.sink.connector.model.DBCredentials; +import com.clickhouse.jdbc.ClickHouseConnection; +import com.google.common.collect.Maps; +import com.google.inject.Guice; +import com.google.inject.Injector; +import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig; +import org.apache.log4j.BasicConfigurator; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static com.altinity.clickhouse.debezium.embedded.ITCommon.getDebeziumProperties; +import static org.junit.Assert.assertTrue; + +@Testcontainers +@DisplayName("Test that validates that the sequence number that is created in non-gtid mode is incremented correctly," + + "by performing a lot of updates on the primary key.") +public class MultipleUpdatesWSameTimestampIT { + + private static final Logger log = LoggerFactory.getLogger(MultipleUpdatesWSameTimestampIT.class); + + + protected MySQLContainer mySqlContainer; + + @Container + public static ClickHouseContainer clickHouseContainer = new ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:latest") + .asCompatibleSubstituteFor("clickhouse")) + .withInitScript("init_clickhouse_schema_only_column_timezone.sql") + // .withCopyFileToContainer(MountableFile.forClasspathResource("config.xml"), "/etc/clickhouse-server/config.d/config.xml") + .withUsername("ch_user") + .withPassword("password") + .withExposedPorts(8123); + @BeforeEach + public void startContainers() throws InterruptedException { + mySqlContainer = new MySQLContainer<>(DockerImageName.parse("docker.io/bitnami/mysql:latest") + .asCompatibleSubstituteFor("mysql")) + .withDatabaseName("employees").withUsername("root").withPassword("adminpass") +// .withInitScript("15k_tables_mysql.sql") + .withExtraHost("mysql-server", "0.0.0.0") + .waitingFor(new HttpWaitStrategy().forPort(3306)); + + BasicConfigurator.configure(); + mySqlContainer.start(); + clickHouseContainer.start(); + Thread.sleep(35000); + } + + + @DisplayName("Test that validates that the sequence number that is created in non-gtid mode is incremented correctly," + + "by performing a lot of updates on the primary key.") + public void testIncrementingSequenceNumberWithUpdates() throws Exception { + + Injector injector = Guice.createInjector(new AppInjector()); + + Properties props = getDebeziumProperties(mySqlContainer, clickHouseContainer); + props.setProperty("snapshot.mode", "schema_only"); + props.setProperty("schema.history.internal.store.only.captured.tables.ddl", "true"); + props.setProperty("schema.history.internal.store.only.captured.databases.ddl", "true"); + + // Override clickhouse server timezone. + ClickHouseDebeziumEmbeddedApplication clickHouseDebeziumEmbeddedApplication = new ClickHouseDebeziumEmbeddedApplication(); + + + ExecutorService executorService = Executors.newFixedThreadPool(1); + executorService.execute(() -> { + try { + clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), + injector.getInstance(DDLParserService.class), props, false); + DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture() + , new Properties()); + } catch (Exception e) { + throw new RuntimeException(e); + } + + }); + + Thread.sleep(25000); + + Connection conn = ITCommon.connectToMySQL(mySqlContainer); + conn.prepareStatement("create table `newtable`(col1 varchar(255) not null, col2 int, col3 int, primary key(col1))").execute(); + + // Insert a new row in the table + conn.prepareStatement("insert into newtable values('a', 1, 1)").execute(); + + // Generate and execute the update workload + String updateStatement = "UPDATE newtable SET col2 = ? WHERE col1 = ?"; + try (PreparedStatement pstmt = conn.prepareStatement(updateStatement)) { + conn.setAutoCommit(false); + for (int i = 0; i < 20000; i++) { + // Set parameters for the update statement + pstmt.setInt(1, 10000 + i); + pstmt.setString(2, "a"); + + // Execute the update statement + pstmt.executeUpdate(); + } + conn.commit(); + } + + + Thread.sleep(10000); + + // Validate in Clickhouse the last record written is 29999 + String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), + "employees"); + ClickHouseConnection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", + clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); + BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), + "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); + + long col2 = 1L; + ResultSet version1Result = writer.executeQueryWithResultSet("select col2 from newtable final where col1 = 'a'"); + while(version1Result.next()) { + col2 = version1Result.getLong("col2"); + } + Thread.sleep(10000); + + + assertTrue(col2 == 29999); + clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture().engine.close(); + + conn.close(); + // Files.deleteIfExists(tmpFilePath); + executorService.shutdown(); + } +} diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/batch/PreparedStatementExecutor.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/batch/PreparedStatementExecutor.java index 91823aee1..35ee1cdc0 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/batch/PreparedStatementExecutor.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/batch/PreparedStatementExecutor.java @@ -319,7 +319,7 @@ public void insertPreparedStatement(Map columnNameToIndexMap, P ps.setLong(columnNameToIndexMap.get(versionColumn), record.getGtid()); } } else { - ps.setLong(columnNameToIndexMap.get(versionColumn), SnowFlakeId.generate(record.getTs_ms(), record.getSequenceNumber(), false)); + ps.setLong(columnNameToIndexMap.get(versionColumn), record.getSequenceNumber()); } } diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/DebeziumOffsetManagement.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/DebeziumOffsetManagement.java index fb1437175..7e2ee3395 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/DebeziumOffsetManagement.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/DebeziumOffsetManagement.java @@ -137,12 +137,14 @@ static synchronized void acknowledgeRecords(List batch) throws record.getCommitter().markProcessed(record.getSourceRecord()); log.debug("***** Record successfully marked as processed ****" + "Binlog file:" + - record.getFile() + " Binlog position: " + record.getPos() + " GTID: " + record.getGtid()); + record.getFile() + " Binlog position: " + record.getPos() + " GTID: " + record.getGtid() + + "Sequence Number: " + record.getSequenceNumber() + "Debezium Timestamp: " + record.getDebezium_ts_ms()); if(record.isLastRecordInBatch()) { record.getCommitter().markBatchFinished(); log.info("***** BATCH marked as processed to debezium ****" + "Binlog file:" + - record.getFile() + " Binlog position: " + record.getPos() + " GTID: " + record.getGtid()); + record.getFile() + " Binlog position: " + record.getPos() + " GTID: " + record.getGtid() + + "Sequence Number: " + record.getSequenceNumber() + "Debezium Timestamp: " + record.getDebezium_ts_ms()); } } } From 3f4b6200051e079ba3198abf78b1920f9ee3c9ac Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Fri, 26 Apr 2024 19:55:07 -0400 Subject: [PATCH 2/3] Added Integration tests --- sink-connector-lightweight/pom.xml | 5 ++++- .../debezium/embedded/cdc/DebeziumChangeEventCapture.java | 3 +-- .../debezium/embedded/cdc/DebeziumStorageViewIT.java | 2 +- .../embedded/cdc/MultipleUpdatesWSameTimestampIT.java | 1 + 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/sink-connector-lightweight/pom.xml b/sink-connector-lightweight/pom.xml index 153a618ec..136498017 100644 --- a/sink-connector-lightweight/pom.xml +++ b/sink-connector-lightweight/pom.xml @@ -539,6 +539,7 @@ 10 --> + filesystem listener com.altinity.clickhouse.debezium.embedded.FailFastListener @@ -546,7 +547,9 @@ true true true - + ${surefire.test.runOrder} + + diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java index 388ff73c6..a1748849c 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java @@ -785,8 +785,7 @@ public static void addVersion(List chStructs, boolean initialS sequence++; } // Pad the sequence number with 0s - chStruct.setSequenceNumber(chStruct.getTs_ms() * 1000 + sequence); - + chStruct.setSequenceNumber(chStruct.getTs_ms() * 100000 + sequence); } } } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumStorageViewIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumStorageViewIT.java index 3207b011d..2a150048d 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumStorageViewIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumStorageViewIT.java @@ -34,7 +34,6 @@ import static com.altinity.clickhouse.debezium.embedded.ITCommon.getDebeziumPropertiesForSchemaOnly; @Testcontainers -@Disabled @DisplayName("Test that validates if the debezium storage view is created successfully") public class DebeziumStorageViewIT { protected MySQLContainer mySqlContainer; @@ -105,6 +104,7 @@ public void debeziumStorageView() throws Exception { } Assert.assertTrue(viewCheck); + ClickHouseDebeziumEmbeddedApplication.stop(); clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture().stop(); executorService.shutdown(); } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/MultipleUpdatesWSameTimestampIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/MultipleUpdatesWSameTimestampIT.java index 1a08d2626..0ef1ca3a4 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/MultipleUpdatesWSameTimestampIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/MultipleUpdatesWSameTimestampIT.java @@ -75,6 +75,7 @@ public void startContainers() throws InterruptedException { @DisplayName("Test that validates that the sequence number that is created in non-gtid mode is incremented correctly," + "by performing a lot of updates on the primary key.") + @Test public void testIncrementingSequenceNumberWithUpdates() throws Exception { Injector injector = Guice.createInjector(new AppInjector()); From 1ff600e6f8386b1eafd6806b79eeb69c05b9b5a2 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Mon, 29 Apr 2024 08:57:55 -0400 Subject: [PATCH 3/3] Keep the sequence number state for cases where different batches could have the same timestamp --- .../embedded/cdc/DebeziumChangeEventCapture.java | 15 +++++++++------ .../cdc/DebeziumChangeEventCaptureTest.java | 5 ++++- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java index a1748849c..06afbff36 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java @@ -10,6 +10,7 @@ import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables; import com.altinity.clickhouse.sink.connector.common.Metrics; import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; +import com.altinity.clickhouse.sink.connector.db.DBMetadata; import com.altinity.clickhouse.sink.connector.db.operations.ClickHouseAlterTable; import com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchExecutor; import com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable; @@ -177,7 +178,7 @@ private void performDDLOperation(String DDL, Properties props, SourceRecord sr, try { String clickHouseVersion = writer.getClickHouseVersion(); - isNewReplacingMergeTreeEngine = new com.altinity.clickhouse.sink.connector.db.DBMetadata() + isNewReplacingMergeTreeEngine = new DBMetadata() .checkIfNewReplacingMergeTree(clickHouseVersion); } catch (Exception e) { log.error("Error retrieving version"); @@ -752,6 +753,8 @@ private void appendToRecords(List convertedRecords) { public static final long SEQUENCE_START = 1000000000; public static final long SEQUENCE_START_INITIAL = 500000000; + + public static long sequenceNumber = SEQUENCE_START; /** * Function to add version to every record. * @param chStructs @@ -764,12 +767,12 @@ public static void addVersion(List chStructs, boolean initialS return; } long sequenceStartTime = chStructs.get(0).getTs_ms(); - long sequence = SEQUENCE_START; + //long sequence = SEQUENCE_START; if(initialSeed) { // Add 500 million to the sequence // sequence += 500000000; // Add 000 to the debezium timestamp. - sequence = SEQUENCE_START_INITIAL; + sequenceNumber = SEQUENCE_START_INITIAL; } for(ClickHouseStruct chStruct: chStructs) { // Get the first ts_ms from chStruct @@ -779,13 +782,13 @@ public static void addVersion(List chStructs, boolean initialS // Get diff in seconds int diff = (int) (chStruct.getTs_ms() - sequenceStartTime) / 1000; if(diff > 1) { - sequence = SEQUENCE_START; + sequenceNumber = SEQUENCE_START; sequenceStartTime = chStruct.getTs_ms(); } else { - sequence++; + sequenceNumber++; } // Pad the sequence number with 0s - chStruct.setSequenceNumber(chStruct.getTs_ms() * 100000 + sequence); + chStruct.setSequenceNumber(chStruct.getTs_ms() * 1000000 + sequenceNumber); } } } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureTest.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureTest.java index 181bbd77c..0843d7353 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureTest.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureTest.java @@ -111,7 +111,10 @@ public void shouldAssignUniqueSequenceNumbersWithinSameSecond() throws Interrupt // Validate ch5 and ch6 - assertTrue(clickHouseStructs2.get(0).getSequenceNumber() != clickHouseStructs2.get(1).getSequenceNumber()); + assertTrue(clickHouseStructs2.get(0).getSequenceNumber() < clickHouseStructs2.get(1).getSequenceNumber()); + + assertTrue(clickHouseStructs.get(3).getSequenceNumber() < clickHouseStructs2.get(0).getSequenceNumber()); + }