Skip to content

Commit

Permalink
Merge pull request #543 from zendesk/ben/kafka_producer_queue
Browse files Browse the repository at this point in the history
separate KafkaProducer#push from the main thread
  • Loading branch information
Ben Osheroff authored Jan 24, 2017
2 parents cfe8095 + 1267f3f commit 1b9e068
Showing 1 changed file with 38 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import java.sql.SQLException;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;


class KafkaCallback implements Callback {
public static final Logger LOGGER = LoggerFactory.getLogger(MaxwellKafkaProducer.class);
Expand Down Expand Up @@ -53,7 +55,26 @@ public void onCompletion(RecordMetadata md, Exception e) {
}
}

public class MaxwellKafkaProducer extends AbstractAsyncProducer {

public class MaxwellKafkaProducer extends AbstractProducer {
private final ArrayBlockingQueue<RowMap> queue;
private final MaxwellKafkaProducerWorker worker;

public MaxwellKafkaProducer(MaxwellContext context, Properties kafkaProperties, String kafkaTopic) {
super(context);
this.queue = new ArrayBlockingQueue<>(100);
this.worker = new MaxwellKafkaProducerWorker(context, kafkaProperties, kafkaTopic, this.queue);
new Thread(this.worker, "maxwell-kafka-worker").start();

}

@Override
public void push(RowMap r) throws Exception {
this.queue.put(r);
}
}

class MaxwellKafkaProducerWorker extends AbstractAsyncProducer implements Runnable {
static final Logger LOGGER = LoggerFactory.getLogger(MaxwellKafkaProducer.class);

private final KafkaProducer<String, String> kafka;
Expand All @@ -63,8 +84,9 @@ public class MaxwellKafkaProducer extends AbstractAsyncProducer {
private final MaxwellKafkaPartitioner ddlPartitioner;
private final KeyFormat keyFormat;
private final boolean interpolateTopic;
private final ArrayBlockingQueue<RowMap> queue;

public MaxwellKafkaProducer(MaxwellContext context, Properties kafkaProperties, String kafkaTopic) {
public MaxwellKafkaProducerWorker(MaxwellContext context, Properties kafkaProperties, String kafkaTopic, ArrayBlockingQueue<RowMap> queue) {
super(context);

this.topic = kafkaTopic;
Expand All @@ -87,6 +109,20 @@ public MaxwellKafkaProducer(MaxwellContext context, Properties kafkaProperties,
keyFormat = KeyFormat.HASH;
else
keyFormat = KeyFormat.ARRAY;

this.queue = queue;
}

@Override
public void run() {
while ( true ) {
try {
RowMap row = queue.take();
this.push(row);
} catch ( Exception e ) {
throw new RuntimeException(e);
}
}
}

private Integer getNumPartitions(String topic) {
Expand Down

0 comments on commit 1b9e068

Please sign in to comment.