Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
* <p>Note: When provided, this value will override the group id property
* in the consumer factory configuration, unless {@link #idIsGroup()}
* is set to false or {@link #groupId()} is provided.
* <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
* <p>SpEL {@code #{...}} and property placeholders {@code ${...}} are supported.
* @return the {@code id} for the container managing for this endpoint.
* @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@
* @author Christian Fredriksson
* @author Timofey Barabanov
* @author Janek Lasocki-Biczysko
* @author Mikhail Polivakha
*/
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
Expand Down Expand Up @@ -340,7 +341,7 @@ public void resumePartition(TopicPartition topicPartition) {
}

private void consumerWakeIfNecessary() {
KafkaMessageListenerContainer<K, V>.ListenerConsumer consumer = this.listenerConsumer;
ListenerConsumer consumer = this.listenerConsumer;
if (consumer != null) {
consumer.wakeIfNecessary();
}
Expand Down Expand Up @@ -913,14 +914,23 @@ else if (listener instanceof MessageListener) {
this.pollThreadStateProcessor = setUpPollProcessor(false);
this.observationEnabled = this.containerProperties.isObservationEnabled();

if (!AopUtils.isAopProxy(this.genericListener) &&
this.genericListener instanceof KafkaBackoffAwareMessageListenerAdapter<?, ?>) {
if (!AopUtils.isAopProxy(this.genericListener)) {

KafkaBackoffAwareMessageListenerAdapter<K, V> genListener =
(KafkaBackoffAwareMessageListenerAdapter<K, V>) this.genericListener;
if (genListener.getDelegate() instanceof RecordMessagingMessageListenerAdapter<K, V> adapterListener) {
// This means that the async retry feature is supported only for SingleRecordListener with @RetryableTopic.
adapterListener.setCallbackForAsyncFailure(this::callbackForAsyncFailure);
if (this.genericListener instanceof KafkaBackoffAwareMessageListenerAdapter<?, ?>) {
KafkaBackoffAwareMessageListenerAdapter<K, V> genListener =
(KafkaBackoffAwareMessageListenerAdapter<K, V>) this.genericListener;

if (genListener.getDelegate() instanceof RecordMessagingMessageListenerAdapter<K, V> adapterListener) {
// This means that the async retry feature is supported only for SingleRecordListener when used with @RetryableTopic.
adapterListener.setCallbackForAsyncFailure(this::callbackForAsyncFailure);
}
}

if (this.genericListener instanceof RecordMessagingMessageListenerAdapter<?, ?>) {
RecordMessagingMessageListenerAdapter<K, V> recordListnener =
(RecordMessagingMessageListenerAdapter<K, V>) this.genericListener;

recordListnener.setCallbackForAsyncFailure(this::callbackForAsyncFailure);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.ProducerConfig
Expand All @@ -35,7 +36,10 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory
import org.springframework.kafka.listener.CommonErrorHandler
import org.springframework.kafka.listener.DefaultErrorHandler
import org.springframework.kafka.listener.KafkaListenerErrorHandler
import org.springframework.kafka.listener.MessageListenerContainer
import org.springframework.kafka.support.Acknowledgment
import org.springframework.kafka.test.EmbeddedKafkaBroker
import org.springframework.kafka.test.context.EmbeddedKafka
Expand All @@ -53,12 +57,13 @@ import java.util.concurrent.TimeUnit
*
* @author Wang ZhiYang
* @author Artem Bilan
* @author Mikhail Polivakha
*
* @since 3.1
*/
@SpringJUnitConfig
@DirtiesContext
@EmbeddedKafka(topics = ["kotlinAsyncTestTopic1", "kotlinAsyncTestTopic2",
@EmbeddedKafka(topics = ["kotlinAsyncTestTopic1", "kotlinAsyncTestTopic2", "kotlinAsyncTestTopicContainerLevelHandler",
"kotlinAsyncBatchTestTopic1", "kotlinAsyncBatchTestTopic2", "kotlinReplyTopic1"], partitions = 1)
class EnableKafkaKotlinCoroutinesTests {

Expand Down Expand Up @@ -87,6 +92,12 @@ class EnableKafkaKotlinCoroutinesTests {
assertThat(this.config.error).isTrue()
}

@Test
fun `test checked ex - container level handler should be invoked`() {
this.template.send("kotlinAsyncTestTopicContainerLevelHandler", "fail")
assertThat(this.config.latchForCommonHandler.await(10, TimeUnit.SECONDS)).isTrue()
}

@Test
fun `test batch listener`() {
this.template.send("kotlinAsyncBatchTestTopic1", "foo")
Expand Down Expand Up @@ -142,6 +153,8 @@ class EnableKafkaKotlinCoroutinesTests {

val latch2 = CountDownLatch(1)

val latchForCommonHandler = CountDownLatch(1)

val batchLatch1 = CountDownLatch(1)

val batchLatch2 = CountDownLatch(1)
Expand Down Expand Up @@ -190,6 +203,21 @@ class EnableKafkaKotlinCoroutinesTests {
}
}

@Bean
fun defaultErrorHandler(): CommonErrorHandler {
return object : CommonErrorHandler {
override fun handleOne(
thrownException: java.lang.Exception,
record: ConsumerRecord<*, *>,
consumer: Consumer<*, *>,
container: MessageListenerContainer
): Boolean {
latchForCommonHandler.countDown()
return super.handleOne(thrownException, record, consumer, container)
}
}
}

@Bean
fun errorHandlerBatch() : KafkaListenerErrorHandler {
return KafkaListenerErrorHandler { message, _ ->
Expand All @@ -200,11 +228,12 @@ class EnableKafkaKotlinCoroutinesTests {
}

@Bean
fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
fun kafkaListenerContainerFactory(commonErrorHandler: CommonErrorHandler): ConcurrentKafkaListenerContainerFactory<String, String> {
val factory: ConcurrentKafkaListenerContainerFactory<String, String>
= ConcurrentKafkaListenerContainerFactory()
factory.setConsumerFactory(kcf())
factory.setReplyTemplate(kt())
factory.setCommonErrorHandler(commonErrorHandler)
return factory
}

Expand Down Expand Up @@ -247,6 +276,16 @@ class EnableKafkaKotlinCoroutinesTests {
}
}

@KafkaListener(
id = "kotlin-ex-container-level-handler",
topics = ["kotlinAsyncTestTopicContainerLevelHandler"],
containerFactory = "kafkaListenerContainerFactory")
suspend fun listenExContainerLevelHandlerInvoked(value: String) {
if (value == "fail") {
throw RuntimeException("checked")
}
}

}

}
Loading