Skip to content

Commit 61eb968

Browse files
committed
Add per-listener ackMode attribute to @KafkaListener
Allows individual Kafka listeners to specify different acknowledgment modes without creating multiple container factory beans. This addresses the need to handle different reliability requirements across listeners in the same application (e.g., critical transactions with manual acknowledgment, notifications with batch acknowledgment, and analytics with record acknowledgment). Key changes: - Added ackMode() attribute to @KafkaListener annotation supporting all ContainerProperties.AckMode values (RECORD, BATCH, TIME, COUNT, COUNT_TIME, MANUAL, MANUAL_IMMEDIATE) - Supports SpEL expressions and property placeholders - Endpoint-level ackMode overrides factory default when specified - Added resolveAckMode() to KafkaListenerAnnotationBeanPostProcessor to process the annotation attribute - Updated endpoint infrastructure (KafkaListenerEndpoint, AbstractKafkaListenerEndpoint, AbstractKafkaListenerContainerFactory) to store and apply ackMode - Added comprehensive tests in KafkaListenerAckModeTests Example usage: @KafkaListener(topics = "critical", ackMode = "MANUAL") @KafkaListener(topics = "notifications", ackMode = "BATCH") Fixes GH-4174 Signed-off-by: gobeomjun <alap_u@naver.com>
1 parent 32fa9fc commit 61eb968

File tree

6 files changed

+222
-0
lines changed

6 files changed

+222
-0
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,4 +338,14 @@
338338
* @since 3.1
339339
*/
340340
String containerPostProcessor() default "";
341+
342+
/**
343+
* Override the container factory's default {@code ackMode} for this listener.
344+
* <p>
345+
* Supports SpEL {@code #{...}} and property placeholders {@code ${...}}.
346+
* @return the ack mode (case-insensitive), or empty string to use factory default.
347+
* @since 4.1
348+
* @see org.springframework.kafka.listener.ContainerProperties.AckMode
349+
*/
350+
String ackMode() default "";
341351
}

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -693,6 +693,7 @@ private void processKafkaListenerAnnotation(MethodKafkaListenerEndpoint<?, ?> en
693693
resolveErrorHandler(endpoint, kafkaListener);
694694
resolveContentTypeConverter(endpoint, kafkaListener);
695695
resolveFilter(endpoint, kafkaListener);
696+
resolveAckMode(endpoint, kafkaListener);
696697
resolveContainerPostProcessor(endpoint, kafkaListener);
697698
}
698699

@@ -740,6 +741,16 @@ private void resolveFilter(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaList
740741
}
741742
}
742743

744+
private void resolveAckMode(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener) {
745+
String ackMode = kafkaListener.ackMode();
746+
if (StringUtils.hasText(ackMode)) {
747+
String ackModeValue = resolveExpressionAsString(ackMode, "ackMode");
748+
if (StringUtils.hasText(ackModeValue)) {
749+
endpoint.setAckMode(ackModeValue);
750+
}
751+
}
752+
}
753+
743754
private @Nullable KafkaListenerContainerFactory<?> resolveContainerFactory(KafkaListener kafkaListener,
744755
@Nullable Object factoryTarget, String beanName) {
745756

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,12 @@ else if (this.autoStartup != null) {
457457
.acceptIfNotNull(endpoint.getConsumerProperties(),
458458
instance.getContainerProperties()::setKafkaConsumerProperties)
459459
.acceptIfNotNull(endpoint.getListenerInfo(), instance::setListenerInfo);
460+
461+
// Set ackMode if specified in endpoint (overrides factory default)
462+
String ackMode = endpoint.getAckMode();
463+
if (ackMode != null) {
464+
properties.setAckMode(ContainerProperties.AckMode.valueOf(ackMode.toUpperCase()));
465+
}
460466
}
461467

462468
@Override

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,8 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>
125125
@Nullable
126126
private String mainListenerId;
127127

128+
private @Nullable String ackMode;
129+
128130
@Override
129131
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
130132
this.beanFactory = beanFactory;
@@ -387,6 +389,25 @@ public void setAutoStartup(@Nullable Boolean autoStartup) {
387389
this.autoStartup = autoStartup;
388390
}
389391

392+
/**
393+
* Return the ackMode for this endpoint's container.
394+
* @return the ackMode.
395+
* @since 4.1
396+
*/
397+
public @Nullable String getAckMode() {
398+
return this.ackMode;
399+
}
400+
401+
/**
402+
* Set the ackMode for this endpoint's container to override the factory's default.
403+
* @param ackMode the ackMode string (case-insensitive).
404+
* @since 4.1
405+
* @see org.springframework.kafka.listener.ContainerProperties.AckMode
406+
*/
407+
public void setAckMode(@Nullable String ackMode) {
408+
this.ackMode = ackMode;
409+
}
410+
390411
/**
391412
* Set a configurer which will be invoked when creating a reply message.
392413
* @param replyHeadersConfigurer the configurer.

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,4 +187,15 @@ default Boolean getBatchListener() {
187187
return null;
188188
}
189189

190+
/**
191+
* Return the ackMode for this endpoint, or null if not explicitly set.
192+
* @return the ack mode string.
193+
* @since 4.1
194+
* @see org.springframework.kafka.listener.ContainerProperties.AckMode
195+
*/
196+
@Nullable
197+
default String getAckMode() {
198+
return null;
199+
}
200+
190201
}
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.annotation;
18+
19+
import java.util.HashMap;
20+
import java.util.Map;
21+
import java.util.concurrent.CountDownLatch;
22+
import java.util.concurrent.TimeUnit;
23+
24+
import org.apache.kafka.clients.consumer.ConsumerConfig;
25+
import org.apache.kafka.common.serialization.IntegerDeserializer;
26+
import org.apache.kafka.common.serialization.StringDeserializer;
27+
import org.junit.jupiter.api.Test;
28+
29+
import org.springframework.beans.factory.annotation.Autowired;
30+
import org.springframework.context.annotation.Bean;
31+
import org.springframework.context.annotation.Configuration;
32+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
33+
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
34+
import org.springframework.kafka.core.ConsumerFactory;
35+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
36+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
37+
import org.springframework.kafka.core.KafkaTemplate;
38+
import org.springframework.kafka.core.ProducerFactory;
39+
import org.springframework.kafka.listener.ContainerProperties;
40+
import org.springframework.kafka.listener.MessageListenerContainer;
41+
import org.springframework.kafka.support.Acknowledgment;
42+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
43+
import org.springframework.kafka.test.context.EmbeddedKafka;
44+
import org.springframework.kafka.test.utils.KafkaTestUtils;
45+
import org.springframework.test.annotation.DirtiesContext;
46+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
47+
48+
import static org.assertj.core.api.Assertions.assertThat;
49+
50+
/**
51+
* Tests for {@link KafkaListener} ackMode attribute.
52+
*
53+
* @author GO BEOMJUN
54+
* @since 4.1
55+
*/
56+
@SpringJUnitConfig
57+
@DirtiesContext
58+
@EmbeddedKafka(topics = {"ackModeRecord", "ackModeManual", "ackModeDefault"}, partitions = 1)
59+
public class KafkaListenerAckModeTests {
60+
61+
@Autowired
62+
private KafkaTemplate<Integer, String> template;
63+
64+
@Autowired
65+
private Config config;
66+
67+
@Autowired
68+
private KafkaListenerEndpointRegistry registry;
69+
70+
@Test
71+
void testAckModeRecordOverride() throws Exception {
72+
this.template.send("ackModeRecord", "test-record");
73+
assertThat(this.config.recordLatch.await(10, TimeUnit.SECONDS)).isTrue();
74+
75+
// Verify that the listener container has the correct ack mode
76+
MessageListenerContainer container = this.registry.getListenerContainer("ackModeRecordListener");
77+
assertThat(container).isNotNull();
78+
assertThat(container.getContainerProperties().getAckMode())
79+
.isEqualTo(ContainerProperties.AckMode.RECORD);
80+
}
81+
82+
@Test
83+
void testAckModeManualOverride() throws Exception {
84+
this.template.send("ackModeManual", "test-manual");
85+
assertThat(this.config.manualLatch.await(10, TimeUnit.SECONDS)).isTrue();
86+
87+
// Verify that the listener container has the correct ack mode
88+
MessageListenerContainer container = this.registry.getListenerContainer("ackModeManualListener");
89+
assertThat(container).isNotNull();
90+
assertThat(container.getContainerProperties().getAckMode())
91+
.isEqualTo(ContainerProperties.AckMode.MANUAL);
92+
}
93+
94+
@Test
95+
void testAckModeDefault() throws Exception {
96+
this.template.send("ackModeDefault", "test-default");
97+
assertThat(this.config.defaultLatch.await(10, TimeUnit.SECONDS)).isTrue();
98+
99+
// Verify that the listener container uses factory default (BATCH)
100+
MessageListenerContainer container = this.registry.getListenerContainer("ackModeDefaultListener");
101+
assertThat(container).isNotNull();
102+
assertThat(container.getContainerProperties().getAckMode())
103+
.isEqualTo(ContainerProperties.AckMode.BATCH);
104+
}
105+
106+
@Configuration
107+
@EnableKafka
108+
public static class Config {
109+
110+
final CountDownLatch recordLatch = new CountDownLatch(1);
111+
112+
final CountDownLatch manualLatch = new CountDownLatch(1);
113+
114+
final CountDownLatch defaultLatch = new CountDownLatch(1);
115+
116+
@Bean
117+
public ConsumerFactory<Integer, String> consumerFactory(EmbeddedKafkaBroker broker) {
118+
Map<String, Object> consumerProps = new HashMap<>(KafkaTestUtils.consumerProps(broker, "testGroup", false));
119+
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
120+
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
121+
return new DefaultKafkaConsumerFactory<>(consumerProps);
122+
}
123+
124+
@Bean
125+
public ProducerFactory<Integer, String> producerFactory(EmbeddedKafkaBroker broker) {
126+
return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(broker));
127+
}
128+
129+
@Bean
130+
public KafkaTemplate<Integer, String> kafkaTemplate(ProducerFactory<Integer, String> producerFactory) {
131+
return new KafkaTemplate<>(producerFactory);
132+
}
133+
134+
@Bean
135+
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(
136+
ConsumerFactory<Integer, String> consumerFactory) {
137+
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
138+
new ConcurrentKafkaListenerContainerFactory<>();
139+
factory.setConsumerFactory(consumerFactory);
140+
// Set factory default to BATCH
141+
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
142+
return factory;
143+
}
144+
145+
@KafkaListener(id = "ackModeRecordListener", topics = "ackModeRecord", ackMode = "RECORD")
146+
public void listenWithRecordAck(String message) {
147+
this.recordLatch.countDown();
148+
}
149+
150+
@KafkaListener(id = "ackModeManualListener", topics = "ackModeManual", ackMode = "MANUAL")
151+
public void listenWithManualAck(String message, Acknowledgment ack) {
152+
ack.acknowledge();
153+
this.manualLatch.countDown();
154+
}
155+
156+
@KafkaListener(id = "ackModeDefaultListener", topics = "ackModeDefault")
157+
public void listenWithDefaultAck(String message) {
158+
this.defaultLatch.countDown();
159+
}
160+
161+
}
162+
163+
}

0 commit comments

Comments
 (0)