diff --git a/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/event/AbstractKafkaStreamsEvent.java b/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/event/AbstractKafkaStreamsEvent.java index 7ebcaaeae..b082022a7 100644 --- a/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/event/AbstractKafkaStreamsEvent.java +++ b/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/event/AbstractKafkaStreamsEvent.java @@ -15,7 +15,7 @@ */ package io.micronaut.configuration.kafka.streams.event; -import io.micronaut.context.event.ApplicationEvent; +import io.micronaut.configuration.kafka.event.AbstractKafkaApplicationEvent; import org.apache.kafka.streams.KafkaStreams; /** @@ -24,27 +24,19 @@ * @author graemerocher * @since 2.0.0 */ -public abstract class AbstractKafkaStreamsEvent extends ApplicationEvent { - private final KafkaStreams kafkaStreams; - +public abstract class AbstractKafkaStreamsEvent extends AbstractKafkaApplicationEvent { /** * Default constructor. * @param kafkaStreams The streams */ protected AbstractKafkaStreamsEvent(KafkaStreams kafkaStreams) { super(kafkaStreams); - this.kafkaStreams = kafkaStreams; - } - - @Override - public KafkaStreams getSource() { - return (KafkaStreams) super.getSource(); } /** * @return The kafka streams object. */ public KafkaStreams getKafkaStreams() { - return kafkaStreams; + return getSource(); } } diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/event/AbstractKafkaApplicationEvent.java b/kafka/src/main/java/io/micronaut/configuration/kafka/event/AbstractKafkaApplicationEvent.java new file mode 100644 index 000000000..deb9bc10e --- /dev/null +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/event/AbstractKafkaApplicationEvent.java @@ -0,0 +1,42 @@ +/* + * Copyright 2017-2023 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.configuration.kafka.event; + +import io.micronaut.context.event.ApplicationEvent; + +/** + * Abstract base class for all kafka application events. + * + * @author Jorge F. Sanchez + * @since 5.1.0 + * @param the {@link ApplicationEvent} source object type + */ +public class AbstractKafkaApplicationEvent extends ApplicationEvent { + /** + * Constructs a prototypical Event. + * + * @param source The object on which the Event initially occurred. + * @throws IllegalArgumentException if source is null. + */ + public AbstractKafkaApplicationEvent(T source) { + super(source); + } + + @Override + public T getSource() { + return (T) super.getSource(); + } +} diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/event/KafkaConsumerStartedPollingEvent.java b/kafka/src/main/java/io/micronaut/configuration/kafka/event/KafkaConsumerStartedPollingEvent.java new file mode 100644 index 000000000..f64436eec --- /dev/null +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/event/KafkaConsumerStartedPollingEvent.java @@ -0,0 +1,36 @@ +/* + * Copyright 2017-2023 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.configuration.kafka.event; + +import org.apache.kafka.clients.consumer.Consumer; + +/** + * An event fired after a Kafka {@link Consumer} executes the first polling. + * + * @author Jorge F. Sanchez + * @since 5.1.0 + */ +public final class KafkaConsumerStartedPollingEvent extends AbstractKafkaApplicationEvent { + /** + * Constructs an event with a given Consumer source. + * + * @param consumer The Consumer on which the Event initially occurred. + * @throws IllegalArgumentException if source is null. + */ + public KafkaConsumerStartedPollingEvent(Consumer consumer) { + super(consumer); + } +} diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/event/KafkaConsumerSubscribedEvent.java b/kafka/src/main/java/io/micronaut/configuration/kafka/event/KafkaConsumerSubscribedEvent.java new file mode 100644 index 000000000..5d13743dc --- /dev/null +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/event/KafkaConsumerSubscribedEvent.java @@ -0,0 +1,36 @@ +/* + * Copyright 2017-2023 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.configuration.kafka.event; + +import org.apache.kafka.clients.consumer.Consumer; + +/** + * An event fired after a Kafka {@link Consumer} subscribes to a set of Kafka topics. + * + * @author Jorge F. Sanchez + * @since 5.1.0 + */ +public final class KafkaConsumerSubscribedEvent extends AbstractKafkaApplicationEvent { + /** + * Constructs an event with a given Consumer source. + * + * @param consumer The Consumer on which the Event initially occurred. + * @throws IllegalArgumentException if source is null. + */ + public KafkaConsumerSubscribedEvent(Consumer consumer) { + super(consumer); + } +} diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java index 645036024..b0419066d 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java @@ -33,11 +33,14 @@ import io.micronaut.configuration.kafka.config.AbstractKafkaConsumerConfiguration; import io.micronaut.configuration.kafka.config.DefaultKafkaConsumerConfiguration; import io.micronaut.configuration.kafka.config.KafkaDefaultConfiguration; +import io.micronaut.configuration.kafka.event.KafkaConsumerStartedPollingEvent; +import io.micronaut.configuration.kafka.event.KafkaConsumerSubscribedEvent; import io.micronaut.configuration.kafka.exceptions.KafkaListenerException; import io.micronaut.configuration.kafka.exceptions.KafkaListenerExceptionHandler; import io.micronaut.configuration.kafka.serde.SerdeRegistry; import io.micronaut.context.BeanContext; import io.micronaut.context.annotation.Requires; +import io.micronaut.context.event.ApplicationEventPublisher; import io.micronaut.context.processor.ExecutableMethodProcessor; import io.micronaut.core.annotation.AnnotationValue; import io.micronaut.core.annotation.Blocking; @@ -147,6 +150,8 @@ class KafkaConsumerProcessor private final TransactionalProducerRegistry transactionalProducerRegistry; private final BatchConsumerRecordsBinderRegistry batchBinderRegistry; private final AtomicInteger clientIdGenerator = new AtomicInteger(10); + private final ApplicationEventPublisher kafkaConsumerStartedPollingEventPublisher; + private final ApplicationEventPublisher kafkaConsumerSubscribedEventPublisher; /** * Creates a new processor using the given {@link ExecutorService} to schedule consumers on. @@ -162,6 +167,8 @@ class KafkaConsumerProcessor * @param exceptionHandler The exception handler to use * @param schedulerService The scheduler service * @param transactionalProducerRegistry The transactional producer registry + * @param startedEventPublisher The KafkaConsumerStartedPollingEvent publisher + * @param subscribedEventPublisher The KafkaConsumerSubscribedEvent publisher */ KafkaConsumerProcessor( @Named(TaskExecutors.MESSAGE_CONSUMER) ExecutorService executorService, @@ -174,7 +181,9 @@ class KafkaConsumerProcessor ProducerRegistry producerRegistry, KafkaListenerExceptionHandler exceptionHandler, @Named(TaskExecutors.SCHEDULED) ExecutorService schedulerService, - TransactionalProducerRegistry transactionalProducerRegistry) { + TransactionalProducerRegistry transactionalProducerRegistry, + ApplicationEventPublisher startedEventPublisher, + ApplicationEventPublisher subscribedEventPublisher) { this.executorService = executorService; this.applicationConfiguration = applicationConfiguration; this.beanContext = beanContext; @@ -186,6 +195,8 @@ class KafkaConsumerProcessor this.exceptionHandler = exceptionHandler; this.taskScheduler = new ScheduledExecutorTaskScheduler(schedulerService); this.transactionalProducerRegistry = transactionalProducerRegistry; + this.kafkaConsumerStartedPollingEventPublisher = startedEventPublisher; + this.kafkaConsumerSubscribedEventPublisher = subscribedEventPublisher; this.beanContext.getBeanDefinitions(Qualifiers.byType(KafkaListener.class)) .forEach(definition -> { // pre-initialize singletons before processing @@ -421,6 +432,7 @@ private void submitConsumerThread(final ExecutableMethod method, ca.setKafkaConsumer(kafkaConsumer); } setupConsumerSubscription(method, topicAnnotations, consumerBean, kafkaConsumer); + kafkaConsumerSubscribedEventPublisher.publishEvent(new KafkaConsumerSubscribedEvent(kafkaConsumer)); ConsumerState consumerState = new ConsumerState(clientId, groupId, offsetStrategy, kafkaConsumer, consumerBean, Collections.unmodifiableSet(kafkaConsumer.subscription()), consumerAnnotation, method); consumers.put(clientId, consumerState); executorService.submit(() -> createConsumerThreadPollLoop(method, consumerState)); @@ -446,6 +458,7 @@ private void createConsumerThreadPollLoop(final ExecutableMethod method, consumerArg.ifPresent(argument -> boundArguments.put(argument, kafkaConsumer)); //noinspection InfiniteLoopStatement + boolean pollingStarted = false; while (true) { final Set newAssignments = Collections.unmodifiableSet(kafkaConsumer.assignment()); if (LOG.isInfoEnabled() && !newAssignments.equals(consumerState.assignments)) { @@ -482,6 +495,11 @@ private void createConsumerThreadPollLoop(final ExecutableMethod method, consumerRecords = kafkaConsumer.poll(pollTimeout); } consumerState.closedState = ConsumerCloseState.POLLING; + if (!pollingStarted) { + pollingStarted = true; + kafkaConsumerStartedPollingEventPublisher.publishEvent(new KafkaConsumerStartedPollingEvent(kafkaConsumer)); + } + failed = true; consumerState.resumeTopicPartitions(); diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/event/KafkaConsumerEventSpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/event/KafkaConsumerEventSpec.groovy new file mode 100644 index 000000000..803032e13 --- /dev/null +++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/event/KafkaConsumerEventSpec.groovy @@ -0,0 +1,76 @@ +package io.micronaut.configuration.kafka.event + +import io.micronaut.configuration.kafka.annotation.KafkaListener +import io.micronaut.configuration.kafka.annotation.Topic +import io.micronaut.context.annotation.Property +import io.micronaut.context.annotation.Requires +import io.micronaut.context.event.ApplicationEventListener +import io.micronaut.test.extensions.spock.annotation.MicronautTest +import jakarta.inject.Inject +import jakarta.inject.Singleton +import org.apache.kafka.clients.consumer.KafkaConsumer +import spock.lang.Specification +import spock.util.concurrent.PollingConditions + +import static org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchStates.FETCHING + +@Property(name = "spec.name", value = "KafkaConsumerEventSpec") +@MicronautTest(startApplication = false) +class KafkaConsumerEventSpec extends Specification { + + @Inject + KafkaConsumerSubscribedEventListener subscribedEventListener + + @Inject + KafkaConsumerStartedPollingEventListener startedPollingEvent + + void "listen to kafka consumer subscribed events"() { + expect: "the event is emitted and consumed by the event listener" + new PollingConditions(timeout: 10, delay: 1).eventually { + subscribedEventListener.received instanceof KafkaConsumerSubscribedEvent + } + and: "the kafka consumer is subscribed to the expected topic" + subscribedEventListener.consumer.subscriptions.subscription == ['my-nifty-topic'] as Set + } + + void "listen to kafka consumer started polling events"() { + expect: "the event is emitted and consumed by the event listener" + new PollingConditions(timeout: 10, delay: 1).eventually { + startedPollingEvent.received instanceof KafkaConsumerStartedPollingEvent + } + and: "the kafka consumer starts fetching records" + new PollingConditions(timeout: 10, delay: 1).eventually { + subscribedEventListener.consumer.subscriptions.assignment.partitionStateValues()[0].fetchState == FETCHING + } + } + + @KafkaListener(clientId = "my-nifty-kafka-consumer") + @Requires(property = "spec.name", value = "KafkaConsumerEventSpec") + static class MyKafkaConsumer { + @Topic("my-nifty-topic") + void consume(String event) {} + } + + static class AbstractKafkaConsumerEventListener implements ApplicationEventListener { + AbstractKafkaApplicationEvent received + KafkaConsumer consumer + + @Override + void onApplicationEvent(T event) { + // Skip consumers unrelated to this test + if ((event.source as KafkaConsumer).clientId.startsWith('my-nifty-kafka-consumer')) { + if (received != null) throw new RuntimeException("Expecting one event only") + received = event + consumer = event.source + } + } + } + + @Singleton + @Requires(property = "spec.name", value = "KafkaConsumerEventSpec") + static class KafkaConsumerSubscribedEventListener extends AbstractKafkaConsumerEventListener { } + + @Singleton + @Requires(property = "spec.name", value = "KafkaConsumerEventSpec") + static class KafkaConsumerStartedPollingEventListener extends AbstractKafkaConsumerEventListener { } +}