spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

@@ -438,7 +438,11 @@ private void processMultiMethodListeners(Collection<KafkaListener> classLevelLis
 		for (KafkaListener classLevelListener : classLevelListeners) {
 			MultiMethodKafkaListenerEndpoint<K, V> endpoint =
 					new MultiMethodKafkaListenerEndpoint<>(checkedMethods, defaultMethod, bean);
-			processListener(endpoint, classLevelListener, bean, beanName);
+			String beanRef = classLevelListener.beanRef();
+			this.listenerScope.addListener(beanRef, bean);
+			processListener(endpoint, classLevelListener, bean, beanName, resolveTopics(classLevelListener),
+					resolveTopicPartitions(classLevelListener));
+			this.listenerScope.removeListener(beanRef);
 		}
 	}
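
The scope registration added around processListener() is what lets SpEL expressions that reference the listener bean itself resolve while the topics and partitions are being computed. A minimal sketch of a class-level listener that depends on this (hypothetical class; __listener is the default beanRef value of @KafkaListener):

	// Hypothetical self-referencing listener: the "topic" property is read
	// through the __listener scope entry while the annotation is processed.
	@KafkaListener(id = "self", topics = "#{__listener.topic}")
	public class SelfReferencingListener {

		public String getTopic() {
			return "someTopic";
		}

		@KafkaHandler
		void listen(String in) {
			// handle the record
		}
	}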

@@ -447,37 +451,49 @@ protected void processKafkaListener(KafkaListener kafkaListener, Method method,
 		MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
 		endpoint.setMethod(methodToUse);

-		if (!processMainAndRetryListeners(kafkaListener, bean, beanName, methodToUse, endpoint)) {
-			processListener(endpoint, kafkaListener, bean, beanName);
+		String beanRef = kafkaListener.beanRef();
+		this.listenerScope.addListener(beanRef, bean);
+		String[] topics = resolveTopics(kafkaListener);
+		TopicPartitionOffset[] tps = resolveTopicPartitions(kafkaListener);
+		if (!processMainAndRetryListeners(kafkaListener, bean, beanName, methodToUse, endpoint, topics, tps)) {
+			processListener(endpoint, kafkaListener, bean, beanName, topics, tps);
 		}
+		this.listenerScope.removeListener(beanRef);
 	}

 	private boolean processMainAndRetryListeners(KafkaListener kafkaListener, Object bean, String beanName,
-			Method methodToUse, MethodKafkaListenerEndpoint<K, V> endpoint) {
+			Method methodToUse, MethodKafkaListenerEndpoint<K, V> endpoint, String[] topics,
+			TopicPartitionOffset[] tps) {

+		String[] retryableCandidates = topics;
+		if (retryableCandidates.length == 0 && tps.length > 0) {
+			retryableCandidates = Arrays.stream(tps)
+					.map(tp -> tp.getTopic())
+					.distinct()
+					.collect(Collectors.toList())
+					.toArray(new String[0]);
+		}
+
 		RetryTopicConfiguration retryTopicConfiguration = new RetryTopicConfigurationProvider(this.beanFactory,
 				this.resolver, this.expressionContext)
-						.findRetryConfigurationFor(kafkaListener.topics(), methodToUse, bean);
+						.findRetryConfigurationFor(retryableCandidates, methodToUse, bean);

 		if (retryTopicConfiguration == null) {
-			this.logger.debug("No retry topic configuration found for topics " + Arrays.asList(kafkaListener.topics()));
+			String[] candidates = retryableCandidates;
+			this.logger.debug(() ->
+					"No retry topic configuration found for topics " + Arrays.toString(candidates));
 			return false;
 		}

 		RetryTopicConfigurer.EndpointProcessor endpointProcessor = endpointToProcess ->
-				this.processKafkaListenerAnnotationForRetryTopic(endpointToProcess, kafkaListener, bean);
-
-		String beanRef = kafkaListener.beanRef();
-		this.listenerScope.addListener(beanRef, bean);
+				this.processKafkaListenerAnnotationForRetryTopic(endpointToProcess, kafkaListener, bean, topics, tps);

 		KafkaListenerContainerFactory<?> factory =
 				resolveContainerFactory(kafkaListener, resolve(kafkaListener.containerFactory()), beanName);

 		getRetryTopicConfigurer()
 				.processMainAndRetryListeners(endpointProcessor, endpoint, retryTopicConfiguration,
 						this.registrar, factory, this.defaultContainerFactoryBeanName);

-		this.listenerScope.removeListener(beanRef);
 		return true;
 	}
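
In isolation, the fallback added above from declared topics to partition-derived topic names behaves like this sketch (hypothetical input values; the stream pipeline mirrors the added lines):

	// No topics declared, but two partitions of the same topic assigned:
	String[] topics = new String[0];
	TopicPartitionOffset[] tps = {
			new TopicPartitionOffset("orders", 0),
			new TopicPartitionOffset("orders", 1)
	};
	String[] retryableCandidates = topics;
	if (retryableCandidates.length == 0 && tps.length > 0) {
		retryableCandidates = Arrays.stream(tps)
				.map(tp -> tp.getTopic())
				.distinct()
				.collect(Collectors.toList())
				.toArray(new String[0]);
	}
	// retryableCandidates is now {"orders"}: one entry per topic, not per partition.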

@@ -539,43 +555,34 @@ private Method checkProxy(Method methodArg, Object bean) {
 	}

 	protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
-			Object bean, String beanName) {
+			Object bean, String beanName, String[] topics, TopicPartitionOffset[] tps) {

-		String beanRef = kafkaListener.beanRef();
-		if (StringUtils.hasText(beanRef)) {
-			this.listenerScope.addListener(beanRef, bean);
-		}
-
-		processKafkaListenerAnnotationBeforeRegistration(endpoint, kafkaListener, bean);
+		processKafkaListenerAnnotationBeforeRegistration(endpoint, kafkaListener, bean, topics, tps);

 		String containerFactory = resolve(kafkaListener.containerFactory());
 		KafkaListenerContainerFactory<?> listenerContainerFactory = resolveContainerFactory(kafkaListener, containerFactory, beanName);

 		this.registrar.registerEndpoint(endpoint, listenerContainerFactory);

 		processKafkaListenerEndpointAfterRegistration(endpoint, kafkaListener);
-
-		if (StringUtils.hasText(beanRef)) {
-			this.listenerScope.removeListener(beanRef);
-		}
 	}

 	private void processKafkaListenerAnnotationForRetryTopic(MethodKafkaListenerEndpoint<?, ?> endpoint,
-			KafkaListener kafkaListener, Object bean) {
+			KafkaListener kafkaListener, Object bean, String[] topics, TopicPartitionOffset[] tps) {

-		processKafkaListenerAnnotationBeforeRegistration(endpoint, kafkaListener, bean);
+		processKafkaListenerAnnotationBeforeRegistration(endpoint, kafkaListener, bean, topics, tps);
 		processKafkaListenerEndpointAfterRegistration(endpoint, kafkaListener);
 	}

 	private void processKafkaListenerAnnotationBeforeRegistration(MethodKafkaListenerEndpoint<?, ?> endpoint,
-			KafkaListener kafkaListener, Object bean) {
+			KafkaListener kafkaListener, Object bean, String[] topics, TopicPartitionOffset[] tps) {

 		endpoint.setBean(bean);
 		endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
 		endpoint.setId(getEndpointId(kafkaListener));
 		endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
-		endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
-		endpoint.setTopics(resolveTopics(kafkaListener));
+		endpoint.setTopicPartitions(tps);
+		endpoint.setTopics(topics);
 		endpoint.setTopicPattern(resolvePattern(kafkaListener));
 		endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
 		String group = kafkaListener.containerGroup();
@@ -1107,7 +1114,7 @@ public String convert(byte[] source) {

 		}

-	private static class ListenerScope implements Scope {
+	static class ListenerScope implements Scope {

 		private final Map<String, Object> listeners = new HashMap<>();

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java

@@ -95,6 +95,7 @@ private Collection<String> getTopics(MethodKafkaListenerEndpoint<?, ?> endpoint)
 		if (topicPartitionsToAssign != null && topicPartitionsToAssign.length > 0) {
 			topics = Arrays.stream(topicPartitionsToAssign)
 					.map(TopicPartitionOffset::getTopic)
+					.distinct()
 					.collect(Collectors.toList());
 		}
 	}
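
The added .distinct() guards against a listener that is assigned several partitions of the same topic: mapping TopicPartitionOffset::getTopic would otherwise yield the topic name once per partition, and the retry-topic machinery would process duplicate entries. With hypothetical assignments ("orders", 0) and ("orders", 1), the collected list collapses to the single entry "orders".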
spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationIntegrationTests.java (new file)

@@ -0,0 +1,133 @@
/*
 * Copyright 2021 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.kafka.retrytopic;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

/**
 * @author Gary Russell
 * @since 2.7.7
 *
 */
@SpringJUnitConfig
@DirtiesContext
@EmbeddedKafka(topics = RetryTopicConfigurationIntegrationTests.TOPIC1, partitions = 1)
class RetryTopicConfigurationIntegrationTests {

	public static final String TOPIC1 = "RetryTopicConfigurationIntegrationTests.1";

	@Test
	void includeTopic(@Autowired EmbeddedKafkaBroker broker, @Autowired ConsumerFactory<Integer, String> cf,
			@Autowired KafkaTemplate<Integer, String> template, @Autowired Config config) throws InterruptedException {

		Consumer<Integer, String> consumer = cf.createConsumer("grp2", "");
		Map<String, List<PartitionInfo>> topics = consumer.listTopics();
		assertThat(topics.keySet()).contains("RetryTopicConfigurationIntegrationTests.1",
				"RetryTopicConfigurationIntegrationTests.1-dlt", "RetryTopicConfigurationIntegrationTests.1-retry-100",
				"RetryTopicConfigurationIntegrationTests.1-retry-110");
		template.send(TOPIC1, "foo");
		assertThat(config.latch.await(10, TimeUnit.SECONDS)).isTrue();
	}

	@Configuration(proxyBeanMethods = false)
	@EnableKafka
	static class Config {

		private final CountDownLatch latch = new CountDownLatch(1);

		@KafkaListener(id = TOPIC1, topics = "#{'${some.prop:" + TOPIC1 + "}'}")
		void listen1(String in) {
			throw new RuntimeException("test");
		}

		void dlt(String in) {
			this.latch.countDown();
		}

		@Bean
		KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(KafkaTemplate<Integer, String> template,
				ConsumerFactory<Integer, String> consumerFactory) {

			ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
					new ConcurrentKafkaListenerContainerFactory<>();
			factory.setConsumerFactory(consumerFactory);
			factory.setReplyTemplate(template);
			return factory;
		}

		@Bean
		ConsumerFactory<Integer, String> consumerFactory(EmbeddedKafkaBroker embeddedKafka) {
			return new DefaultKafkaConsumerFactory<>(
					KafkaTestUtils.consumerProps("retryConfig", "false", embeddedKafka));
		}

		@Bean
		KafkaTemplate<Integer, String> template(ProducerFactory<Integer, String> producerFactory) {
			return new KafkaTemplate<>(producerFactory);
		}

		@Bean
		ProducerFactory<Integer, String> producerFactory(EmbeddedKafkaBroker embeddedKafka) {
			return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
		}

		@Bean
		KafkaAdmin admin(EmbeddedKafkaBroker broker) {
			return new KafkaAdmin(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()));
		}

		@Bean
		RetryTopicConfiguration retryTopicConfiguration1(KafkaTemplate<Integer, String> template) {
			return RetryTopicConfigurationBuilder.newInstance()
					.includeTopic(TOPIC1)
					.exponentialBackoff(100, 1.1, 110)
					.dltHandlerMethod(getClass(), "dlt")
					.create(template);
		}

	}

}
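
The asserted topic names follow the default delay-value suffixing: with exponentialBackoff(100, 1.1, 110), the first retry is delayed 100 ms and the second 100 * 1.1 = 110 ms (the configured maximum), so the framework provisions TOPIC1 plus its -retry-100, -retry-110, and -dlt companions. Note that the listener's topics attribute resolves a property placeholder with a default value, which exercises the expression-resolution path this change fixes.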