Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.adapter.DeDuplicationStrategy;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.converter.MessageConverter;

Expand Down Expand Up @@ -51,7 +51,9 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe

private MessageConverter messageConverter;

private DeDuplicationStrategy<K, V> deDuplicationStrategy;
private RecordFilterStrategy<K, V> recordFilterStrategy;

private Boolean ackDiscarded;

private ApplicationEventPublisher applicationEventPublisher;

Expand Down Expand Up @@ -94,11 +96,19 @@ public void setMessageConverter(MessageConverter messageConverter) {
}

/**
* Set the de-duplication strategy.
* @param deDuplicationStrategy the strategy.
* Set the record filter strategy.
* @param recordFilterStrategy the strategy.
*/
public void setRecordFilterStrategy(RecordFilterStrategy<K, V> recordFilterStrategy) {
this.recordFilterStrategy = recordFilterStrategy;
}

/**
* Set to true to ack discards when a filter strategy is in use.
* @param ackDiscarded the ackDiscarded.
*/
public void setDeDuplicationStrategy(DeDuplicationStrategy<K, V> deDuplicationStrategy) {
this.deDuplicationStrategy = deDuplicationStrategy;
public void setAckDiscarded(Boolean ackDiscarded) {
this.ackDiscarded = ackDiscarded;
}

@Override
Expand Down Expand Up @@ -133,8 +143,13 @@ public C createListenerContainer(KafkaListenerEndpoint endpoint) {
instance.setBeanName(endpoint.getId());
}

if (this.deDuplicationStrategy != null && endpoint instanceof AbstractKafkaListenerEndpoint) {
((AbstractKafkaListenerEndpoint<K, V>) endpoint).setDeDuplicationStrategy(this.deDuplicationStrategy);
if (endpoint instanceof AbstractKafkaListenerEndpoint) {
if (this.recordFilterStrategy != null) {
((AbstractKafkaListenerEndpoint<K, V>) endpoint).setRecordFilterStrategy(this.recordFilterStrategy);
}
if (this.ackDiscarded != null) {
((AbstractKafkaListenerEndpoint<K, V>) endpoint).setAckDiscarded(this.ackDiscarded);
}
}

endpoint.setupListenerContainer(instance, this.messageConverter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@
import org.springframework.beans.factory.config.BeanExpressionContext;
import org.springframework.beans.factory.config.BeanExpressionResolver;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.adapter.DeDuplicationStrategy;
import org.springframework.kafka.listener.adapter.FilteringAcknowledgingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.util.Assert;

Expand Down Expand Up @@ -68,7 +71,9 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>

private String group;

private DeDuplicationStrategy<K, V> deDuplicationStrategy;
private RecordFilterStrategy<K, V> recordFilterStrategy;

private boolean ackDiscarded;


@Override
Expand Down Expand Up @@ -196,16 +201,29 @@ public void afterPropertiesSet() {
}
}

protected DeDuplicationStrategy<K, V> getDeDuplicationStrategy() {
return this.deDuplicationStrategy;
protected RecordFilterStrategy<K, V> getRecordFilterStrategy() {
return this.recordFilterStrategy;
}

/**
* Set a {@link DeDuplicationStrategy} implementation.
* @param deDuplicationStrategy the strategy implementation.
* Set a {@link RecordFilterStrategy} implementation.
* @param recordFilterStrategy the strategy implementation.
*/
public void setDeDuplicationStrategy(DeDuplicationStrategy<K, V> deDuplicationStrategy) {
this.deDuplicationStrategy = deDuplicationStrategy;
public void setRecordFilterStrategy(RecordFilterStrategy<K, V> recordFilterStrategy) {
this.recordFilterStrategy = recordFilterStrategy;
}

protected boolean isAckDiscarded() {
return this.ackDiscarded;
}

/**
* Set to true if the {@link #setRecordFilterStrategy(RecordFilterStrategy) recordFilterStrategy}
* is in use.
* @param ackDiscarded the ackDiscarded.
*/
public void setAckDiscarded(boolean ackDiscarded) {
this.ackDiscarded = ackDiscarded;
}

@Override
Expand All @@ -226,7 +244,21 @@ protected abstract MessageListener<K, V> createMessageListener(MessageListenerCo
private void setupMessageListener(MessageListenerContainer container, MessageConverter messageConverter) {
MessageListener<K, V> messageListener = createMessageListener(container, messageConverter);
Assert.state(messageListener != null, "Endpoint [" + this + "] must provide a non null message listener");
container.setupMessageListener(messageListener);
if (this.recordFilterStrategy != null) {
if (messageListener instanceof AcknowledgingMessageListener) {
@SuppressWarnings("unchecked")
AcknowledgingMessageListener<K, V> aml = (AcknowledgingMessageListener<K, V>) messageListener;
aml = new FilteringAcknowledgingMessageListenerAdapter<>(this.recordFilterStrategy, aml, this.ackDiscarded);
container.setupMessageListener(aml);
}
else {
messageListener = new FilteringMessageListenerAdapter<>(this.recordFilterStrategy, messageListener);
container.setupMessageListener(messageListener);
}
}
else {
container.setupMessageListener(messageListener);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can still have this line of code without duplication in each if...else.
Minor, of course.
Will polish on merge.

}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,6 @@ protected MessagingMessageListenerAdapter<K, V> createMessageListener(MessageLis
Assert.state(this.messageHandlerMethodFactory != null,
"Could not create message listener - MessageHandlerMethodFactory not set");
MessagingMessageListenerAdapter<K, V> messageListener = createMessageListenerInstance();
if (getDeDuplicationStrategy() != null) {
messageListener.setDeDuplicationStrategy(getDeDuplicationStrategy());
}
messageListener.setHandlerMethod(configureListenerAdapter(messageListener));
if (messageConverter != null) {
messageListener.setMessageConverter(messageConverter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,26 @@
import org.springframework.util.Assert;

/**
* An abstract message listener adapter that implements de-duplication logic
* via a {@link DeDuplicationStrategy}.
* An abstract message listener adapter that implements record filter logic
* via a {@link RecordFilterStrategy}.
*
* @param <K> the key type.
* @param <V> the value type.
*
* @author Gary Russell
*
*/
public abstract class AbstractDeDuplicatingMessageListener<K, V> {
public abstract class AbstractFilteringMessageListener<K, V> {

private final DeDuplicationStrategy<K, V> deDupStrategy;
private final RecordFilterStrategy<K, V> recordFilterStrategy;

protected AbstractDeDuplicatingMessageListener(DeDuplicationStrategy<K, V> deDupStrategy) {
Assert.notNull(deDupStrategy, "'deDupStrategy' cannot be null");
this.deDupStrategy = deDupStrategy;
protected AbstractFilteringMessageListener(RecordFilterStrategy<K, V> recordFilterStrategy) {
Assert.notNull(recordFilterStrategy, "'recordFilterStrategy' cannot be null");
this.recordFilterStrategy = recordFilterStrategy;
}

protected boolean isDuplicate(ConsumerRecord<K, V> consumerRecord) {
return this.deDupStrategy.isDuplicate(consumerRecord);
protected boolean filter(ConsumerRecord<K, V> consumerRecord) {
return this.recordFilterStrategy.filter(consumerRecord);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener.adapter;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;

/**
* A {@link AcknowledgingMessageListener} adapter that implements filter logic
* via a {@link RecordFilterStrategy}.
*
* @param <K> the key type.
* @param <V> the value type.
*
* @author Gary Russell
*
*/
public class FilteringAcknowledgingMessageListenerAdapter<K, V> extends AbstractFilteringMessageListener<K, V>
implements AcknowledgingMessageListener<K, V> {

private final AcknowledgingMessageListener<K, V> delegate;

private final boolean ackDiscarded;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we add this option into the MessagingMessageListenerAdapter as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah - I was hoping to avoid that 😄

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying to imagine a use case where not acknowledging a discarded message would be useful.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They might be acking manually on some condition (special marker record perhaps and not want random acks being done on their behalf.

I guess I am just a bit uncomfortable issuing automatic acks when they have specifically requested to use manual acks.

Copy link

@mbogoevici mbogoevici Jun 1, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see. An option gives a choice then. Should we at least make this true by default - otherwise, in the case of manual acks, they would almost never have a chance to ack those messages and there's no chance that they would ack them in the future on replay - so unless there's a message that they're interested in, this would keep redelivering messages on each restart.


/**
* Create an instance with the supplied strategy and delegate listener.
* @param recordFilterStrategy the filter.
* @param delegate the delegate.
* @param ackDiscarded true to ack (commit offset for) discarded messages.
*/
public FilteringAcknowledgingMessageListenerAdapter(RecordFilterStrategy<K, V> recordFilterStrategy,
AcknowledgingMessageListener<K, V> delegate, boolean ackDiscarded) {
super(recordFilterStrategy);
this.delegate = delegate;
this.ackDiscarded = ackDiscarded;
}

@Override
public void onMessage(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment) {
if (!filter(consumerRecord)) {
this.delegate.onMessage(consumerRecord, acknowledgment);
}
else {
if (this.ackDiscarded) {
acknowledgment.acknowledge();
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,34 @@
import org.springframework.kafka.listener.MessageListener;

/**
* A {@link MessageListener} adapter that implements de-duplication logic
* via a DeDuplicationStrategy.
* A {@link MessageListener} adapter that implements filter logic
* via a {@link RecordFilterStrategy}.
*
* @param <K> the key type.
* @param <V> the value type.
*
* @author Gary Russell
*
*/
public class DeDuplicatingMessageListenerAdapter<K, V> extends AbstractDeDuplicatingMessageListener<K, V>
public class FilteringMessageListenerAdapter<K, V> extends AbstractFilteringMessageListener<K, V>
implements MessageListener<K, V> {

private final MessageListener<K, V> delegate;

public DeDuplicatingMessageListenerAdapter(DeDuplicationStrategy<K, V> deDupStrategy,
/**
* Create an instance with the supplied strategy and delegate listener.
* @param recordFilterStrategy the filter.
* @param delegate the delegate.
*/
public FilteringMessageListenerAdapter(RecordFilterStrategy<K, V> recordFilterStrategy,
MessageListener<K, V> delegate) {
super(deDupStrategy);
super(recordFilterStrategy);
this.delegate = delegate;
}

@Override
public void onMessage(ConsumerRecord<K, V> consumerRecord) {
if (!isDuplicate(consumerRecord)) {
if (!filter(consumerRecord)) {
this.delegate.onMessage(consumerRecord);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ public class MessagingMessageListenerAdapter<K, V> extends AbstractAdaptableMess

private MessageConverter messageConverter = new MessagingMessageConverter();

private DeDuplicationStrategy<K, V> deDuplicationStrategy;


public MessagingMessageListenerAdapter(Method method) {
this.inferredType = determineInferredType(method);
Expand Down Expand Up @@ -103,18 +101,6 @@ public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment
invokeHandler(record, acknowledgment, message);
}

protected DeDuplicationStrategy<K, V> getDeDuplicationStrategy() {
return this.deDuplicationStrategy;
}

/**
* Set a {@link DeDuplicationStrategy} implementation.
* @param deDuplicationStrategy the strategy implementation.
*/
public void setDeDuplicationStrategy(DeDuplicationStrategy<K, V> deDuplicationStrategy) {
this.deDuplicationStrategy = deDuplicationStrategy;
}

protected Message<?> toMessagingMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment) {
return getMessageConverter().toMessage(record, acknowledgment, this.inferredType);
}
Expand All @@ -128,9 +114,6 @@ protected Message<?> toMessagingMessage(ConsumerRecord<K, V> record, Acknowledgm
* @return the result of invocation.
*/
private Object invokeHandler(ConsumerRecord<K, V> record, Acknowledgment acknowledgment, Message<?> message) {
if (this.deDuplicationStrategy != null && this.deDuplicationStrategy.isDuplicate(record)) {
return null;
}
try {
return this.handlerMethod.invoke(message, record, acknowledgment);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,23 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
* Implementations of this interface can signal that a message about
* to be delivered to a message listener is a duplicate.
* Implementations of this interface can signal that a record about
* to be delivered to a message listener should be discarded instead
* of being delivered.
*
* @param <K> the key type.
* @param <V> the value type.
*
* @author Gary Russell
*
*/
public interface DeDuplicationStrategy<K, V> {
public interface RecordFilterStrategy<K, V> {

/**
* Return true if the record is a duplicate and should be discarded.
* Return true if the record should be discarded.
* @param consumerRecord the record.
* @return true to discard.
*/
boolean isDuplicate(ConsumerRecord<K, V> consumerRecord);
boolean filter(ConsumerRecord<K, V> consumerRecord);

}
Loading