Changed from ConcurrentLinkedQueue to LinkedBlockingQueue with the size configurable #491

Merged: 4 commits, Mar 11, 2024
5 changes: 4 additions & 1 deletion sink-connector-lightweight/docker/config.yml
@@ -148,4 +148,7 @@ restart.event.loop.timeout.period.secs: "3000"
 #clickhouse.jdbc.params: "max_buffer_size=1000000,socket_timeout=10000"

 # Maximum number of threads in the thread pool for processing CDC records.
-#thread.pool.size: 10
+#thread.pool.size: 10
+
+# Sink Connector maximum queue size
+#sink.connector.max.queue.size: "100000"
3 changes: 3 additions & 0 deletions sink-connector-lightweight/docker/config_local.yml
@@ -147,3 +147,6 @@ restart.event.loop.timeout.period.secs: "30"

 # ClickHouse JDBC configuration parameters, as a list of key-value pairs separated by commas.
 #clickhouse.jdbc.params: "max_buffer_size=1000000,socket_timeout=10000"
+
+# The maximum number of records that should be loaded into memory while streaming data from MySQL to ClickHouse.
+sink.connector.max.queue.size: "100000"
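The value of this setting comes from the queue swap itself: ConcurrentLinkedQueue is unbounded, so a slow ClickHouse writer lets in-memory batches pile up without limit, while a LinkedBlockingQueue created with a capacity makes producers wait or fail fast once that many batches are queued (which of put(), offer(), or add() the enqueue path uses is not shown in this diff). A minimal, self-contained sketch of that backpressure behavior; the capacity of 2 and the timings are illustrative, not the connector's defaults:

import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BackpressureSketch {
    public static void main(String[] args) throws InterruptedException {
        // Bounded queue holding at most 2 batches, standing in for sink.connector.max.queue.size.
        LinkedBlockingQueue<List<String>> queue = new LinkedBlockingQueue<>(2);

        queue.put(List.of("batch-1"));
        queue.put(List.of("batch-2"));

        // Queue is full: the timed offer() gives up instead of letting memory grow without bound.
        boolean accepted = queue.offer(List.of("batch-3"), 100, TimeUnit.MILLISECONDS);
        System.out.println("third batch accepted? " + accepted); // false

        // Draining one batch frees capacity, and the producer makes progress again.
        queue.take();
        System.out.println("after take: " + queue.offer(List.of("batch-3"))); // true
    }
}

In the connector, the Debezium capture loop and ClickHouseBatchRunnable play the producer and consumer roles, so the bound caps how many CDC batches can sit in memory between them.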
DebeziumChangeEventCapture.java
@@ -55,7 +55,7 @@ public class DebeziumChangeEventCapture {
     private ClickHouseBatchRunnable runnable;

     // Records grouped by Topic Name
-    private ConcurrentLinkedQueue<List<ClickHouseStruct>> records = new ConcurrentLinkedQueue<>();
+    private LinkedBlockingQueue<List<ClickHouseStruct>> records;


     private BaseDbWriter writer = null;
@@ -586,6 +586,13 @@ public void connectorStopped() {
     public void setup(Properties props, DebeziumRecordParserService debeziumRecordParserService,
                       DDLParserService ddlParserService, boolean forceStart) throws IOException, ClassNotFoundException {

+        // Check if max queue size was defined by the user.
+        if(props.getProperty(ClickHouseSinkConnectorConfigVariables.MAX_QUEUE_SIZE.toString()) != null) {
+            int maxQueueSize = Integer.parseInt(props.getProperty(ClickHouseSinkConnectorConfigVariables.MAX_QUEUE_SIZE.toString()));
+            this.records = new LinkedBlockingQueue<>(maxQueueSize);
+        } else {
+            this.records = new LinkedBlockingQueue<>();
+        }

         ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(PropertiesHelper.toMap(props));
         Metrics.initialize(props.getProperty(ClickHouseSinkConnectorConfigVariables.ENABLE_METRICS.toString()),
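One detail worth noting about the fallback branch above: the no-argument LinkedBlockingQueue constructor creates a queue with capacity Integer.MAX_VALUE, so leaving sink.connector.max.queue.size unset keeps the old, effectively unbounded behavior. The guard can also be collapsed into a small helper; a sketch under that reading (QueueFactory and boundedIfConfigured are hypothetical names, not part of the PR):

import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class QueueFactory {
    // Hypothetical helper: bounded queue when the property is set, otherwise
    // a queue with capacity Integer.MAX_VALUE (the old, effectively unbounded behavior).
    static <T> BlockingQueue<T> boundedIfConfigured(Properties props, String key) {
        String raw = props.getProperty(key);
        return raw == null
                ? new LinkedBlockingQueue<>()
                : new LinkedBlockingQueue<>(Integer.parseInt(raw)); // NumberFormatException on bad input, as in the PR
    }
}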
PostgresInitialDockerIT.java (new file)
@@ -0,0 +1,124 @@
package com.altinity.clickhouse.debezium.embedded;

import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture;
import com.altinity.clickhouse.debezium.embedded.ddl.parser.MySQLDDLParserService;
import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
import com.clickhouse.jdbc.ClickHouseConnection;
import org.junit.Assert;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.testcontainers.Testcontainers;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;

import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

import static com.altinity.clickhouse.debezium.embedded.PostgresProperties.getDefaultProperties;

public class PostgresInitialDockerIT {

@Container
public static ClickHouseContainer clickHouseContainer = new ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:latest")
.asCompatibleSubstituteFor("clickhouse"))
.withInitScript("init_clickhouse_it.sql")
.withUsername("ch_user")
.withPassword("password")
.withExposedPorts(8123);

public static DockerImageName myImage = DockerImageName.parse("debezium/postgres:15-alpine").asCompatibleSubstituteFor("postgres");

@Container
public static PostgreSQLContainer postgreSQLContainer = (PostgreSQLContainer) new PostgreSQLContainer(myImage)
.withInitScript("init_postgres.sql")
.withDatabaseName("public")
.withUsername("root")
.withPassword("root")
.withExposedPorts(5432)
.withCommand("postgres -c wal_level=logical")
.withNetworkAliases("postgres").withAccessToHost(true);



public Properties getProperties() throws Exception {

Properties properties = getDefaultProperties(postgreSQLContainer, clickHouseContainer);
properties.put("plugin.name", "decoderbufs");
properties.put("plugin.path", "/");
properties.put("table.include.list", "public.tm");
properties.put("slot.max.retries", "6");
properties.put("slot.retry.delay.ms", "5000");
properties.put("database.allowPublicKeyRetrieval", "true");
properties.put("table.include.list", "public.tm,public.tm2");

return properties;
}

@Test
@DisplayName("Integration Test - Validates PostgreSQL replication when the plugin is set to DecoderBufs")
public void testDecoderBufsPlugin() throws Exception {
Network network = Network.newNetwork();

postgreSQLContainer.withNetwork(network).start();
clickHouseContainer.withNetwork(network).start();
        // Give the containers a moment to finish startup.
        Thread.sleep(10000);

Testcontainers.exposeHostPorts(postgreSQLContainer.getFirstMappedPort());
AtomicReference<DebeziumChangeEventCapture> engine = new AtomicReference<>();

ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(getProperties(), new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>())), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
});

        // Give the engine time to snapshot the source tables and replicate them to ClickHouse.
        Thread.sleep(60000);

String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"public");
ClickHouseConnection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1",
clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>()));

BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"public", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn);
Map<String, String> tmColumns = writer.getColumnsDataTypesForTable("tm");
        Assert.assertEquals(22, tmColumns.size());
Assert.assertTrue(tmColumns.get("id").equalsIgnoreCase("UUID"));
Assert.assertTrue(tmColumns.get("secid").equalsIgnoreCase("Nullable(UUID)"));
//Assert.assertTrue(tmColumns.get("am").equalsIgnoreCase("Nullable(Decimal(21,5))"));
Assert.assertTrue(tmColumns.get("created").equalsIgnoreCase("Nullable(DateTime64(6))"));


int tmCount = 0;
ResultSet chRs = writer.getConnection().prepareStatement("select count(*) from tm").executeQuery();
while(chRs.next()) {
tmCount = chRs.getInt(1);
}

        Assert.assertEquals(2, tmCount);

if(engine.get() != null) {
engine.get().stop();
}
executorService.shutdown();

}
}
ClickHouseSinkConnectorConfig.java
@@ -423,8 +423,19 @@ static ConfigDef newConfigDef() {
                 6,
                 ConfigDef.Width.NONE,
                 ClickHouseSinkConnectorConfigVariables.JDBC_PARAMETERS.toString())
+                // Define the max queue size.
+                .define(
+                        ClickHouseSinkConnectorConfigVariables.MAX_QUEUE_SIZE.toString(),
+                        Type.INT,
+                        500000,
+                        ConfigDef.Range.atLeast(1),
+                        Importance.HIGH,
+                        "The maximum size of the queue",
+                        CONFIG_GROUP_CONNECTOR_CONFIG,
+                        6,
+                        ConfigDef.Width.NONE,
+                        ClickHouseSinkConnectorConfigVariables.MAX_QUEUE_SIZE.toString())

         // ToDo: Add JVM Proxy
         ;
     }
 }
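For readers unfamiliar with Kafka Connect's ConfigDef API, the define(...) call above registers the key with an INT type, a default of 500000, and a Range.atLeast(1) validator that rejects non-positive values when the configuration is parsed. A self-contained sketch of that behavior (kafka-clients on the classpath is assumed; the standalone main is illustrative, not connector code):

import java.util.Map;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

public class MaxQueueSizeConfigSketch {
    public static void main(String[] args) {
        ConfigDef def = new ConfigDef()
                .define("sink.connector.max.queue.size",
                        ConfigDef.Type.INT,
                        500000,                          // default when the user sets nothing
                        ConfigDef.Range.atLeast(1),      // rejects 0 and negatives at parse time
                        ConfigDef.Importance.HIGH,
                        "The maximum size of the queue");

        // Unset -> the default applies.
        System.out.println(new AbstractConfig(def, Map.of()).getInt("sink.connector.max.queue.size")); // 500000

        try {
            new AbstractConfig(def, Map.of("sink.connector.max.queue.size", "0"));
        } catch (ConfigException e) {
            System.out.println("rejected: " + e.getMessage()); // fails Range.atLeast(1)
        }
    }
}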
ClickHouseSinkConnectorConfigVariables.java
@@ -67,7 +67,9 @@ public enum ClickHouseSinkConnectorConfigVariables {
     RESTART_EVENT_LOOP("restart.event.loop"),

     RESTART_EVENT_LOOP_TIMEOUT_PERIOD("restart.event.loop.timeout.period.secs"),
-    JDBC_PARAMETERS("clickhouse.jdbc.params");
+    JDBC_PARAMETERS("clickhouse.jdbc.params"),
+
+    MAX_QUEUE_SIZE("sink.connector.max.queue.size");

     private String label;
ClickHouseSinkTask.java
@@ -17,10 +17,7 @@
 import org.slf4j.LoggerFactory;

 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;

/**
* <p>Creates sink service instance, takes records loaded from those
@@ -39,7 +36,7 @@ public ClickHouseSinkTask() {
     private ClickHouseBatchExecutor executor;

     // Records grouped by Topic Name
-    private ConcurrentLinkedQueue<List<ClickHouseStruct>> records;
+    private LinkedBlockingQueue<List<ClickHouseStruct>> records;

     private DeDuplicator deduplicator;

@@ -66,7 +63,10 @@ public void start(Map<String, String> config) {

         this.id = "task-" + this.config.getLong(ClickHouseSinkConnectorConfigVariables.TASK_ID.toString());

-        this.records = new ConcurrentLinkedQueue();
+        // check if the config is defined for MAX_QUEUE_SIZE
+        int maxQueueSize = this.config.getInt(ClickHouseSinkConnectorConfigVariables.MAX_QUEUE_SIZE.toString());
+
+        this.records = new LinkedBlockingQueue<>(maxQueueSize);
         ClickHouseBatchRunnable runnable = new ClickHouseBatchRunnable(this.records, this.config, topic2TableMap);
         ThreadFactory namedThreadFactory =
                 new ThreadFactoryBuilder().setNameFormat("Sink Connector thread-pool-%d").build();
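On the consuming side, ClickHouseBatchRunnable receives the same queue (see the constructor change below). The diff does not show how the runnable drains it, but a LinkedBlockingQueue allows a timed poll() instead of spin-checking an empty ConcurrentLinkedQueue, which has no blocking or timed operations. A sketch of that pattern, with illustrative names and a String payload standing in for ClickHouseStruct batches:

import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class DrainLoopSketch implements Runnable {
    private final LinkedBlockingQueue<List<String>> records;

    DrainLoopSketch(LinkedBlockingQueue<List<String>> records) {
        this.records = records;
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // Wait up to 100 ms for a batch instead of busy-polling an empty queue.
                List<String> batch = records.poll(100, TimeUnit.MILLISECONDS);
                if (batch != null) {
                    // flush the batch to ClickHouse here
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and exit cleanly
        }
    }
}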
ClickHouseBatchRunnable.java
@@ -27,6 +27,7 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;

 /**
  * Runnable object that will be called on
@@ -35,7 +36,7 @@
*/
 public class ClickHouseBatchRunnable implements Runnable {
     private static final Logger log = LoggerFactory.getLogger(ClickHouseBatchRunnable.class);
-    private final ConcurrentLinkedQueue<List<ClickHouseStruct>> records;
+    private final LinkedBlockingQueue<List<ClickHouseStruct>> records;

     private final ClickHouseSinkConnectorConfig config;
@@ -57,7 +58,7 @@ public class ClickHouseBatchRunnable {

     private List<ClickHouseStruct> currentBatch = null;

-    public ClickHouseBatchRunnable(ConcurrentLinkedQueue<List<ClickHouseStruct>> records,
+    public ClickHouseBatchRunnable(LinkedBlockingQueue<List<ClickHouseStruct>> records,
                                    ClickHouseSinkConnectorConfig config,
                                    Map<String, String> topic2TableMap) {
         this.records = records;
ClickHouseBatchRunnableTest.java
@@ -16,11 +16,12 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;

 public class ClickHouseBatchRunnableTest {

-    ConcurrentLinkedQueue<List<ClickHouseStruct>> records = new ConcurrentLinkedQueue<>();
+    LinkedBlockingQueue<List<ClickHouseStruct>> records = new LinkedBlockingQueue<>();
Map<String, String> topic2TableMap = new HashMap<>();

@Before