Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed logic of creating sequence number based on debezium timestamp, … #557

Merged
merged 3 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion sink-connector-lightweight/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -539,14 +539,17 @@
<threadCount>10</threadCount> -->
<properties>
<property>
<surefire.test.runOrder>filesystem</surefire.test.runOrder>
<name>listener</name>
<value>com.altinity.clickhouse.debezium.embedded.FailFastListener</value>
</property>
</properties>
<useUnlimitedThreads>true</useUnlimitedThreads>
<perCoreThreadCount>true</perCoreThreadCount>
<useSystemClassLoader>true</useSystemClassLoader>
</configuration>
<runOrder>${surefire.test.runOrder}</runOrder>

</configuration>
</plugin>

<plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ public static void start(DebeziumRecordParserService recordParserService,
public static void stop() throws IOException {
debeziumChangeEventCapture.stop();

//Stop Rest API
DebeziumEmbeddedRestApi.stop();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class DebeziumEmbeddedRestApi {

private static final Logger log = LogManager.getLogger(DebeziumEmbeddedRestApi.class);

static Javalin app;
public static void startRestApi(Properties props, Injector injector,
DebeziumChangeEventCapture debeziumChangeEventCapture,
Properties userProperties) {
Expand All @@ -31,7 +32,7 @@ public static void startRestApi(Properties props, Injector injector,
cliPort = "7000";
}

Javalin app = Javalin.create().start(Integer.parseInt(cliPort));
app = Javalin.create().start(Integer.parseInt(cliPort));
app.get("/", ctx -> {
ctx.result("Hello World");
});
Expand Down Expand Up @@ -109,4 +110,9 @@ public static void startRestApi(Properties props, Injector injector,
});

}
// Stop the javalin server
public static void stop() {
if(app != null)
app.stop();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.altinity.clickhouse.debezium.embedded.cdc;

import com.altinity.clickhouse.debezium.embedded.api.DebeziumEmbeddedRestApi;
import com.altinity.clickhouse.debezium.embedded.common.PropertiesHelper;
import com.altinity.clickhouse.debezium.embedded.config.SinkConnectorLightWeightConfig;
import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLParserService;
Expand All @@ -9,6 +10,7 @@
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables;
import com.altinity.clickhouse.sink.connector.common.Metrics;
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
import com.altinity.clickhouse.sink.connector.db.DBMetadata;
import com.altinity.clickhouse.sink.connector.db.operations.ClickHouseAlterTable;
import com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchExecutor;
import com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable;
Expand Down Expand Up @@ -176,7 +178,7 @@ private void performDDLOperation(String DDL, Properties props, SourceRecord sr,

try {
String clickHouseVersion = writer.getClickHouseVersion();
isNewReplacingMergeTreeEngine = new com.altinity.clickhouse.sink.connector.db.DBMetadata()
isNewReplacingMergeTreeEngine = new DBMetadata()
.checkIfNewReplacingMergeTree(clickHouseVersion);
} catch (Exception e) {
log.error("Error retrieving version");
Expand Down Expand Up @@ -656,6 +658,7 @@ public void setup(Properties props, DebeziumRecordParserService debeziumRecordPa
}

public void stop() throws IOException {

try {
if (this.engine != null) {
this.engine.close();
Expand Down Expand Up @@ -749,6 +752,9 @@ private void appendToRecords(List<ClickHouseStruct> convertedRecords) {
}

public static final long SEQUENCE_START = 1000000000;
public static final long SEQUENCE_START_INITIAL = 500000000;

public static long sequenceNumber = SEQUENCE_START;
/**
* Function to add version to every record.
* @param chStructs
Expand All @@ -757,27 +763,32 @@ public static void addVersion(List<ClickHouseStruct> chStructs, boolean initialS

// Start the sequence from 1 million and increment for every record
// and reset the sequence back to 1 million in the next second
long sequenceStartTime = System.currentTimeMillis();
long sequence = SEQUENCE_START;
if(chStructs.isEmpty()) {
return;
}
long sequenceStartTime = chStructs.get(0).getTs_ms();
//long sequence = SEQUENCE_START;
if(initialSeed) {
// Add 500 million to the sequence
sequence += 500000000;
// sequence += 500000000;
// Add 000 to the debezium timestamp.
sequenceNumber = SEQUENCE_START_INITIAL;
}
for(ClickHouseStruct chStruct: chStructs) {

// if the current time is greater than the next second, reset the sequence
// If current time moved to the next second, reset the sequence.
// else increment the sequence.
// Get diff in seconds from current time and last time.
long currentTime = System.currentTimeMillis();
long diff = (currentTime - sequenceStartTime) / 1000;
if(diff >= 1) {
sequence = SEQUENCE_START;
sequenceStartTime = currentTime;
} else {
sequence++;
// Get the first ts_ms from chStruct
// Subsequent records add 1 to sequence.
// If its been more than a second from the first
// ts_ms then reset the sequence.
// Get diff in seconds
int diff = (int) (chStruct.getTs_ms() - sequenceStartTime) / 1000;
if(diff > 1) {
sequenceNumber = SEQUENCE_START;
sequenceStartTime = chStruct.getTs_ms();
} else {
sequenceNumber++;
}
chStruct.setSequenceNumber(sequence);
// Pad the sequence number with 0s
chStruct.setSequenceNumber(chStruct.getTs_ms() * 1000000 + sequenceNumber);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ public void testBatchRetryOnCHFailure() throws Exception {


// Close connection.
clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture().stop();
ClickHouseDebeziumEmbeddedApplication.stop();
executorService.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public void testIncrementingSequenceNumbers() throws Exception {
conn.prepareStatement("insert into newtable values('c', 3, 3)").execute();
conn.prepareStatement("insert into newtable values('d', 4, 4)").execute();

Thread.sleep(10000);
Thread.sleep(20000);

// Create connection to ClickHouse and get the version numbers.
String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
Expand All @@ -163,25 +163,30 @@ public void testIncrementingSequenceNumbers() throws Exception {
long version3 = 1L;
long version4 = 1L;

ResultSet version1Result = writer.executeQueryWithResultSet("select _version from newtable where col1 = 'a'");
ResultSet version1Result = writer.executeQueryWithResultSet("select _version from newtable final where col1 = 'a'");
while(version1Result.next()) {
version1 = version1Result.getLong("_version");
}

ResultSet version2Result = writer.executeQueryWithResultSet("select _version from newtable where col1 = 'b'");
ResultSet version2Result = writer.executeQueryWithResultSet("select _version from newtable final where col1 = 'b'");
while(version2Result.next()) {
version2 = version2Result.getLong("_version");
}

ResultSet version3Result = writer.executeQueryWithResultSet("select _version from newtable where col1 = 'c'");
ResultSet version3Result = writer.executeQueryWithResultSet("select _version from newtable final where col1 = 'c'");
while(version3Result.next()) {
version3 = version3Result.getLong("_version");
}

ResultSet version4Result = writer.executeQueryWithResultSet("select _version from newtable where col1 = 'd'");
ResultSet version4Result = writer.executeQueryWithResultSet("select _version from newtable final where col1 = 'd'");
while(version4Result.next()) {
version4 = version4Result.getLong("_version");
}
System.out.println("Version 1" + version1);
System.out.println("Version 2" + version2);
System.out.println("Version 3" + version3);
System.out.println("Version 4" + version4);


// Check if version 4 is greater than version 3
assertTrue(version4 > version3);
Expand All @@ -190,88 +195,9 @@ public void testIncrementingSequenceNumbers() throws Exception {
// Check if version 2 is greater than version 1
assertTrue(version2 > version1);

// if(engine.get() != null) {
// engine.get().stop();
// }
clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture().engine.close();
conn.close();
// Files.deleteIfExists(tmpFilePath);
executorService.shutdown();
}

@Test
@DisplayName("Test that validates that the sequence number that is created in non-gtid mode is incremented correctly,"
+ "by performing a lot of updates on the primary key.")
public void testIncrementingSequenceNumberWithUpdates() throws Exception {

Injector injector = Guice.createInjector(new AppInjector());

Properties props = getDebeziumProperties(mySqlContainer, clickHouseContainer);
props.setProperty("snapshot.mode", "schema_only");
props.setProperty("schema.history.internal.store.only.captured.tables.ddl", "true");
props.setProperty("schema.history.internal.store.only.captured.databases.ddl", "true");

// Override clickhouse server timezone.
ClickHouseDebeziumEmbeddedApplication clickHouseDebeziumEmbeddedApplication = new ClickHouseDebeziumEmbeddedApplication();


ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {
clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
injector.getInstance(DDLParserService.class), props, false);
DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture()
, new Properties());
} catch (Exception e) {
throw new RuntimeException(e);
}

});

Thread.sleep(25000);

Connection conn = ITCommon.connectToMySQL(mySqlContainer);
conn.prepareStatement("create table `newtable`(col1 varchar(255) not null, col2 int, col3 int, primary key(col1))").execute();

// Insert a new row in the table
conn.prepareStatement("insert into newtable values('a', 1, 1)").execute();

// Generate and execute the update workload
String updateStatement = "UPDATE newtable SET col2 = ? WHERE col1 = ?";
try (PreparedStatement pstmt = conn.prepareStatement(updateStatement)) {
conn.setAutoCommit(false);
for (int i = 0; i < 20000; i++) {
// Set parameters for the update statement
pstmt.setInt(1, 10000 + i);
pstmt.setString(2, "a");

// Execute the update statement
pstmt.executeUpdate();
}
conn.commit();
}


Thread.sleep(10000);

// Validate in Clickhouse the last record written is 29999
String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"employees");
ClickHouseConnection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1",
clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>()));
BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn);

long col2 = 1L;
ResultSet version1Result = writer.executeQueryWithResultSet("select col2 from newtable final where col1 = 'a'");
while(version1Result.next()) {
col2 = version1Result.getLong("col2");
}
Thread.sleep(10000);


assertTrue(col2 == 29999);
conn.close();
// Files.deleteIfExists(tmpFilePath);
executorService.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,24 +67,33 @@ public void shouldAssignUniqueSequenceNumbersWithinSameSecond() throws Interrupt
ClickHouseStruct ch1 = new ClickHouseStruct(10, "topic_1", getKafkaStruct(), 2,
currentTimestamp, null,
getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
ch1.setTs_ms(currentTimestamp);

ClickHouseStruct ch2 = new ClickHouseStruct(10, "topic_1", getKafkaStruct(), 2,
currentTimestamp + 100, null,
getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
ch2.setTs_ms(currentTimestamp);

ClickHouseStruct ch3 = new ClickHouseStruct(10, "topic_1", getKafkaStruct(), 2,
currentTimestamp + 200, null,
getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
ch3.setTs_ms(currentTimestamp);

ClickHouseStruct ch4 = new ClickHouseStruct(10, "topic_1", getKafkaStruct(), 2,
currentTimestamp + 300, null,
getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
ch4.setTs_ms(currentTimestamp);

ClickHouseStruct ch5 = new ClickHouseStruct(10, "topic_1", getKafkaStruct(), 2,
currentTimestamp + 500, null,
getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
ch5.setTs_ms(currentTimestamp);

Thread.sleep(1000);
ClickHouseStruct ch6 = new ClickHouseStruct(10, "topic_1", getKafkaStruct(), 2,
currentTimestamp + 1000, null,
getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
ch6.setTs_ms(currentTimestamp);

// Make a list of ch1, ch2, ch3 and ch4
List<ClickHouseStruct> clickHouseStructs = Arrays.asList(ch1, ch2, ch3, ch4, ch5);
Expand All @@ -102,11 +111,11 @@ public void shouldAssignUniqueSequenceNumbersWithinSameSecond() throws Interrupt


// Validate ch5 and ch6
assertTrue(clickHouseStructs2.get(0).getSequenceNumber() != clickHouseStructs2.get(1).getSequenceNumber());
assertTrue(clickHouseStructs2.get(0).getSequenceNumber() < clickHouseStructs2.get(1).getSequenceNumber());

assertTrue(clickHouseStructs.get(3).getSequenceNumber() < clickHouseStructs2.get(0).getSequenceNumber());


// Reset works.
// assertTrue(clickHouseStructs2.get(0).getSequenceNumber() == 1000001);
// DebeziumChangeEventCapture.addVersion();
}

@Test
Expand Down
Loading
Loading