) KafkaTestUtils.getPropertyValue(embeddedKafkaBroker, "brokerProperties");
+
+ assertThat(properties.get("transaction.state.log.replication.factor")).isEqualTo("2");
+ }
@EmbeddedKafka(kraft = false)
private static final class TestWithEmbeddedKafka {
@@ -104,7 +124,7 @@ private static final class SecondTestWithEmbeddedKafka {
}
- @EmbeddedKafka(kraft = false, ports = 8085, bootstrapServersProperty = "my.bss.prop")
+ @EmbeddedKafka(kraft = false, ports = 8085, bootstrapServersProperty = "my.bss.prop", adminTimeout = 33)
private static final class TestWithEmbeddedKafkaPorts {
}
@@ -114,4 +134,9 @@ private static final class TestWithEmbeddedKafkaMulti {
}
+ @EmbeddedKafka(kraft = false, count = 2)
+ private static final class TestWithEmbeddedKafkaTransactionFactor {
+
+ }
+
}
diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/hamcrest/KafkaMatchersTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/hamcrest/KafkaMatchersTests.java
index 37c07fd32a..d7a3a129b2 100644
--- a/spring-kafka-test/src/test/java/org/springframework/kafka/test/hamcrest/KafkaMatchersTests.java
+++ b/spring-kafka-test/src/test/java/org/springframework/kafka/test/hamcrest/KafkaMatchersTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2017-2020 the original author or authors.
+ * Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,13 +16,6 @@
package org.springframework.kafka.test.hamcrest;
-import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasKey;
-import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasPartition;
-import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasTimestamp;
-import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasValue;
-
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -30,6 +23,13 @@
import org.apache.kafka.common.record.TimestampType;
import org.junit.jupiter.api.Test;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasKey;
+import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasPartition;
+import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasTimestamp;
+import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasValue;
+
/**
* @author Biju Kunjummen
*
diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/junit/GlobalEmbeddedKafkaTestExecutionListenerTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/junit/GlobalEmbeddedKafkaTestExecutionListenerTests.java
index 4137dcbd1f..664ba13aaf 100644
--- a/spring-kafka-test/src/test/java/org/springframework/kafka/test/junit/GlobalEmbeddedKafkaTestExecutionListenerTests.java
+++ b/spring-kafka-test/src/test/java/org/springframework/kafka/test/junit/GlobalEmbeddedKafkaTestExecutionListenerTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2022 the original author or authors.
+ * Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,8 +16,6 @@
package org.springframework.kafka.test.junit;
-import static org.assertj.core.api.Assertions.assertThat;
-
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
@@ -49,6 +47,8 @@
import org.springframework.util.DefaultPropertiesPersister;
+import static org.assertj.core.api.Assertions.assertThat;
+
/**
* @author Artem Bilan
*
diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/rule/AddressableEmbeddedBrokerTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/rule/AddressableEmbeddedBrokerTests.java
index bb75e0715d..3842a27ab7 100644
--- a/spring-kafka-test/src/test/java/org/springframework/kafka/test/rule/AddressableEmbeddedBrokerTests.java
+++ b/spring-kafka-test/src/test/java/org/springframework/kafka/test/rule/AddressableEmbeddedBrokerTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2018-2023 the original author or authors.
+ * Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,8 +16,6 @@
package org.springframework.kafka.test.rule;
-import static org.assertj.core.api.Assertions.assertThat;
-
import java.io.IOException;
import java.net.ServerSocket;
import java.util.Map;
@@ -39,6 +37,8 @@
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
+import static org.assertj.core.api.Assertions.assertThat;
+
/**
* @author Gary Russell
* @author Kamill Sokol
diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/utils/KafkaTestUtilsTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/utils/KafkaTestUtilsTests.java
index b8f12fde84..e5be343800 100644
--- a/spring-kafka-test/src/test/java/org/springframework/kafka/test/utils/KafkaTestUtilsTests.java
+++ b/spring-kafka-test/src/test/java/org/springframework/kafka/test/utils/KafkaTestUtilsTests.java
@@ -16,9 +16,6 @@
package org.springframework.kafka.test.utils;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
-
import java.time.Duration;
import java.util.List;
import java.util.Map;
@@ -38,6 +35,9 @@
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+
/**
* @author Gary Russell
* @author Artem Bilan
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableKafka.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableKafka.java
index a163bc25a0..437284b638 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableKafka.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableKafka.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2019 the original author or authors.
+ * Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,7 +27,7 @@
/**
* Enable Kafka listener annotated endpoints that are created under the covers by a
* {@link org.springframework.kafka.config.AbstractKafkaListenerContainerFactory
- * AbstractListenerContainerFactory}. To be used on
+ * AbstractKafkaListenerContainerFactory}. To be used on
* {@link org.springframework.context.annotation.Configuration Configuration} classes as
* follows:
*
@@ -117,7 +117,7 @@
*
*
* @KafkaListener(containerFactory = "myKafkaListenerContainerFactory", topics = "myTopic")
- * public void process(String msg, @Header("kafka_partition") int partition) {
+ * public void process(String msg, @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
* // process incoming message
* }
*
@@ -174,7 +174,7 @@
* @Override
* public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
* registrar.setEndpointRegistry(myKafkaListenerEndpointRegistry());
- * registrar.setMessageHandlerMethodFactory(myMessageHandlerMethodFactory);
+ * registrar.setMessageHandlerMethodFactory(myMessageHandlerMethodFactory());
* registrar.setValidator(new MyValidator());
* }
*
@@ -233,6 +233,7 @@
* @author Stephane Nicoll
* @author Gary Russell
* @author Artem Bilan
+ * @author Borahm Lee
*
* @see KafkaListener
* @see KafkaListenerAnnotationBeanPostProcessor
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java
index fcab2fb01f..dcaafc4b81 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java
@@ -30,11 +30,15 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.regex.Pattern;
import java.util.stream.Stream;
@@ -46,7 +50,6 @@
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanInitializationException;
-import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.ListableBeanFactory;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.ObjectFactory;
@@ -97,6 +100,7 @@
import org.springframework.kafka.retrytopic.RetryTopicConfigurer;
import org.springframework.kafka.retrytopic.RetryTopicSchedulerWrapper;
import org.springframework.kafka.support.TopicPartitionOffset;
+import org.springframework.kafka.support.TopicPartitionOffset.SeekPosition;
import org.springframework.lang.Nullable;
import org.springframework.messaging.converter.GenericMessageConverter;
import org.springframework.messaging.converter.SmartMessageConverter;
@@ -140,6 +144,9 @@
* @author Filip Halemba
* @author Tomaz Fernandes
* @author Wang Zhiyang
+ * @author Sanghyeok An
+ * @author Soby Chacko
+ * @author Omer Celik
*
* @see KafkaListener
* @see KafkaListenerErrorHandler
@@ -151,7 +158,7 @@
* @see MethodKafkaListenerEndpoint
*/
public class KafkaListenerAnnotationBeanPostProcessor
- implements BeanPostProcessor, Ordered, ApplicationContextAware, InitializingBean, SmartInitializingSingleton {
+ implements BeanPostProcessor, Ordered, ApplicationContextAware, SmartInitializingSingleton {
private static final String UNCHECKED = "unchecked";
@@ -181,10 +188,13 @@ public class KafkaListenerAnnotationBeanPostProcessor
private final AtomicInteger counter = new AtomicInteger();
+ private final AtomicBoolean enhancerIsBuilt = new AtomicBoolean();
+
private KafkaListenerEndpointRegistry endpointRegistry;
private String defaultContainerFactoryBeanName = DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME;
+ @Nullable
private ApplicationContext applicationContext;
private BeanFactory beanFactory;
@@ -197,8 +207,11 @@ public class KafkaListenerAnnotationBeanPostProcessor
private AnnotationEnhancer enhancer;
+ @Nullable
private RetryTopicConfigurer retryTopicConfigurer;
+ private final Lock globalLock = new ReentrantLock();
+
@Override
public int getOrder() {
return LOWEST_PRECEDENCE;
@@ -270,12 +283,20 @@ public void setApplicationContext(ApplicationContext applicationContext) throws
* {@link #setEndpointRegistry endpoint registry} has to be explicitly configured.
* @param beanFactory the {@link BeanFactory} to be used.
*/
- public synchronized void setBeanFactory(BeanFactory beanFactory) {
- this.beanFactory = beanFactory;
- if (beanFactory instanceof ConfigurableListableBeanFactory clbf) {
- this.resolver = clbf.getBeanExpressionResolver();
- this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory,
- this.listenerScope);
+ public void setBeanFactory(BeanFactory beanFactory) {
+ try {
+ this.globalLock.lock();
+ this.beanFactory = beanFactory;
+ if (beanFactory instanceof ConfigurableListableBeanFactory clbf) {
+ BeanExpressionResolver beanExpressionResolver = clbf.getBeanExpressionResolver();
+ if (beanExpressionResolver != null) {
+ this.resolver = beanExpressionResolver;
+ }
+ this.expressionContext = new BeanExpressionContext(clbf, this.listenerScope);
+ }
+ }
+ finally {
+ this.globalLock.unlock();
}
}
@@ -290,11 +311,6 @@ public void setCharset(Charset charset) {
this.charset = charset;
}
- @Override
- public void afterPropertiesSet() throws Exception {
- buildEnhancer();
- }
-
@Override
public void afterSingletonsInstantiated() {
this.registrar.setBeanFactory(this.beanFactory);
@@ -333,16 +349,18 @@ public void afterSingletonsInstantiated() {
// Actually register all listeners
this.registrar.afterPropertiesSet();
- Map sequencers =
- this.applicationContext.getBeansOfType(ContainerGroupSequencer.class, false, false);
- sequencers.values().forEach(ContainerGroupSequencer::initialize);
+ if (this.applicationContext != null) {
+ Map sequencers =
+ this.applicationContext.getBeansOfType(ContainerGroupSequencer.class, false, false);
+ sequencers.values().forEach(ContainerGroupSequencer::initialize);
+ }
}
private void buildEnhancer() {
- if (this.applicationContext != null) {
+ if (this.applicationContext != null && this.enhancerIsBuilt.compareAndSet(false, true)) {
Map enhancersMap =
this.applicationContext.getBeansOfType(AnnotationEnhancer.class, false, false);
- if (enhancersMap.size() > 0) {
+ if (!enhancersMap.isEmpty()) {
List enhancers = enhancersMap.values()
.stream()
.sorted(new OrderComparator())
@@ -352,7 +370,7 @@ private void buildEnhancer() {
for (AnnotationEnhancer enh : enhancers) {
newAttrs = enh.apply(newAttrs, element);
}
- return attrs;
+ return newAttrs;
};
}
}
@@ -365,39 +383,40 @@ public Object postProcessBeforeInitialization(Object bean, String beanName) thro
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
+ buildEnhancer();
if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
Class> targetClass = AopUtils.getTargetClass(bean);
Collection classLevelListeners = findListenerAnnotations(targetClass);
- final boolean hasClassLevelListeners = !classLevelListeners.isEmpty();
- final List multiMethods = new ArrayList<>();
Map> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup>) method -> {
Set listenerMethods = findListenerAnnotations(method);
return (!listenerMethods.isEmpty() ? listenerMethods : null);
});
- if (hasClassLevelListeners) {
- Set methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
- (ReflectionUtils.MethodFilter) method ->
- AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
- multiMethods.addAll(methodsWithHandler);
- }
- if (annotatedMethods.isEmpty() && !hasClassLevelListeners) {
+ boolean hasClassLevelListeners = !classLevelListeners.isEmpty();
+ boolean hasMethodLevelListeners = !annotatedMethods.isEmpty();
+ if (!hasMethodLevelListeners && !hasClassLevelListeners) {
this.nonAnnotatedClasses.add(bean.getClass());
this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
}
else {
- // Non-empty set of methods
- for (Map.Entry> entry : annotatedMethods.entrySet()) {
- Method method = entry.getKey();
- for (KafkaListener listener : entry.getValue()) {
- processKafkaListener(listener, method, bean, beanName);
+ if (hasMethodLevelListeners) {
+ // Non-empty set of methods
+ for (Map.Entry> entry : annotatedMethods.entrySet()) {
+ Method method = entry.getKey();
+ for (KafkaListener listener : entry.getValue()) {
+ processKafkaListener(listener, method, bean, beanName);
+ }
}
+ this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
+ + beanName + "': " + annotatedMethods);
+ }
+ if (hasClassLevelListeners) {
+ Set methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
+ (ReflectionUtils.MethodFilter) method ->
+ AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
+ List multiMethods = new ArrayList<>(methodsWithHandler);
+ processMultiMethodListeners(classLevelListeners, multiMethods, targetClass, bean, beanName);
}
- this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
- + beanName + "': " + annotatedMethods);
- }
- if (hasClassLevelListeners) {
- processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
}
}
return bean;
@@ -443,73 +462,75 @@ private KafkaListener enhance(AnnotatedElement element, KafkaListener ann) {
}
}
- private synchronized void processMultiMethodListeners(Collection classLevelListeners,
- List multiMethods, Object bean, String beanName) {
-
- List checkedMethods = new ArrayList<>();
- Method defaultMethod = null;
- for (Method method : multiMethods) {
- Method checked = checkProxy(method, bean);
- KafkaHandler annotation = AnnotationUtils.findAnnotation(method, KafkaHandler.class);
- if (annotation != null && annotation.isDefault()) {
- final Method toAssert = defaultMethod;
- Assert.state(toAssert == null, () -> "Only one @KafkaHandler can be marked 'isDefault', found: "
- + toAssert.toString() + " and " + method.toString());
- defaultMethod = checked;
+ private void processMultiMethodListeners(Collection classLevelListeners,
+ List multiMethods, Class> clazz, Object bean, String beanName) {
+
+ try {
+ this.globalLock.lock();
+ List checkedMethods = new ArrayList<>();
+ Method defaultMethod = null;
+ for (Method method : multiMethods) {
+ Method checked = checkProxy(method, bean);
+ KafkaHandler annotation = AnnotationUtils.findAnnotation(method, KafkaHandler.class);
+ if (annotation != null && annotation.isDefault()) {
+ Method toAssert = defaultMethod;
+ Assert.state(toAssert == null, () -> "Only one @KafkaHandler can be marked 'isDefault', found: "
+ + toAssert.toString() + " and " + method);
+ defaultMethod = checked;
+ }
+ checkedMethods.add(checked);
+ }
+ for (KafkaListener classLevelListener : classLevelListeners) {
+ MultiMethodKafkaListenerEndpoint endpoint =
+ new MultiMethodKafkaListenerEndpoint<>(checkedMethods, defaultMethod, bean);
+ processMainAndRetryListeners(classLevelListener, bean, beanName, endpoint, null, clazz);
}
- checkedMethods.add(checked);
- }
- for (KafkaListener classLevelListener : classLevelListeners) {
- MultiMethodKafkaListenerEndpoint endpoint =
- new MultiMethodKafkaListenerEndpoint<>(checkedMethods, defaultMethod, bean);
- String beanRef = classLevelListener.beanRef();
- this.listenerScope.addListener(beanRef, bean);
- endpoint.setId(getEndpointId(classLevelListener));
- processListener(endpoint, classLevelListener, bean, beanName, resolveTopics(classLevelListener),
- resolveTopicPartitions(classLevelListener));
- this.listenerScope.removeListener(beanRef);
+ }
+ finally {
+ this.globalLock.unlock();
}
}
- protected synchronized void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean,
+ protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean,
String beanName) {
- Method methodToUse = checkProxy(method, bean);
- MethodKafkaListenerEndpoint endpoint = new MethodKafkaListenerEndpoint<>();
- endpoint.setMethod(methodToUse);
+ try {
+ this.globalLock.lock();
+ Method methodToUse = checkProxy(method, bean);
+ MethodKafkaListenerEndpoint endpoint = new MethodKafkaListenerEndpoint<>();
+ endpoint.setMethod(methodToUse);
+ processMainAndRetryListeners(kafkaListener, bean, beanName, endpoint, methodToUse, null);
+ }
+ finally {
+ this.globalLock.unlock();
+ }
+ }
+
+ private void processMainAndRetryListeners(KafkaListener kafkaListener, Object bean, String beanName,
+ MethodKafkaListenerEndpoint endpoint, @Nullable Method methodToUse, @Nullable Class> clazz) {
String beanRef = kafkaListener.beanRef();
this.listenerScope.addListener(beanRef, bean);
endpoint.setId(getEndpointId(kafkaListener));
String[] topics = resolveTopics(kafkaListener);
TopicPartitionOffset[] tps = resolveTopicPartitions(kafkaListener);
- if (!processMainAndRetryListeners(kafkaListener, bean, beanName, methodToUse, endpoint, topics, tps)) {
+ if (!processMainAndRetryListeners(kafkaListener, bean, beanName, endpoint, topics, tps, methodToUse, clazz)) {
processListener(endpoint, kafkaListener, bean, beanName, topics, tps);
}
this.listenerScope.removeListener(beanRef);
}
private boolean processMainAndRetryListeners(KafkaListener kafkaListener, Object bean, String beanName,
- Method methodToUse, MethodKafkaListenerEndpoint endpoint, String[] topics,
- TopicPartitionOffset[] tps) {
-
- String[] retryableCandidates = topics;
- if (retryableCandidates.length == 0 && tps.length > 0) {
- retryableCandidates = Arrays.stream(tps)
- .map(tp -> tp.getTopic())
- .distinct()
- .toList()
- .toArray(new String[0]);
- }
+ MethodKafkaListenerEndpoint endpoint, String[] topics, TopicPartitionOffset[] tps,
+ @Nullable Method methodToUse, @Nullable Class> clazz) {
+ String[] retryableCandidates = getTopicsFromTopicPartitionOffset(topics, tps);
RetryTopicConfiguration retryTopicConfiguration = new RetryTopicConfigurationProvider(this.beanFactory,
this.resolver, this.expressionContext)
- .findRetryConfigurationFor(retryableCandidates, methodToUse, bean);
-
+ .findRetryConfigurationFor(retryableCandidates, methodToUse, clazz, bean);
if (retryTopicConfiguration == null) {
- String[] candidates = retryableCandidates;
this.logger.debug(() ->
- "No retry topic configuration found for topics " + Arrays.toString(candidates));
+ "No retry topic configuration found for topics " + Arrays.toString(retryableCandidates));
return false;
}
@@ -525,6 +546,18 @@ private boolean processMainAndRetryListeners(KafkaListener kafkaListener, Object
return true;
}
+ private String[] getTopicsFromTopicPartitionOffset(String[] topics, TopicPartitionOffset[] tps) {
+ String[] retryableCandidates = topics;
+ if (retryableCandidates.length == 0 && tps.length > 0) {
+ retryableCandidates = Arrays.stream(tps)
+ .map(TopicPartitionOffset::getTopic)
+ .distinct()
+ .toList()
+ .toArray(new String[0]);
+ }
+ return retryableCandidates;
+ }
+
private RetryTopicConfigurer getRetryTopicConfigurer() {
if (this.retryTopicConfigurer == null) {
try {
@@ -736,11 +769,16 @@ private KafkaListenerContainerFactory> resolveContainerFactory(KafkaListener k
private void resolveContainerPostProcessor(MethodKafkaListenerEndpoint, ?> endpoint,
KafkaListener kafkaListener) {
-
- final String containerPostProcessor = kafkaListener.containerPostProcessor();
- if (StringUtils.hasText(containerPostProcessor)) {
- endpoint.setContainerPostProcessor(this.beanFactory.getBean(containerPostProcessor,
- ContainerPostProcessor.class));
+ Object containerPostProcessor = resolveExpression(kafkaListener.containerPostProcessor());
+ if (containerPostProcessor instanceof ContainerPostProcessor, ?, ?> cpp) {
+ endpoint.setContainerPostProcessor(cpp);
+ }
+ else {
+ String containerPostProcessorBeanName = resolveExpressionAsString(kafkaListener.containerPostProcessor(), "containerPostProcessor");
+ if (StringUtils.hasText(containerPostProcessorBeanName)) {
+ endpoint.setContainerPostProcessor(
+ this.beanFactory.getBean(containerPostProcessorBeanName, ContainerPostProcessor.class));
+ }
}
}
@@ -804,7 +842,8 @@ private String getEndpointId(KafkaListener kafkaListener) {
}
}
- private String getEndpointGroupId(KafkaListener kafkaListener, String id) {
+ @Nullable
+ private String getEndpointGroupId(KafkaListener kafkaListener, @Nullable String id) {
String groupId = null;
if (StringUtils.hasText(kafkaListener.groupId())) {
groupId = resolveExpressionAsString(kafkaListener.groupId(), "groupId");
@@ -818,10 +857,8 @@ private String getEndpointGroupId(KafkaListener kafkaListener, String id) {
private TopicPartitionOffset[] resolveTopicPartitions(KafkaListener kafkaListener) {
TopicPartition[] topicPartitions = kafkaListener.topicPartitions();
List result = new ArrayList<>();
- if (topicPartitions.length > 0) {
- for (TopicPartition topicPartition : topicPartitions) {
- result.addAll(resolveTopicPartitionsList(topicPartition));
- }
+ for (TopicPartition topicPartition : topicPartitions) {
+ result.addAll(resolveTopicPartitionsList(topicPartition));
}
return result.toArray(new TopicPartitionOffset[0]);
}
@@ -868,7 +905,7 @@ private List resolveTopicPartitionsList(TopicPartition top
() -> "At least one 'partition' or 'partitionOffset' required in @TopicPartition for topic '" + topic + "'");
List result = new ArrayList<>();
for (String partition : partitions) {
- resolvePartitionAsInteger((String) topic, resolveExpression(partition), result, null, false, false);
+ resolvePartitionAsInteger((String) topic, resolveExpression(partition), result);
}
if (partitionOffsets.length == 1 && resolveExpression(partitionOffsets[0].partition()).equals("*")) {
result.forEach(tpo -> {
@@ -881,7 +918,8 @@ private List resolveTopicPartitionsList(TopicPartition top
Assert.isTrue(!partitionOffset.partition().equals("*"), () ->
"Partition wildcard '*' is only allowed in a single @PartitionOffset in " + result);
resolvePartitionAsInteger((String) topic, resolveExpression(partitionOffset.partition()), result,
- resolveInitialOffset(topic, partitionOffset), isRelative(topic, partitionOffset), true);
+ resolveInitialOffset(topic, partitionOffset), isRelative(topic, partitionOffset), true,
+ resolveExpression(partitionOffset.seekPosition()));
}
}
Assert.isTrue(!result.isEmpty(), () -> "At least one partition required for " + topic);
@@ -890,11 +928,11 @@ private List resolveTopicPartitionsList(TopicPartition top
private Long resolveInitialOffset(Object topic, PartitionOffset partitionOffset) {
Object initialOffsetValue = resolveExpression(partitionOffset.initialOffset());
- Long initialOffset;
+ long initialOffset;
if (initialOffsetValue instanceof String str) {
Assert.state(StringUtils.hasText(str),
() -> "'initialOffset' in @PartitionOffset for topic '" + topic + "' cannot be empty");
- initialOffset = Long.valueOf(str);
+ initialOffset = Long.parseLong(str);
}
else if (initialOffsetValue instanceof Long lng) {
initialOffset = lng;
@@ -945,20 +983,33 @@ else if (resolvedValue instanceof Iterable) {
}
}
+ private void resolvePartitionAsInteger(String topic, Object resolvedValue, List result) {
+ resolvePartitionAsInteger(topic, resolvedValue, result, null, false, false, null);
+ }
+
@SuppressWarnings(UNCHECKED)
- private void resolvePartitionAsInteger(String topic, Object resolvedValue,
- List result, @Nullable Long offset, boolean isRelative, boolean checkDups) {
+ private void resolvePartitionAsInteger(String topic, Object resolvedValue, List result,
+ @Nullable Long offset, boolean isRelative, boolean checkDups, @Nullable Object seekPosition) {
if (resolvedValue instanceof String[] strArr) {
for (Object object : strArr) {
- resolvePartitionAsInteger(topic, object, result, offset, isRelative, checkDups);
+ resolvePartitionAsInteger(topic, object, result, offset, isRelative, checkDups, seekPosition);
}
+ return;
}
- else if (resolvedValue instanceof String str) {
+ else if (resolvedValue instanceof Iterable) {
+ for (Object object : (Iterable