Skip to content

Commit

Permalink
Changed from ConcurrentLinkedQueue to LinkedBlockingQueue with the siz…
Browse files Browse the repository at this point in the history
…e configurable
  • Loading branch information
subkanthi committed Mar 6, 2024
1 parent 9634c40 commit 65e6cea
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class DebeziumChangeEventCapture {
private ClickHouseBatchRunnable runnable;

// Records grouped by Topic Name
private ConcurrentLinkedQueue<List<ClickHouseStruct>> records = new ConcurrentLinkedQueue<>();
private LinkedBlockingQueue<List<ClickHouseStruct>> records;


private BaseDbWriter writer = null;
Expand Down Expand Up @@ -586,6 +586,13 @@ public void connectorStopped() {
public void setup(Properties props, DebeziumRecordParserService debeziumRecordParserService,
DDLParserService ddlParserService, boolean forceStart) throws IOException, ClassNotFoundException {

// Check if max queue size was defined by the user.
if(props.getProperty(ClickHouseSinkConnectorConfigVariables.MAX_QUEUE_SIZE.toString()) != null) {
int maxQueueSize = Integer.parseInt(props.getProperty(ClickHouseSinkConnectorConfigVariables.MAX_QUEUE_SIZE.toString()));
this.records = new LinkedBlockingQueue<>(maxQueueSize);
} else {
this.records = new LinkedBlockingQueue<>();
}

ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(PropertiesHelper.toMap(props));
Metrics.initialize(props.getProperty(ClickHouseSinkConnectorConfigVariables.ENABLE_METRICS.toString()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,8 +423,19 @@ static ConfigDef newConfigDef() {
6,
ConfigDef.Width.NONE,
ClickHouseSinkConnectorConfigVariables.JDBC_PARAMETERS.toString())
// Define the max queue size.
.define(
ClickHouseSinkConnectorConfigVariables.MAX_QUEUE_SIZE.toString(),
Type.INT,
500000,
ConfigDef.Range.atLeast(1),
Importance.HIGH,
"The maximum size of the queue",
CONFIG_GROUP_CONNECTOR_CONFIG,
6,
ConfigDef.Width.NONE,
ClickHouseSinkConnectorConfigVariables.MAX_QUEUE_SIZE.toString())

// ToDo: Add JVM Proxy
;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ public enum ClickHouseSinkConnectorConfigVariables {
RESTART_EVENT_LOOP("restart.event.loop"),

RESTART_EVENT_LOOP_TIMEOUT_PERIOD("restart.event.loop.timeout.period.secs"),
JDBC_PARAMETERS("clickhouse.jdbc.params");
JDBC_PARAMETERS("clickhouse.jdbc.params"),

MAX_QUEUE_SIZE("max.queue.size");

private String label;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;

/**
* <p>Creates sink service instance, takes records loaded from those
Expand All @@ -39,7 +36,7 @@ public ClickHouseSinkTask() {
private ClickHouseBatchExecutor executor;

// Records grouped by Topic Name
private ConcurrentLinkedQueue<List<ClickHouseStruct>> records;
private LinkedBlockingQueue<List<ClickHouseStruct>> records;

private DeDuplicator deduplicator;

Expand All @@ -66,7 +63,10 @@ public void start(Map<String, String> config) {

this.id = "task-" + this.config.getLong(ClickHouseSinkConnectorConfigVariables.TASK_ID.toString());

this.records = new ConcurrentLinkedQueue();
// check if the config is defined for MAX_QUEUE_SIZE
int maxQueueSize = this.config.getInt(ClickHouseSinkConnectorConfigVariables.MAX_QUEUE_SIZE.toString());

this.records = new LinkedBlockingQueue<>(maxQueueSize);
ClickHouseBatchRunnable runnable = new ClickHouseBatchRunnable(this.records, this.config, topic2TableMap);
ThreadFactory namedThreadFactory =
new ThreadFactoryBuilder().setNameFormat("Sink Connector thread-pool-%d").build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
* Runnable object that will be called on
Expand All @@ -35,7 +36,7 @@
*/
public class ClickHouseBatchRunnable implements Runnable {
private static final Logger log = LoggerFactory.getLogger(ClickHouseBatchRunnable.class);
private final ConcurrentLinkedQueue<List<ClickHouseStruct>> records;
private final LinkedBlockingQueue<List<ClickHouseStruct>> records;

private final ClickHouseSinkConnectorConfig config;

Expand All @@ -57,7 +58,7 @@ public class ClickHouseBatchRunnable implements Runnable {

private List<ClickHouseStruct> currentBatch = null;

public ClickHouseBatchRunnable(ConcurrentLinkedQueue<List<ClickHouseStruct>> records,
public ClickHouseBatchRunnable(LinkedBlockingQueue<List<ClickHouseStruct>> records,
ClickHouseSinkConnectorConfig config,
Map<String, String> topic2TableMap) {
this.records = records;
Expand Down

2 comments on commit 65e6cea

@UsenPang
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @subkanthi, I noticed that you changed ConcurrentLinkedQueue to LinkedBlockingQueue in the ClickHouseSinkTask class, which is a good way to keep the queue from growing until memory runs out. I have a question about the following code in the put method:
synchronized (this.records) {
this.records.add(batch);
}

Shouldn't it be changed to:

this.records.put(batch);
Otherwise, once the queue holds MAX_QUEUE_SIZE batches, this.records.add(batch) will throw an IllegalStateException instead of waiting, which may terminate the task; this.records.put(batch) blocks until space becomes available.

@UsenPang
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

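Alternatively, you could catch the full-queue condition and throw a retriable exception (Kafka Connect's RetriableException), so that the framework retries the batch instead of failing the task.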

Please sign in to comment.