From a3543c7dfd90a1a17432a6ed4779751736642f38 Mon Sep 17 00:00:00 2001 From: Ben Osheroff Date: Sun, 22 Jan 2017 14:56:43 -0800 Subject: [PATCH 1/2] separate KafkaProducer#push from the main thread This should be worth roughly 25% of cpu-concurrency, by running binlog-event-to-RowMap code on a different thread than the json-marshalling and kafka-data-compression/batching. We could go deeper than this by also parceling out the json-marshal calls into a separate thread, but at that point I think I'd adopt rx-java or some kind of pipeline framework. --- .../producer/MaxwellKafkaProducer.java | 40 ++++++++++++++++++- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java index d178aa036..6e077cbd8 100644 --- a/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java @@ -18,6 +18,8 @@ import java.sql.SQLException; import java.util.Properties; +import java.util.concurrent.LinkedBlockingQueue; + class KafkaCallback implements Callback { public static final Logger LOGGER = LoggerFactory.getLogger(MaxwellKafkaProducer.class); @@ -53,7 +55,26 @@ public void onCompletion(RecordMetadata md, Exception e) { } } -public class MaxwellKafkaProducer extends AbstractAsyncProducer { + +public class MaxwellKafkaProducer extends AbstractProducer { + private final LinkedBlockingQueue queue; + private final MaxwellKafkaProducerWorker worker; + + public MaxwellKafkaProducer(MaxwellContext context, Properties kafkaProperties, String kafkaTopic) { + super(context); + this.queue = new LinkedBlockingQueue<>(100); + this.worker = new MaxwellKafkaProducerWorker(context, kafkaProperties, kafkaTopic, this.queue); + new Thread(this.worker, "maxwell-kafka-producer").start(); + + } + + @Override + public void push(RowMap r) throws Exception { + this.queue.put(r); + } +} + +class MaxwellKafkaProducerWorker extends AbstractAsyncProducer implements Runnable { static final Logger LOGGER = LoggerFactory.getLogger(MaxwellKafkaProducer.class); private final KafkaProducer kafka; @@ -63,8 +84,9 @@ public class MaxwellKafkaProducer extends AbstractAsyncProducer { private final MaxwellKafkaPartitioner ddlPartitioner; private final KeyFormat keyFormat; private final boolean interpolateTopic; + private final LinkedBlockingQueue queue; - public MaxwellKafkaProducer(MaxwellContext context, Properties kafkaProperties, String kafkaTopic) { + public MaxwellKafkaProducerWorker(MaxwellContext context, Properties kafkaProperties, String kafkaTopic, LinkedBlockingQueue queue) { super(context); this.topic = kafkaTopic; @@ -87,6 +109,20 @@ public MaxwellKafkaProducer(MaxwellContext context, Properties kafkaProperties, keyFormat = KeyFormat.HASH; else keyFormat = KeyFormat.ARRAY; + + this.queue = queue; + } + + @Override + public void run() { + while ( true ) { + try { + RowMap row = queue.take(); + this.push(row); + } catch ( Exception e ) { + throw new RuntimeException(e); + } + } } private Integer getNumPartitions(String topic) { From 1267f3fd1392fe5e1ff86d0fb52eeb761c684ffa Mon Sep 17 00:00:00 2001 From: Ben Osheroff Date: Mon, 23 Jan 2017 13:23:11 -0800 Subject: [PATCH 2/2] CR: rename thread, Linked -> Array BlockingQueue --- .../maxwell/producer/MaxwellKafkaProducer.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java index 6e077cbd8..473dd5863 100644 --- a/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java @@ -18,7 +18,7 @@ import java.sql.SQLException; import java.util.Properties; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ArrayBlockingQueue; class KafkaCallback implements Callback { @@ -57,14 +57,14 @@ public void onCompletion(RecordMetadata md, Exception e) { public class MaxwellKafkaProducer extends AbstractProducer { - private final LinkedBlockingQueue queue; + private final ArrayBlockingQueue queue; private final MaxwellKafkaProducerWorker worker; public MaxwellKafkaProducer(MaxwellContext context, Properties kafkaProperties, String kafkaTopic) { super(context); - this.queue = new LinkedBlockingQueue<>(100); + this.queue = new ArrayBlockingQueue<>(100); this.worker = new MaxwellKafkaProducerWorker(context, kafkaProperties, kafkaTopic, this.queue); - new Thread(this.worker, "maxwell-kafka-producer").start(); + new Thread(this.worker, "maxwell-kafka-worker").start(); } @@ -84,9 +84,9 @@ class MaxwellKafkaProducerWorker extends AbstractAsyncProducer implements Runnab private final MaxwellKafkaPartitioner ddlPartitioner; private final KeyFormat keyFormat; private final boolean interpolateTopic; - private final LinkedBlockingQueue queue; + private final ArrayBlockingQueue queue; - public MaxwellKafkaProducerWorker(MaxwellContext context, Properties kafkaProperties, String kafkaTopic, LinkedBlockingQueue queue) { + public MaxwellKafkaProducerWorker(MaxwellContext context, Properties kafkaProperties, String kafkaTopic, ArrayBlockingQueue queue) { super(context); this.topic = kafkaTopic;