Skip to content

Commit 8a099ec

Browse files
committed
Polishing - ackDiscarded on @KafkaListener
1 parent 32b59d8 commit 8a099ec

File tree

6 files changed

+70
-30
lines changed

6 files changed

+70
-30
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe
5353

5454
private RecordFilterStrategy<K, V> recordFilterStrategy;
5555

56+
private Boolean ackDiscarded;
57+
5658
private ApplicationEventPublisher applicationEventPublisher;
5759

5860
/**
@@ -94,13 +96,21 @@ public void setMessageConverter(MessageConverter messageConverter) {
9496
}
9597

9698
/**
97-
* Set the de-duplication strategy.
99+
* Set the record filter strategy.
98100
* @param recordFilterStrategy the strategy.
99101
*/
100102
public void setRecordFilterStrategy(RecordFilterStrategy<K, V> recordFilterStrategy) {
101103
this.recordFilterStrategy = recordFilterStrategy;
102104
}
103105

106+
/**
107+
* Set to true to ack discards when a filter strategy is in use.
108+
* @param ackDiscarded the ackDiscarded.
109+
*/
110+
public void setAckDiscarded(Boolean ackDiscarded) {
111+
this.ackDiscarded = ackDiscarded;
112+
}
113+
104114
@Override
105115
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
106116
this.applicationEventPublisher = applicationEventPublisher;
@@ -133,8 +143,13 @@ public C createListenerContainer(KafkaListenerEndpoint endpoint) {
133143
instance.setBeanName(endpoint.getId());
134144
}
135145

136-
if (this.recordFilterStrategy != null && endpoint instanceof AbstractKafkaListenerEndpoint) {
137-
((AbstractKafkaListenerEndpoint<K, V>) endpoint).setRecordFilterStrategy(this.recordFilterStrategy);
146+
if (endpoint instanceof AbstractKafkaListenerEndpoint) {
147+
if (this.recordFilterStrategy != null) {
148+
((AbstractKafkaListenerEndpoint<K, V>) endpoint).setRecordFilterStrategy(this.recordFilterStrategy);
149+
}
150+
if (this.ackDiscarded != null) {
151+
((AbstractKafkaListenerEndpoint<K, V>) endpoint).setAckDiscarded(this.ackDiscarded);
152+
}
138153
}
139154

140155
endpoint.setupListenerContainer(instance, this.messageConverter);

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,11 @@
3131
import org.springframework.beans.factory.config.BeanExpressionContext;
3232
import org.springframework.beans.factory.config.BeanExpressionResolver;
3333
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
34+
import org.springframework.kafka.listener.AcknowledgingMessageListener;
3435
import org.springframework.kafka.listener.MessageListener;
3536
import org.springframework.kafka.listener.MessageListenerContainer;
37+
import org.springframework.kafka.listener.adapter.FilteringAcknowledgingMessageListenerAdapter;
38+
import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter;
3639
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
3740
import org.springframework.kafka.support.converter.MessageConverter;
3841
import org.springframework.util.Assert;
@@ -70,6 +73,8 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>
7073

7174
private RecordFilterStrategy<K, V> recordFilterStrategy;
7275

76+
private boolean ackDiscarded;
77+
7378

7479
@Override
7580
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
@@ -208,6 +213,19 @@ public void setRecordFilterStrategy(RecordFilterStrategy<K, V> recordFilterStrat
208213
this.recordFilterStrategy = recordFilterStrategy;
209214
}
210215

216+
protected boolean isAckDiscarded() {
217+
return this.ackDiscarded;
218+
}
219+
220+
/**
221+
* Set to true if the {@link #setRecordFilterStrategy(RecordFilterStrategy) recordFilterStrategy}
222+
* is in use.
223+
* @param ackDiscarded the ackDiscarded.
224+
*/
225+
public void setAckDiscarded(boolean ackDiscarded) {
226+
this.ackDiscarded = ackDiscarded;
227+
}
228+
211229
@Override
212230
public void setupListenerContainer(MessageListenerContainer listenerContainer, MessageConverter messageConverter) {
213231
setupMessageListener(listenerContainer, messageConverter);
@@ -226,7 +244,21 @@ protected abstract MessageListener<K, V> createMessageListener(MessageListenerCo
226244
private void setupMessageListener(MessageListenerContainer container, MessageConverter messageConverter) {
227245
MessageListener<K, V> messageListener = createMessageListener(container, messageConverter);
228246
Assert.state(messageListener != null, "Endpoint [" + this + "] must provide a non null message listener");
229-
container.setupMessageListener(messageListener);
247+
if (this.recordFilterStrategy != null) {
248+
if (messageListener instanceof AcknowledgingMessageListener) {
249+
@SuppressWarnings("unchecked")
250+
AcknowledgingMessageListener<K, V> aml = (AcknowledgingMessageListener<K, V>) messageListener;
251+
aml = new FilteringAcknowledgingMessageListenerAdapter<>(this.recordFilterStrategy, aml, this.ackDiscarded);
252+
container.setupMessageListener(aml);
253+
}
254+
else {
255+
messageListener = new FilteringMessageListenerAdapter<>(this.recordFilterStrategy, messageListener);
256+
container.setupMessageListener(messageListener);
257+
}
258+
}
259+
else {
260+
container.setupMessageListener(messageListener);
261+
}
230262
}
231263

232264
/**

spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,6 @@ protected MessagingMessageListenerAdapter<K, V> createMessageListener(MessageLis
9494
Assert.state(this.messageHandlerMethodFactory != null,
9595
"Could not create message listener - MessageHandlerMethodFactory not set");
9696
MessagingMessageListenerAdapter<K, V> messageListener = createMessageListenerInstance();
97-
if (getRecordFilterStrategy() != null) {
98-
messageListener.setRecordFilterStrategy(getRecordFilterStrategy());
99-
}
10097
messageListener.setHandlerMethod(configureListenerAdapter(messageListener));
10198
if (messageConverter != null) {
10299
messageListener.setMessageConverter(messageConverter);

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,6 @@ public class MessagingMessageListenerAdapter<K, V> extends AbstractAdaptableMess
5959

6060
private MessageConverter messageConverter = new MessagingMessageConverter();
6161

62-
private RecordFilterStrategy<K, V> recordFilterStrategy;
63-
6462

6563
public MessagingMessageListenerAdapter(Method method) {
6664
this.inferredType = determineInferredType(method);
@@ -103,18 +101,6 @@ public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment
103101
invokeHandler(record, acknowledgment, message);
104102
}
105103

106-
protected RecordFilterStrategy<K, V> getRecordFilterStrategy() {
107-
return this.recordFilterStrategy;
108-
}
109-
110-
/**
111-
* Set a {@link RecordFilterStrategy} implementation.
112-
* @param recordFilterStrategy the strategy implementation.
113-
*/
114-
public void setRecordFilterStrategy(RecordFilterStrategy<K, V> recordFilterStrategy) {
115-
this.recordFilterStrategy = recordFilterStrategy;
116-
}
117-
118104
protected Message<?> toMessagingMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment) {
119105
return getMessageConverter().toMessage(record, acknowledgment, this.inferredType);
120106
}
@@ -128,9 +114,6 @@ protected Message<?> toMessagingMessage(ConsumerRecord<K, V> record, Acknowledgm
128114
* @return the result of invocation.
129115
*/
130116
private Object invokeHandler(ConsumerRecord<K, V> record, Acknowledgment acknowledgment, Message<?> message) {
131-
if (this.recordFilterStrategy != null && this.recordFilterStrategy.filter(record)) {
132-
return null;
133-
}
134117
try {
135118
return this.handlerMethod.invoke(message, record, acknowledgment);
136119
}

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
5151
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
5252
import org.springframework.kafka.listener.MessageListenerContainer;
53+
import org.springframework.kafka.listener.adapter.FilteringAcknowledgingMessageListenerAdapter;
5354
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
5455
import org.springframework.kafka.listener.config.ContainerProperties;
5556
import org.springframework.kafka.support.Acknowledgment;
@@ -128,6 +129,11 @@ public void testSimple() throws Exception {
128129
assertThat(this.listener.ack).isNotNull();
129130
assertThat(this.listener.eventLatch.await(20, TimeUnit.SECONDS)).isTrue();
130131
assertThat(this.listener.event.getListenerId().startsWith("qux-"));
132+
MessageListenerContainer manualContainer = this.registry.getListenerContainer("qux");
133+
assertThat(KafkaTestUtils.getPropertyValue(manualContainer, "containerProperties.messageListener"))
134+
.isInstanceOf(FilteringAcknowledgingMessageListenerAdapter.class);
135+
assertThat(KafkaTestUtils.getPropertyValue(manualContainer, "containerProperties.messageListener.ackDiscarded",
136+
Boolean.class)).isTrue();
131137

132138
template.send("annotated5", 0, 0, "foo");
133139
template.send("annotated5", 1, 0, "bar");
@@ -214,12 +220,17 @@ public PlatformTransactionManager transactionManager() {
214220
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
215221
new ConcurrentKafkaListenerContainerFactory<>();
216222
factory.setConsumerFactory(consumerFactory());
217-
factory.setRecordFilterStrategy(filter());
223+
factory.setRecordFilterStrategy(recordFilter());
218224
return factory;
219225
}
220226

221227
@Bean
222-
public RecordFilterImpl filter() {
228+
public RecordFilterImpl recordFilter() {
229+
return new RecordFilterImpl();
230+
}
231+
232+
@Bean
233+
public RecordFilterImpl manualFilter() {
223234
return new RecordFilterImpl();
224235
}
225236

@@ -241,6 +252,8 @@ public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
241252
ContainerProperties props = factory.getContainerProperties();
242253
props.setAckMode(AckMode.MANUAL_IMMEDIATE);
243254
props.setIdleEventInterval(100L);
255+
factory.setRecordFilterStrategy(manualFilter());
256+
factory.setAckDiscarded(true);
244257
return factory;
245258
}
246259

src/reference/asciidoc/kafka.adoc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -336,19 +336,19 @@ The framework cannot know whether such a message has been processed or not, that
336336
function.
337337
This is known as the http://www.enterpriseintegrationpatterns.com/patterns/messaging/IdempotentReceiver.html[Idempotent
338338
Receiver] pattern and Spring Integration provides an
339-
http://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html#idempotent-receiver
340-
[implementation thereof].
339+
http://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html#idempotent-receiver[implementation thereof].
341340

342341
The Spring for Apache Kafka project also provides some assistance by means of the `FilteringMessageListenerAdapter`
343342
classe, which can wrap your `MessageListener`.
344343
This class takes an implementation of `RecordFilterStrategy` where you implement the `filter` method to signal
345344
that a message is a duplicate and should be discarded.
346345

347-
Similarly, when using `@KafkaListener`, the `RecordFilterStrategy` can be injected into the container factory.
348-
349346
A `FilteringAcknowledgingMessageListenerAdapter` is also provided for wrapping an `AcknowledgingMessageListener`.
350347
This has an additional property `ackDiscarded` which indicates whether the adapter should acknowledge the discarded record.
351348

349+
When using `@KafkaListener`, set the `RecordFilterStrategy` (and optionally `ackDiscarded`) on the container factory and the listener will be wrapped in the appropriate filtering adapter.
350+
351+
352352
==== Serialization/Deserialization and Message Conversion
353353

354354
Apache Kafka provides a high-level API for serializing/deserializing record values as well as their keys.

0 commit comments

Comments
 (0)