Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish events after Consumer subscribes and after first polling #677

Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package io.micronaut.configuration.kafka.streams.event;

import io.micronaut.context.event.ApplicationEvent;
import io.micronaut.configuration.kafka.event.AbstractKafkaApplicationEvent;
import org.apache.kafka.streams.KafkaStreams;

/**
Expand All @@ -24,27 +24,19 @@
* @author graemerocher
* @since 2.0.0
*/
public abstract class AbstractKafkaStreamsEvent extends ApplicationEvent {
private final KafkaStreams kafkaStreams;

public abstract class AbstractKafkaStreamsEvent extends AbstractKafkaApplicationEvent<KafkaStreams> {
/**
* Default constructor.
* @param kafkaStreams The streams
*/
protected AbstractKafkaStreamsEvent(KafkaStreams kafkaStreams) {
super(kafkaStreams);
this.kafkaStreams = kafkaStreams;
}

@Override
public KafkaStreams getSource() {
return (KafkaStreams) super.getSource();
}

/**
* @return The kafka streams object.
*/
public KafkaStreams getKafkaStreams() {
return kafkaStreams;
return getSource();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2017-2023 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.kafka.event;

import io.micronaut.context.event.ApplicationEvent;

/**
* Abstract base class for all kafka application events.
*
* @author Jorge F. Sanchez
* @since 5.1.0
* @param <T> the {@link ApplicationEvent} source object type
*/
public class AbstractKafkaApplicationEvent<T> extends ApplicationEvent {
/**
* Constructs a prototypical Event.
*
* @param source The object on which the Event initially occurred.
guillermocalvo marked this conversation as resolved.
Show resolved Hide resolved
* @throws IllegalArgumentException if source is null.
*/
public AbstractKafkaApplicationEvent(T source) {
super(source);
}

@Override
public T getSource() {
return (T) super.getSource();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2017-2023 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.kafka.event;

import org.apache.kafka.clients.consumer.Consumer;

/**
* An event fired after a Kafka {@link Consumer} executes the first polling.
*
* @author Jorge F. Sanchez
* @since 5.1.0
*/
public final class KafkaConsumerStartedPollingEvent extends AbstractKafkaApplicationEvent<Consumer> {
/**
* Constructs an event with a given Consumer source.
*
* @param consumer The Consumer on which the Event initially occurred.
* @throws IllegalArgumentException if source is null.
*/
public KafkaConsumerStartedPollingEvent(Consumer consumer) {
super(consumer);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2017-2023 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.kafka.event;

import org.apache.kafka.clients.consumer.Consumer;

/**
* An event fired after a Kafka {@link Consumer} subscribes to a set of Kafka topics.
*
* @author Jorge F. Sanchez
* @since 5.1.0
*/
public final class KafkaConsumerSubscribedEvent extends AbstractKafkaApplicationEvent<Consumer> {
/**
* Constructs an event with a given Consumer source.
*
* @param consumer The Consumer on which the Event initially occurred.
* @throws IllegalArgumentException if source is null.
*/
public KafkaConsumerSubscribedEvent(Consumer consumer) {
super(consumer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,14 @@
import io.micronaut.configuration.kafka.config.AbstractKafkaConsumerConfiguration;
import io.micronaut.configuration.kafka.config.DefaultKafkaConsumerConfiguration;
import io.micronaut.configuration.kafka.config.KafkaDefaultConfiguration;
import io.micronaut.configuration.kafka.event.KafkaConsumerStartedPollingEvent;
import io.micronaut.configuration.kafka.event.KafkaConsumerSubscribedEvent;
import io.micronaut.configuration.kafka.exceptions.KafkaListenerException;
import io.micronaut.configuration.kafka.exceptions.KafkaListenerExceptionHandler;
import io.micronaut.configuration.kafka.serde.SerdeRegistry;
import io.micronaut.context.BeanContext;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.context.processor.ExecutableMethodProcessor;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.annotation.Blocking;
Expand Down Expand Up @@ -147,6 +150,8 @@ class KafkaConsumerProcessor
private final TransactionalProducerRegistry transactionalProducerRegistry;
private final BatchConsumerRecordsBinderRegistry batchBinderRegistry;
private final AtomicInteger clientIdGenerator = new AtomicInteger(10);
private final ApplicationEventPublisher<KafkaConsumerStartedPollingEvent> kafkaConsumerStartedPollingEventPublisher;
private final ApplicationEventPublisher<KafkaConsumerSubscribedEvent> kafkaConsumerSubscribedEventPublisher;

/**
* Creates a new processor using the given {@link ExecutorService} to schedule consumers on.
Expand All @@ -162,6 +167,8 @@ class KafkaConsumerProcessor
* @param exceptionHandler The exception handler to use
* @param schedulerService The scheduler service
* @param transactionalProducerRegistry The transactional producer registry
* @param startedEventPublisher The KafkaConsumerStartedPollingEvent publisher
* @param subscribedEventPublisher The KafkaConsumerSubscribedEvent publisher
*/
KafkaConsumerProcessor(
@Named(TaskExecutors.MESSAGE_CONSUMER) ExecutorService executorService,
Expand All @@ -174,7 +181,9 @@ class KafkaConsumerProcessor
ProducerRegistry producerRegistry,
KafkaListenerExceptionHandler exceptionHandler,
@Named(TaskExecutors.SCHEDULED) ExecutorService schedulerService,
TransactionalProducerRegistry transactionalProducerRegistry) {
TransactionalProducerRegistry transactionalProducerRegistry,
ApplicationEventPublisher<KafkaConsumerStartedPollingEvent> startedEventPublisher,
ApplicationEventPublisher<KafkaConsumerSubscribedEvent> subscribedEventPublisher) {
this.executorService = executorService;
this.applicationConfiguration = applicationConfiguration;
this.beanContext = beanContext;
Expand All @@ -186,6 +195,8 @@ class KafkaConsumerProcessor
this.exceptionHandler = exceptionHandler;
this.taskScheduler = new ScheduledExecutorTaskScheduler(schedulerService);
this.transactionalProducerRegistry = transactionalProducerRegistry;
this.kafkaConsumerStartedPollingEventPublisher = startedEventPublisher;
this.kafkaConsumerSubscribedEventPublisher = subscribedEventPublisher;
this.beanContext.getBeanDefinitions(Qualifiers.byType(KafkaListener.class))
.forEach(definition -> {
// pre-initialize singletons before processing
Expand Down Expand Up @@ -421,6 +432,7 @@ private void submitConsumerThread(final ExecutableMethod<?, ?> method,
ca.setKafkaConsumer(kafkaConsumer);
}
setupConsumerSubscription(method, topicAnnotations, consumerBean, kafkaConsumer);
kafkaConsumerSubscribedEventPublisher.publishEvent(new KafkaConsumerSubscribedEvent(kafkaConsumer));
ConsumerState consumerState = new ConsumerState(clientId, groupId, offsetStrategy, kafkaConsumer, consumerBean, Collections.unmodifiableSet(kafkaConsumer.subscription()), consumerAnnotation, method);
consumers.put(clientId, consumerState);
executorService.submit(() -> createConsumerThreadPollLoop(method, consumerState));
Expand All @@ -446,6 +458,7 @@ private void createConsumerThreadPollLoop(final ExecutableMethod<?, ?> method,
consumerArg.ifPresent(argument -> boundArguments.put(argument, kafkaConsumer));

//noinspection InfiniteLoopStatement
boolean pollingStarted = false;
while (true) {
final Set<TopicPartition> newAssignments = Collections.unmodifiableSet(kafkaConsumer.assignment());
if (LOG.isInfoEnabled() && !newAssignments.equals(consumerState.assignments)) {
Expand Down Expand Up @@ -482,6 +495,11 @@ private void createConsumerThreadPollLoop(final ExecutableMethod<?, ?> method,
consumerRecords = kafkaConsumer.poll(pollTimeout);
}
consumerState.closedState = ConsumerCloseState.POLLING;
if (!pollingStarted) {
pollingStarted = true;
kafkaConsumerStartedPollingEventPublisher.publishEvent(new KafkaConsumerStartedPollingEvent(kafkaConsumer));
}

failed = true;
consumerState.resumeTopicPartitions();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package io.micronaut.configuration.kafka.event

import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Property
import io.micronaut.context.annotation.Requires
import io.micronaut.context.event.ApplicationEventListener
import io.micronaut.test.extensions.spock.annotation.MicronautTest
import jakarta.inject.Inject
import jakarta.inject.Singleton
import org.apache.kafka.clients.consumer.KafkaConsumer
import spock.lang.Specification
import spock.util.concurrent.PollingConditions

import static org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchStates.FETCHING

@Property(name = "spec.name", value = "KafkaConsumerEventSpec")
@MicronautTest(startApplication = false)
class KafkaConsumerEventSpec extends Specification {

@Inject
KafkaConsumerSubscribedEventListener subscribedEventListener

@Inject
KafkaConsumerStartedPollingEventListener startedPollingEvent

void "listen to kafka consumer subscribed events"() {
expect: "the event is emitted and consumed by the event listener"
new PollingConditions(timeout: 10, delay: 1).eventually {
subscribedEventListener.received instanceof KafkaConsumerSubscribedEvent
}
and: "the kafka consumer is subscribed to the expected topic"
subscribedEventListener.consumer.subscriptions.subscription == ['my-nifty-topic'] as Set
}

void "listen to kafka consumer started polling events"() {
expect: "the event is emitted and consumed by the event listener"
new PollingConditions(timeout: 10, delay: 1).eventually {
startedPollingEvent.received instanceof KafkaConsumerStartedPollingEvent
}
and: "the kafka consumer starts fetching records"
new PollingConditions(timeout: 10, delay: 1).eventually {
subscribedEventListener.consumer.subscriptions.assignment.partitionStateValues()[0].fetchState == FETCHING
}
}

@KafkaListener(clientId = "my-nifty-kafka-consumer")
@Requires(property = "spec.name", value = "KafkaConsumerEventSpec")
static class MyKafkaConsumer {
@Topic("my-nifty-topic")
void consume(String event) {}
}

static class AbstractKafkaConsumerEventListener<T extends AbstractKafkaApplicationEvent> implements ApplicationEventListener<T> {
AbstractKafkaApplicationEvent received
KafkaConsumer consumer

@Override
void onApplicationEvent(T event) {
// Skip consumers unrelated to this test
if ((event.source as KafkaConsumer).clientId.startsWith('my-nifty-kafka-consumer')) {
if (received != null) throw new RuntimeException("Expecting one event only")
received = event
consumer = event.source
}
}
}

@Singleton
@Requires(property = "spec.name", value = "KafkaConsumerEventSpec")
static class KafkaConsumerSubscribedEventListener extends AbstractKafkaConsumerEventListener<KafkaConsumerSubscribedEvent> { }

@Singleton
@Requires(property = "spec.name", value = "KafkaConsumerEventSpec")
static class KafkaConsumerStartedPollingEventListener extends AbstractKafkaConsumerEventListener<KafkaConsumerStartedPollingEvent> { }
}
Loading