diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java index d178aa036..473dd5863 100644 --- a/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java @@ -18,6 +18,8 @@ import java.sql.SQLException; import java.util.Properties; +import java.util.concurrent.ArrayBlockingQueue; + class KafkaCallback implements Callback { public static final Logger LOGGER = LoggerFactory.getLogger(MaxwellKafkaProducer.class); @@ -53,7 +55,26 @@ public void onCompletion(RecordMetadata md, Exception e) { } } -public class MaxwellKafkaProducer extends AbstractAsyncProducer { + +public class MaxwellKafkaProducer extends AbstractProducer { + private final ArrayBlockingQueue queue; + private final MaxwellKafkaProducerWorker worker; + + public MaxwellKafkaProducer(MaxwellContext context, Properties kafkaProperties, String kafkaTopic) { + super(context); + this.queue = new ArrayBlockingQueue<>(100); + this.worker = new MaxwellKafkaProducerWorker(context, kafkaProperties, kafkaTopic, this.queue); + new Thread(this.worker, "maxwell-kafka-worker").start(); + + } + + @Override + public void push(RowMap r) throws Exception { + this.queue.put(r); + } +} + +class MaxwellKafkaProducerWorker extends AbstractAsyncProducer implements Runnable { static final Logger LOGGER = LoggerFactory.getLogger(MaxwellKafkaProducer.class); private final KafkaProducer kafka; @@ -63,8 +84,9 @@ public class MaxwellKafkaProducer extends AbstractAsyncProducer { private final MaxwellKafkaPartitioner ddlPartitioner; private final KeyFormat keyFormat; private final boolean interpolateTopic; + private final ArrayBlockingQueue queue; - public MaxwellKafkaProducer(MaxwellContext context, Properties kafkaProperties, String kafkaTopic) { + public MaxwellKafkaProducerWorker(MaxwellContext context, Properties kafkaProperties, String kafkaTopic, ArrayBlockingQueue queue) { super(context); this.topic = kafkaTopic; @@ -87,6 +109,20 @@ public MaxwellKafkaProducer(MaxwellContext context, Properties kafkaProperties, keyFormat = KeyFormat.HASH; else keyFormat = KeyFormat.ARRAY; + + this.queue = queue; + } + + @Override + public void run() { + while ( true ) { + try { + RowMap row = queue.take(); + this.push(row); + } catch ( Exception e ) { + throw new RuntimeException(e); + } + } } private Integer getNumPartitions(String topic) {