Skip to content

Commit

Permalink
Publish events after Consumer subscribes and after first polling (#677)
Browse files Browse the repository at this point in the history
  • Loading branch information
jfsanchez91 authored Aug 4, 2023
1 parent dd59fed commit 1841b6d
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package io.micronaut.configuration.kafka.streams.event;

import io.micronaut.context.event.ApplicationEvent;
import io.micronaut.configuration.kafka.event.AbstractKafkaApplicationEvent;
import org.apache.kafka.streams.KafkaStreams;

/**
Expand All @@ -24,27 +24,19 @@
* @author graemerocher
* @since 2.0.0
*/
public abstract class AbstractKafkaStreamsEvent extends ApplicationEvent {
private final KafkaStreams kafkaStreams;

public abstract class AbstractKafkaStreamsEvent extends AbstractKafkaApplicationEvent<KafkaStreams> {
/**
* Default constructor.
* @param kafkaStreams The streams
*/
protected AbstractKafkaStreamsEvent(KafkaStreams kafkaStreams) {
super(kafkaStreams);
this.kafkaStreams = kafkaStreams;
}

@Override
public KafkaStreams getSource() {
return (KafkaStreams) super.getSource();
}

/**
* @return The kafka streams object.
*/
public KafkaStreams getKafkaStreams() {
return kafkaStreams;
return getSource();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2017-2023 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.kafka.event;

import io.micronaut.context.event.ApplicationEvent;

/**
* Abstract base class for all kafka application events.
*
* @author Jorge F. Sanchez
* @since 5.1.0
* @param <T> the {@link ApplicationEvent} source object type
*/
public class AbstractKafkaApplicationEvent<T> extends ApplicationEvent {
/**
* Constructs a prototypical Event.
*
* @param source The object on which the Event initially occurred.
* @throws IllegalArgumentException if source is null.
*/
public AbstractKafkaApplicationEvent(T source) {
super(source);
}

@Override
public T getSource() {
return (T) super.getSource();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2017-2023 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.kafka.event;

import org.apache.kafka.clients.consumer.Consumer;

/**
* An event fired after a Kafka {@link Consumer} executes the first polling.
*
* @author Jorge F. Sanchez
* @since 5.1.0
*/
public final class KafkaConsumerStartedPollingEvent extends AbstractKafkaApplicationEvent<Consumer> {
/**
* Constructs an event with a given Consumer source.
*
* @param consumer The Consumer on which the Event initially occurred.
* @throws IllegalArgumentException if source is null.
*/
public KafkaConsumerStartedPollingEvent(Consumer consumer) {
super(consumer);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2017-2023 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.kafka.event;

import org.apache.kafka.clients.consumer.Consumer;

/**
* An event fired after a Kafka {@link Consumer} subscribes to a set of Kafka topics.
*
* @author Jorge F. Sanchez
* @since 5.1.0
*/
public final class KafkaConsumerSubscribedEvent extends AbstractKafkaApplicationEvent<Consumer> {
/**
* Constructs an event with a given Consumer source.
*
* @param consumer The Consumer on which the Event initially occurred.
* @throws IllegalArgumentException if source is null.
*/
public KafkaConsumerSubscribedEvent(Consumer consumer) {
super(consumer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,14 @@
import io.micronaut.configuration.kafka.config.AbstractKafkaConsumerConfiguration;
import io.micronaut.configuration.kafka.config.DefaultKafkaConsumerConfiguration;
import io.micronaut.configuration.kafka.config.KafkaDefaultConfiguration;
import io.micronaut.configuration.kafka.event.KafkaConsumerStartedPollingEvent;
import io.micronaut.configuration.kafka.event.KafkaConsumerSubscribedEvent;
import io.micronaut.configuration.kafka.exceptions.KafkaListenerException;
import io.micronaut.configuration.kafka.exceptions.KafkaListenerExceptionHandler;
import io.micronaut.configuration.kafka.serde.SerdeRegistry;
import io.micronaut.context.BeanContext;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.context.processor.ExecutableMethodProcessor;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.annotation.Blocking;
Expand Down Expand Up @@ -147,6 +150,8 @@ class KafkaConsumerProcessor
private final TransactionalProducerRegistry transactionalProducerRegistry;
private final BatchConsumerRecordsBinderRegistry batchBinderRegistry;
private final AtomicInteger clientIdGenerator = new AtomicInteger(10);
private final ApplicationEventPublisher<KafkaConsumerStartedPollingEvent> kafkaConsumerStartedPollingEventPublisher;
private final ApplicationEventPublisher<KafkaConsumerSubscribedEvent> kafkaConsumerSubscribedEventPublisher;

/**
* Creates a new processor using the given {@link ExecutorService} to schedule consumers on.
Expand All @@ -162,6 +167,8 @@ class KafkaConsumerProcessor
* @param exceptionHandler The exception handler to use
* @param schedulerService The scheduler service
* @param transactionalProducerRegistry The transactional producer registry
* @param startedEventPublisher The KafkaConsumerStartedPollingEvent publisher
* @param subscribedEventPublisher The KafkaConsumerSubscribedEvent publisher
*/
KafkaConsumerProcessor(
@Named(TaskExecutors.MESSAGE_CONSUMER) ExecutorService executorService,
Expand All @@ -174,7 +181,9 @@ class KafkaConsumerProcessor
ProducerRegistry producerRegistry,
KafkaListenerExceptionHandler exceptionHandler,
@Named(TaskExecutors.SCHEDULED) ExecutorService schedulerService,
TransactionalProducerRegistry transactionalProducerRegistry) {
TransactionalProducerRegistry transactionalProducerRegistry,
ApplicationEventPublisher<KafkaConsumerStartedPollingEvent> startedEventPublisher,
ApplicationEventPublisher<KafkaConsumerSubscribedEvent> subscribedEventPublisher) {
this.executorService = executorService;
this.applicationConfiguration = applicationConfiguration;
this.beanContext = beanContext;
Expand All @@ -186,6 +195,8 @@ class KafkaConsumerProcessor
this.exceptionHandler = exceptionHandler;
this.taskScheduler = new ScheduledExecutorTaskScheduler(schedulerService);
this.transactionalProducerRegistry = transactionalProducerRegistry;
this.kafkaConsumerStartedPollingEventPublisher = startedEventPublisher;
this.kafkaConsumerSubscribedEventPublisher = subscribedEventPublisher;
this.beanContext.getBeanDefinitions(Qualifiers.byType(KafkaListener.class))
.forEach(definition -> {
// pre-initialize singletons before processing
Expand Down Expand Up @@ -421,6 +432,7 @@ private void submitConsumerThread(final ExecutableMethod<?, ?> method,
ca.setKafkaConsumer(kafkaConsumer);
}
setupConsumerSubscription(method, topicAnnotations, consumerBean, kafkaConsumer);
kafkaConsumerSubscribedEventPublisher.publishEvent(new KafkaConsumerSubscribedEvent(kafkaConsumer));
ConsumerState consumerState = new ConsumerState(clientId, groupId, offsetStrategy, kafkaConsumer, consumerBean, Collections.unmodifiableSet(kafkaConsumer.subscription()), consumerAnnotation, method);
consumers.put(clientId, consumerState);
executorService.submit(() -> createConsumerThreadPollLoop(method, consumerState));
Expand All @@ -446,6 +458,7 @@ private void createConsumerThreadPollLoop(final ExecutableMethod<?, ?> method,
consumerArg.ifPresent(argument -> boundArguments.put(argument, kafkaConsumer));

//noinspection InfiniteLoopStatement
boolean pollingStarted = false;
while (true) {
final Set<TopicPartition> newAssignments = Collections.unmodifiableSet(kafkaConsumer.assignment());
if (LOG.isInfoEnabled() && !newAssignments.equals(consumerState.assignments)) {
Expand Down Expand Up @@ -482,6 +495,11 @@ private void createConsumerThreadPollLoop(final ExecutableMethod<?, ?> method,
consumerRecords = kafkaConsumer.poll(pollTimeout);
}
consumerState.closedState = ConsumerCloseState.POLLING;
if (!pollingStarted) {
pollingStarted = true;
kafkaConsumerStartedPollingEventPublisher.publishEvent(new KafkaConsumerStartedPollingEvent(kafkaConsumer));
}

failed = true;
consumerState.resumeTopicPartitions();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package io.micronaut.configuration.kafka.event

import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Property
import io.micronaut.context.annotation.Requires
import io.micronaut.context.event.ApplicationEventListener
import io.micronaut.test.extensions.spock.annotation.MicronautTest
import jakarta.inject.Inject
import jakarta.inject.Singleton
import org.apache.kafka.clients.consumer.KafkaConsumer
import spock.lang.Specification
import spock.util.concurrent.PollingConditions

import static org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchStates.FETCHING

@Property(name = "spec.name", value = "KafkaConsumerEventSpec")
@MicronautTest(startApplication = false)
class KafkaConsumerEventSpec extends Specification {

@Inject
KafkaConsumerSubscribedEventListener subscribedEventListener

@Inject
KafkaConsumerStartedPollingEventListener startedPollingEvent

void "listen to kafka consumer subscribed events"() {
expect: "the event is emitted and consumed by the event listener"
new PollingConditions(timeout: 10, delay: 1).eventually {
subscribedEventListener.received instanceof KafkaConsumerSubscribedEvent
}
and: "the kafka consumer is subscribed to the expected topic"
subscribedEventListener.consumer.subscriptions.subscription == ['my-nifty-topic'] as Set
}

void "listen to kafka consumer started polling events"() {
expect: "the event is emitted and consumed by the event listener"
new PollingConditions(timeout: 10, delay: 1).eventually {
startedPollingEvent.received instanceof KafkaConsumerStartedPollingEvent
}
and: "the kafka consumer starts fetching records"
new PollingConditions(timeout: 10, delay: 1).eventually {
subscribedEventListener.consumer.subscriptions.assignment.partitionStateValues()[0].fetchState == FETCHING
}
}

@KafkaListener(clientId = "my-nifty-kafka-consumer")
@Requires(property = "spec.name", value = "KafkaConsumerEventSpec")
static class MyKafkaConsumer {
@Topic("my-nifty-topic")
void consume(String event) {}
}

static class AbstractKafkaConsumerEventListener<T extends AbstractKafkaApplicationEvent> implements ApplicationEventListener<T> {
AbstractKafkaApplicationEvent received
KafkaConsumer consumer

@Override
void onApplicationEvent(T event) {
// Skip consumers unrelated to this test
if ((event.source as KafkaConsumer).clientId.startsWith('my-nifty-kafka-consumer')) {
if (received != null) throw new RuntimeException("Expecting one event only")
received = event
consumer = event.source
}
}
}

@Singleton
@Requires(property = "spec.name", value = "KafkaConsumerEventSpec")
static class KafkaConsumerSubscribedEventListener extends AbstractKafkaConsumerEventListener<KafkaConsumerSubscribedEvent> { }

@Singleton
@Requires(property = "spec.name", value = "KafkaConsumerEventSpec")
static class KafkaConsumerStartedPollingEventListener extends AbstractKafkaConsumerEventListener<KafkaConsumerStartedPollingEvent> { }
}

0 comments on commit 1841b6d

Please sign in to comment.