Skip to content

Commit

Permalink
Merge pull request #557 from Altinity/fix_sequence_number_generation
Browse files Browse the repository at this point in the history
Fixed logic of creating sequence number based on debezium timestamp, …
  • Loading branch information
subkanthi authored May 2, 2024
2 parents 9754ab1 + 1ff600e commit e1bc645
Show file tree
Hide file tree
Showing 11 changed files with 337 additions and 112 deletions.
5 changes: 4 additions & 1 deletion sink-connector-lightweight/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -539,14 +539,17 @@
<threadCount>10</threadCount> -->
<properties>
<property>
<surefire.test.runOrder>filesystem</surefire.test.runOrder>
<name>listener</name>
<value>com.altinity.clickhouse.debezium.embedded.FailFastListener</value>
</property>
</properties>
<useUnlimitedThreads>true</useUnlimitedThreads>
<perCoreThreadCount>true</perCoreThreadCount>
<useSystemClassLoader>true</useSystemClassLoader>
</configuration>
<runOrder>${surefire.test.runOrder}</runOrder>

</configuration>
</plugin>

<plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ public static void start(DebeziumRecordParserService recordParserService,
public static void stop() throws IOException {
debeziumChangeEventCapture.stop();

//Stop Rest API
DebeziumEmbeddedRestApi.stop();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class DebeziumEmbeddedRestApi {

private static final Logger log = LogManager.getLogger(DebeziumEmbeddedRestApi.class);

static Javalin app;
public static void startRestApi(Properties props, Injector injector,
DebeziumChangeEventCapture debeziumChangeEventCapture,
Properties userProperties) {
Expand All @@ -31,7 +32,7 @@ public static void startRestApi(Properties props, Injector injector,
cliPort = "7000";
}

Javalin app = Javalin.create().start(Integer.parseInt(cliPort));
app = Javalin.create().start(Integer.parseInt(cliPort));
app.get("/", ctx -> {
ctx.result("Hello World");
});
Expand Down Expand Up @@ -109,4 +110,9 @@ public static void startRestApi(Properties props, Injector injector,
});

}
// Stop the javalin server
public static void stop() {
if(app != null)
app.stop();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.altinity.clickhouse.debezium.embedded.cdc;

import com.altinity.clickhouse.debezium.embedded.api.DebeziumEmbeddedRestApi;
import com.altinity.clickhouse.debezium.embedded.common.PropertiesHelper;
import com.altinity.clickhouse.debezium.embedded.config.SinkConnectorLightWeightConfig;
import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLParserService;
Expand All @@ -9,6 +10,7 @@
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables;
import com.altinity.clickhouse.sink.connector.common.Metrics;
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
import com.altinity.clickhouse.sink.connector.db.DBMetadata;
import com.altinity.clickhouse.sink.connector.db.operations.ClickHouseAlterTable;
import com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchExecutor;
import com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable;
Expand Down Expand Up @@ -176,7 +178,7 @@ private void performDDLOperation(String DDL, Properties props, SourceRecord sr,

try {
String clickHouseVersion = writer.getClickHouseVersion();
isNewReplacingMergeTreeEngine = new com.altinity.clickhouse.sink.connector.db.DBMetadata()
isNewReplacingMergeTreeEngine = new DBMetadata()
.checkIfNewReplacingMergeTree(clickHouseVersion);
} catch (Exception e) {
log.error("Error retrieving version");
Expand Down Expand Up @@ -656,6 +658,7 @@ public void setup(Properties props, DebeziumRecordParserService debeziumRecordPa
}

public void stop() throws IOException {

try {
if (this.engine != null) {
this.engine.close();
Expand Down Expand Up @@ -749,6 +752,9 @@ private void appendToRecords(List<ClickHouseStruct> convertedRecords) {
}

public static final long SEQUENCE_START = 1000000000;
public static final long SEQUENCE_START_INITIAL = 500000000;

public static long sequenceNumber = SEQUENCE_START;
/**
* Function to add version to every record.
* @param chStructs
Expand All @@ -757,27 +763,32 @@ public static void addVersion(List<ClickHouseStruct> chStructs, boolean initialS

// Start the sequence from 1 million and increment for every record
// and reset the sequence back to 1 million in the next second
long sequenceStartTime = System.currentTimeMillis();
long sequence = SEQUENCE_START;
if(chStructs.isEmpty()) {
return;
}
long sequenceStartTime = chStructs.get(0).getTs_ms();
//long sequence = SEQUENCE_START;
if(initialSeed) {
// Add 500 million to the sequence
sequence += 500000000;
// sequence += 500000000;
// Add 000 to the debezium timestamp.
sequenceNumber = SEQUENCE_START_INITIAL;
}
for(ClickHouseStruct chStruct: chStructs) {

// if the current time is greater than the next second, reset the sequence
// If current time moved to the next second, reset the sequence.
// else increment the sequence.
// Get diff in seconds from current time and last time.
long currentTime = System.currentTimeMillis();
long diff = (currentTime - sequenceStartTime) / 1000;
if(diff >= 1) {
sequence = SEQUENCE_START;
sequenceStartTime = currentTime;
} else {
sequence++;
// Get the first ts_ms from chStruct
// Subsequent records add 1 to sequence.
// If its been more than a second from the first
// ts_ms then reset the sequence.
// Get diff in seconds
int diff = (int) (chStruct.getTs_ms() - sequenceStartTime) / 1000;
if(diff > 1) {
sequenceNumber = SEQUENCE_START;
sequenceStartTime = chStruct.getTs_ms();
} else {
sequenceNumber++;
}
chStruct.setSequenceNumber(sequence);
// Pad the sequence number with 0s
chStruct.setSequenceNumber(chStruct.getTs_ms() * 1000000 + sequenceNumber);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ public void testBatchRetryOnCHFailure() throws Exception {


// Close connection.
clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture().stop();
ClickHouseDebeziumEmbeddedApplication.stop();
executorService.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public void testIncrementingSequenceNumbers() throws Exception {
conn.prepareStatement("insert into newtable values('c', 3, 3)").execute();
conn.prepareStatement("insert into newtable values('d', 4, 4)").execute();

Thread.sleep(10000);
Thread.sleep(20000);

// Create connection to ClickHouse and get the version numbers.
String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
Expand All @@ -163,25 +163,30 @@ public void testIncrementingSequenceNumbers() throws Exception {
long version3 = 1L;
long version4 = 1L;

ResultSet version1Result = writer.executeQueryWithResultSet("select _version from newtable where col1 = 'a'");
ResultSet version1Result = writer.executeQueryWithResultSet("select _version from newtable final where col1 = 'a'");
while(version1Result.next()) {
version1 = version1Result.getLong("_version");
}

ResultSet version2Result = writer.executeQueryWithResultSet("select _version from newtable where col1 = 'b'");
ResultSet version2Result = writer.executeQueryWithResultSet("select _version from newtable final where col1 = 'b'");
while(version2Result.next()) {
version2 = version2Result.getLong("_version");
}

ResultSet version3Result = writer.executeQueryWithResultSet("select _version from newtable where col1 = 'c'");
ResultSet version3Result = writer.executeQueryWithResultSet("select _version from newtable final where col1 = 'c'");
while(version3Result.next()) {
version3 = version3Result.getLong("_version");
}

ResultSet version4Result = writer.executeQueryWithResultSet("select _version from newtable where col1 = 'd'");
ResultSet version4Result = writer.executeQueryWithResultSet("select _version from newtable final where col1 = 'd'");
while(version4Result.next()) {
version4 = version4Result.getLong("_version");
}
System.out.println("Version 1" + version1);
System.out.println("Version 2" + version2);
System.out.println("Version 3" + version3);
System.out.println("Version 4" + version4);


// Check if version 4 is greater than version 3
assertTrue(version4 > version3);
Expand All @@ -190,88 +195,9 @@ public void testIncrementingSequenceNumbers() throws Exception {
// Check if version 2 is greater than version 1
assertTrue(version2 > version1);

// if(engine.get() != null) {
// engine.get().stop();
// }
clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture().engine.close();
conn.close();
// Files.deleteIfExists(tmpFilePath);
executorService.shutdown();
}

@Test
@DisplayName("Test that validates that the sequence number that is created in non-gtid mode is incremented correctly,"
+ "by performing a lot of updates on the primary key.")
public void testIncrementingSequenceNumberWithUpdates() throws Exception {

Injector injector = Guice.createInjector(new AppInjector());

Properties props = getDebeziumProperties(mySqlContainer, clickHouseContainer);
props.setProperty("snapshot.mode", "schema_only");
props.setProperty("schema.history.internal.store.only.captured.tables.ddl", "true");
props.setProperty("schema.history.internal.store.only.captured.databases.ddl", "true");

// Override clickhouse server timezone.
ClickHouseDebeziumEmbeddedApplication clickHouseDebeziumEmbeddedApplication = new ClickHouseDebeziumEmbeddedApplication();


ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {
clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
injector.getInstance(DDLParserService.class), props, false);
DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture()
, new Properties());
} catch (Exception e) {
throw new RuntimeException(e);
}

});

Thread.sleep(25000);

Connection conn = ITCommon.connectToMySQL(mySqlContainer);
conn.prepareStatement("create table `newtable`(col1 varchar(255) not null, col2 int, col3 int, primary key(col1))").execute();

// Insert a new row in the table
conn.prepareStatement("insert into newtable values('a', 1, 1)").execute();

// Generate and execute the update workload
String updateStatement = "UPDATE newtable SET col2 = ? WHERE col1 = ?";
try (PreparedStatement pstmt = conn.prepareStatement(updateStatement)) {
conn.setAutoCommit(false);
for (int i = 0; i < 20000; i++) {
// Set parameters for the update statement
pstmt.setInt(1, 10000 + i);
pstmt.setString(2, "a");

// Execute the update statement
pstmt.executeUpdate();
}
conn.commit();
}


Thread.sleep(10000);

// Validate in Clickhouse the last record written is 29999
String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"employees");
ClickHouseConnection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1",
clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>()));
BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn);

long col2 = 1L;
ResultSet version1Result = writer.executeQueryWithResultSet("select col2 from newtable final where col1 = 'a'");
while(version1Result.next()) {
col2 = version1Result.getLong("col2");
}
Thread.sleep(10000);


assertTrue(col2 == 29999);
conn.close();
// Files.deleteIfExists(tmpFilePath);
executorService.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,24 +67,33 @@ public void shouldAssignUniqueSequenceNumbersWithinSameSecond() throws Interrupt
ClickHouseStruct ch1 = new ClickHouseStruct(10, "topic_1", getKafkaStruct(), 2,
currentTimestamp, null,
getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
ch1.setTs_ms(currentTimestamp);

ClickHouseStruct ch2 = new ClickHouseStruct(10, "topic_1", getKafkaStruct(), 2,
currentTimestamp + 100, null,
getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
ch2.setTs_ms(currentTimestamp);

ClickHouseStruct ch3 = new ClickHouseStruct(10, "topic_1", getKafkaStruct(), 2,
currentTimestamp + 200, null,
getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
ch3.setTs_ms(currentTimestamp);

ClickHouseStruct ch4 = new ClickHouseStruct(10, "topic_1", getKafkaStruct(), 2,
currentTimestamp + 300, null,
getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
ch4.setTs_ms(currentTimestamp);

ClickHouseStruct ch5 = new ClickHouseStruct(10, "topic_1", getKafkaStruct(), 2,
currentTimestamp + 500, null,
getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
ch5.setTs_ms(currentTimestamp);

Thread.sleep(1000);
ClickHouseStruct ch6 = new ClickHouseStruct(10, "topic_1", getKafkaStruct(), 2,
currentTimestamp + 1000, null,
getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
ch6.setTs_ms(currentTimestamp);

// Make a list of ch1, ch2, ch3 and ch4
List<ClickHouseStruct> clickHouseStructs = Arrays.asList(ch1, ch2, ch3, ch4, ch5);
Expand All @@ -102,11 +111,11 @@ public void shouldAssignUniqueSequenceNumbersWithinSameSecond() throws Interrupt


// Validate ch5 and ch6
assertTrue(clickHouseStructs2.get(0).getSequenceNumber() != clickHouseStructs2.get(1).getSequenceNumber());
assertTrue(clickHouseStructs2.get(0).getSequenceNumber() < clickHouseStructs2.get(1).getSequenceNumber());

assertTrue(clickHouseStructs.get(3).getSequenceNumber() < clickHouseStructs2.get(0).getSequenceNumber());


// Reset works.
// assertTrue(clickHouseStructs2.get(0).getSequenceNumber() == 1000001);
// DebeziumChangeEventCapture.addVersion();
}

@Test
Expand Down
Loading

0 comments on commit e1bc645

Please sign in to comment.