Skip to content

Commit 283ca09

Browse files
garyrussellartembilan
authored andcommitted
GH-63: Add AckMode.MANUAL_IMMEDIATE_SYNC
Resolves #63 Fixes #58 Add an option for synchronous acks.
1 parent 3c6a673 commit 283ca09

File tree

6 files changed

+104
-42
lines changed

6 files changed

+104
-42
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,12 @@ public enum AckMode {
8383
/**
8484
* Call {@link Consumer#commitAsync()} immediately for pending acks.
8585
*/
86-
MANUAL_IMMEDIATE
86+
MANUAL_IMMEDIATE,
87+
88+
/**
89+
* Call {@link Consumer#commitSync()} immediately for pending acks.
90+
*/
91+
MANUAL_IMMEDIATE_SYNC
8792

8893
}
8994

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,8 @@ private class ListenerConsumer implements SchedulingAwareRunnable {
210210

211211
private final boolean autoCommit = KafkaMessageListenerContainer.this.consumerFactory.isAutoCommit();
212212

213+
private final AckMode ackMode = getAckMode();
214+
213215
private Thread consumerThread;
214216

215217
private volatile Collection<TopicPartition> definedPartitions;
@@ -222,9 +224,11 @@ private class ListenerConsumer implements SchedulingAwareRunnable {
222224

223225
ListenerConsumer(MessageListener<K, V> listener, AcknowledgingMessageListener<K, V> ackListener,
224226
long recentOffset) {
225-
Assert.state(!(getAckMode().equals(AckMode.MANUAL) || getAckMode().equals(AckMode.MANUAL_IMMEDIATE))
227+
Assert.state(!(this.ackMode.equals(AckMode.MANUAL)
228+
|| this.ackMode.equals(AckMode.MANUAL_IMMEDIATE)
229+
|| this.ackMode.equals(AckMode.MANUAL_IMMEDIATE_SYNC))
226230
|| !this.autoCommit,
227-
"Consumer cannot be configured for auto commit for ackMode " + getAckMode());
231+
"Consumer cannot be configured for auto commit for ackMode " + this.ackMode);
228232
Consumer<K, V> consumer = KafkaMessageListenerContainer.this.consumerFactory.createConsumer();
229233
ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
230234

@@ -272,7 +276,6 @@ public void run() {
272276
if (isRunning() && this.definedPartitions != null) {
273277
initPartitionsIfNeeded();
274278
}
275-
final AckMode ackMode = getAckMode();
276279
while (isRunning()) {
277280
try {
278281
if (this.logger.isTraceEnabled()) {
@@ -287,14 +290,14 @@ public void run() {
287290
while (iterator.hasNext()) {
288291
final ConsumerRecord<K, V> record = iterator.next();
289292
invokeListener(record);
290-
if (!this.autoCommit && ackMode.equals(AckMode.RECORD)) {
293+
if (!this.autoCommit && this.ackMode.equals(AckMode.RECORD)) {
291294
this.consumer.commitAsync(
292295
Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
293296
new OffsetAndMetadata(record.offset() + 1)), this.callback);
294297
}
295298
}
296299
if (!this.autoCommit) {
297-
processCommits(ackMode, records);
300+
processCommits(this.ackMode, records);
298301
}
299302
}
300303
else {
@@ -337,28 +340,39 @@ private void invokeListener(final ConsumerRecord<K, V> record) {
337340

338341
@Override
339342
public void acknowledge() {
340-
if (getAckMode().equals(AckMode.MANUAL)) {
343+
if (ListenerConsumer.this.ackMode.equals(AckMode.MANUAL)) {
341344
updateManualOffset(record);
342345
}
343-
else if (getAckMode().equals(AckMode.MANUAL_IMMEDIATE)) {
344-
if (Thread.currentThread().equals(ListenerConsumer.this.consumerThread)) {
345-
Map<TopicPartition, OffsetAndMetadata> commits = Collections.singletonMap(
346-
new TopicPartition(record.topic(), record.partition()),
347-
new OffsetAndMetadata(record.offset() + 1));
348-
if (ListenerConsumer.this.logger.isDebugEnabled()) {
349-
ListenerConsumer.this.logger.debug("Committing: " + commits);
350-
}
351-
ListenerConsumer.this.consumer.commitAsync(commits, ListenerConsumer.this.callback);
346+
else if (ListenerConsumer.this.ackMode.equals(AckMode.MANUAL_IMMEDIATE)
347+
|| ListenerConsumer.this.ackMode.equals(AckMode.MANUAL_IMMEDIATE_SYNC)) {
348+
ackImmediate(record);
349+
}
350+
else {
351+
throw new IllegalStateException("AckMode must be MANUAL or MANUAL_IMMEDIATE "
352+
+ "for manual acks");
353+
}
354+
}
355+
356+
private void ackImmediate(final ConsumerRecord<K, V> record) {
357+
if (Thread.currentThread().equals(ListenerConsumer.this.consumerThread)) {
358+
Map<TopicPartition, OffsetAndMetadata> commits = Collections.singletonMap(
359+
new TopicPartition(record.topic(), record.partition()),
360+
new OffsetAndMetadata(record.offset() + 1));
361+
if (ListenerConsumer.this.logger.isDebugEnabled()) {
362+
ListenerConsumer.this.logger.debug("Committing: " + commits);
363+
}
364+
if (ListenerConsumer.this.ackMode.equals(AckMode.MANUAL_IMMEDIATE)) {
365+
ListenerConsumer.this.consumer.commitAsync(commits,
366+
ListenerConsumer.this.callback);
352367
}
353368
else {
354-
throw new IllegalStateException(
355-
"With MANUAL_IMMEDIATE ack mode, acknowledge() must be invoked on the "
356-
+ "consumer thread");
369+
ListenerConsumer.this.consumer.commitSync(commits);
357370
}
358371
}
359372
else {
360-
throw new IllegalStateException("AckMode must be MANUAL or MANUAL_IMMEDIATE "
361-
+ "for manual acks");
373+
throw new IllegalStateException(
374+
"With " + ListenerConsumer.this.ackMode.name()
375+
+ " ack mode, acknowledge() must be invoked on the consumer thread");
362376
}
363377
}
364378

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,11 @@ public class ConcurrentMessageListenerContainerTests {
7474

7575
private static String topic7 = "testTopic7";
7676

77+
private static String topic8 = "testTopic8";
78+
7779
@ClassRule
7880
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, topic1, topic2, topic3, topic4, topic5,
79-
topic6, topic7);
81+
topic6, topic7, topic8);
8082

8183
@Test
8284
public void testAutoCommit() throws Exception {
@@ -357,6 +359,48 @@ public void onMessage(ConsumerRecord<Integer, String> message, Acknowledgment ac
357359
logger.info("Stop MANUAL_IMMEDIATE with Existing");
358360
}
359361

362+
@Test
363+
public void testManualCommitSyncExisting() throws Exception {
364+
logger.info("Start MANUAL_IMMEDIATE_SYNC with Existing");
365+
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
366+
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<Integer, String>(senderProps);
367+
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
368+
template.setDefaultTopic(topic8);
369+
template.send(0, "foo");
370+
template.send(2, "bar");
371+
template.send(0, "baz");
372+
template.send(2, "qux");
373+
template.flush();
374+
Map<String, Object> props = KafkaTestUtils.consumerProps("testManualExistingSync", "false", embeddedKafka);
375+
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
376+
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props);
377+
ConcurrentMessageListenerContainer<Integer, String> container =
378+
new ConcurrentMessageListenerContainer<>(cf, topic8);
379+
final CountDownLatch latch = new CountDownLatch(8);
380+
container.setMessageListener(new AcknowledgingMessageListener<Integer, String>() {
381+
382+
@Override
383+
public void onMessage(ConsumerRecord<Integer, String> message, Acknowledgment ack) {
384+
logger.info("manualExisting: " + message);
385+
ack.acknowledge();
386+
latch.countDown();
387+
}
388+
389+
});
390+
container.setConcurrency(1);
391+
container.setAckMode(AckMode.MANUAL_IMMEDIATE_SYNC);
392+
container.setBeanName("testManualExisting");
393+
container.start();
394+
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
395+
template.send(0, "fooo");
396+
template.send(2, "barr");
397+
template.send(0, "bazz");
398+
template.send(2, "quxx");
399+
template.flush();
400+
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
401+
container.stop();
402+
logger.info("Stop MANUAL_IMMEDIATE_SYNC with Existing");
403+
}
360404

361405
@SuppressWarnings("unchecked")
362406
@Test

src/reference/asciidoc/kafka.adoc

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,10 +202,13 @@ records have been received since the last commit.
202202
- COUNT_TIME - similar to TIME and COUNT but the commit is performed if either condition is true.
203203
- MANUAL - the message listener (`AcknowledgingMessageListener`) is responsible to `acknowledge()` the `Acknowledgment`;
204204
after which, the same semantics as `COUNT_TIME` are applied.
205-
- MANUAL_IMMEDIATE - call `commitAsync()`` immediately when the `Acknowledgment.acknowledge()` method is called by the
205+
- MANUAL_IMMEDIATE - call `commitAsync()` immediately when the `Acknowledgment.acknowledge()` method is called by the
206206
listener - must be executed on the container's thread.
207+
- MANUAL_IMMEDIATE_SYNC - call `commitSync()` immediately when the `Acknowledgment.acknowledge()` method is called by
208+
the listener - must be executed on the container's thread.
207209

208-
NOTE: `MANUAL` and `MANUAL_IMMEDIATE` require the listener to be an `AcknowledgingMessageListener`.
210+
NOTE: `MANUAL`, `MANUAL_IMMEDIATE`, and `MANUAL_IMMEDIATE_SYNC` require the listener to be an
211+
`AcknowledgingMessageListener`.
209212

210213
[source, java]
211214
----

src/reference/asciidoc/quick-tour.adoc

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,10 @@ public void testAutoCommit() throws Exception {
5959
Thread.sleep(1000); // wait a bit for the container to start
6060
KafkaTemplate<Integer, String> template = createTemplate();
6161
template.setDefaultTopic(topic1);
62-
template.convertAndSend(0, "foo");
63-
template.convertAndSend(2, "bar");
64-
template.convertAndSend(0, "baz");
65-
template.convertAndSend(2, "qux");
62+
template.send(0, "foo");
63+
template.send(2, "bar");
64+
template.send(0, "baz");
65+
template.send(2, "qux");
6666
template.flush();
6767
assertTrue(latch.await(60, TimeUnit.SECONDS));
6868
container.stop();
@@ -94,10 +94,8 @@ private Map<String, Object> consumerProps() {
9494
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
9595
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
9696
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
97-
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
98-
"org.apache.kafka.common.serialization.IntegerDeserializer");
99-
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
100-
"org.apache.kafka.common.serialization.StringDeserializer");
97+
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
98+
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
10199
return props;
102100
}
103101
@@ -108,10 +106,8 @@ private Map<String, Object> senderProps() {
108106
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
109107
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
110108
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
111-
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
112-
"org.apache.kafka.common.serialization.IntegerSerializer");
113-
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
114-
"org.apache.kafka.common.serialization.StringSerializer");
109+
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
110+
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
115111
return props;
116112
}
117113
----
@@ -131,7 +127,7 @@ private KafkaTemplate<Integer, String> template;
131127
@Test
132128
public void testSimple() throws Exception {
133129
waitListening("foo");
134-
template.convertAndSend("annotated1", 0, "foo");
130+
template.send("annotated1", 0, "foo");
135131
assertTrue(this.listener.latch1.await(10, TimeUnit.SECONDS));
136132
}
137133

src/reference/asciidoc/testing.adoc

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -184,14 +184,14 @@ public class KafkaTemplateTests {
184184
new DefaultKafkaProducerFactory<Integer, String>(senderProps);
185185
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
186186
template.setDefaultTopic(TEMPLATE_TOPIC);
187-
template.syncConvertAndSend("foo");
187+
template.send("foo");
188188
assertThat(records.poll(10, TimeUnit.SECONDS), hasValue("foo"));
189-
template.syncConvertAndSend(0, 2, "bar");
189+
template.send(0, 2, "bar");
190190
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
191191
assertThat(received, hasKey(2));
192192
assertThat(received, hasPartition(0));
193193
assertThat(received, hasValue("bar"));
194-
template.syncConvertAndSend(TEMPLATE_TOPIC, 0, 2, "baz");
194+
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
195195
received = records.poll(10, TimeUnit.SECONDS);
196196
assertThat(received, hasKey(2));
197197
assertThat(received, hasPartition(0));
@@ -207,12 +207,12 @@ The above uses the hamcrest matchers; with `AssertJ`, the final part looks like
207207
----
208208
...
209209
assertThat(records.poll(10, TimeUnit.SECONDS)).has(value("foo"));
210-
template.syncConvertAndSend(0, 2, "bar");
210+
template.send(0, 2, "bar");
211211
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
212212
assertThat(received).has(key(2));
213213
assertThat(received).has(partition(0));
214214
assertThat(received).has(value("bar"));
215-
template.syncConvertAndSend(TEMPLATE_TOPIC, 0, 2, "baz");
215+
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
216216
received = records.poll(10, TimeUnit.SECONDS);
217217
assertThat(received).has(key(2));
218218
assertThat(received).has(partition(0));

0 commit comments

Comments
 (0)