Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce KafkaListener.threadsValue to allow for dynamic config #769

Merged
merged 10 commits into from
Aug 4, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,37 @@
ErrorStrategy errorStrategy() default @ErrorStrategy();

/**
* Kafka consumers are by default single threaded. If you wish to increase the number of threads
* Dynamically configure the number of threads of a Kafka consumer.
*
* <p>Kafka consumers are by default single threaded. If you wish to increase the number of threads
* for a consumer you can alter this setting. Note that this means that multiple partitions will
* be allocated to a single application.
*
* <p>NOTE: When using this setting if your bean is {@link jakarta.inject.Singleton} then local state will be s
* <p>NOTE: When using this setting if your bean is {@link jakarta.inject.Singleton} then local state will be
* shared between invocations from different consumer threads</p>
*
* <p>{@code threadsValue} takes precedence over {@code threads} if they are both set.
*
* @return The number of threads
* @see KafkaListener#threads()
*/
@AliasFor(member = "threads")
String threadsValue() default "";

/**
* Statically configure the number of threads of a Kafka consumer.
*
* <p>Kafka consumers are by default single threaded. If you wish to increase the number of threads
* for a consumer you can alter this setting. Note that this means that multiple partitions will
* be allocated to a single application.
*
* <p>NOTE: When using this setting if your bean is {@link jakarta.inject.Singleton} then local state will be
* shared between invocations from different consumer threads</p>
*
* <p>{@code threads} will be overridden by {@code threadsValue} if they are both set.
*
* @return The number of threads
* @see KafkaListener#threadsValue()
*/
int threads() default 1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import io.micronaut.configuration.kafka.metrics.KafkaProducerMetrics
import io.micronaut.configuration.kafka.serde.JsonObjectSerde
import io.micronaut.configuration.metrics.management.endpoint.MetricsEndpoint
import io.micronaut.context.annotation.Requires
import io.micronaut.context.event.BeanCreatedEvent
import io.micronaut.context.event.BeanCreatedEventListener
import io.micronaut.core.annotation.Nullable
import io.micronaut.http.HttpResponse
import io.micronaut.http.client.DefaultHttpClientConfiguration
Expand All @@ -20,7 +22,9 @@ import io.micronaut.messaging.annotation.MessageBody
import io.micronaut.messaging.annotation.MessageHeader
import io.micronaut.serde.annotation.Serdeable
import io.reactivex.Single
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata
Expand All @@ -42,6 +46,7 @@ class KafkaListenerSpec extends AbstractEmbeddedServerSpec {
super.configuration +
['micrometer.metrics.enabled': true,
'endpoints.metrics.sensitive': false,
'my.thread.count': 3,
(EMBEDDED_TOPICS): ['words', 'books', 'words-records', 'books-records']]
}

Expand Down Expand Up @@ -199,6 +204,21 @@ class KafkaListenerSpec extends AbstractEmbeddedServerSpec {
}
}

void "test kafka consumer with configurable number of threads"() {
expect:
context.getBean(MyConsumer6).kafkaConsumers.size() == 3
}

void "test kafka consumer with fixed number of threads"() {
expect:
context.getBean(MyConsumer7).kafkaConsumers.size() == 2
}

void "test kafka consumer with both thread settings set"() {
expect:
context.getBean(MyConsumer8).kafkaConsumers.size() == 3
}

@Requires(property = 'spec.name', value = 'KafkaListenerSpec')
@KafkaClient
static interface MyClient {
Expand Down Expand Up @@ -282,6 +302,72 @@ class KafkaListenerSpec extends AbstractEmbeddedServerSpec {
}
}

@Requires(property = 'spec.name', value = 'KafkaListenerSpec')
@KafkaListener(clientId = "kafka-consumer-with-configurable-number-of-threads", threadsValue = '${my.thread.count}')
jeremyg484 marked this conversation as resolved.
Show resolved Hide resolved
static class MyConsumer6 implements BeanCreatedEventListener<Consumer> {
List<KafkaConsumer> kafkaConsumers = []

@Override
Consumer onCreated(BeanCreatedEvent<Consumer> event) {
if (event.bean instanceof KafkaConsumer) {
final consumer = ((KafkaConsumer) event.bean)
if (consumer.clientId.startsWith("kafka-consumer-with-configurable-number-of-threads")) {
kafkaConsumers << consumer
}
}
return event.bean
}

@Topic("words")
void consume(String sentence) {
// Do nothing
}
}

@Requires(property = 'spec.name', value = 'KafkaListenerSpec')
@KafkaListener(clientId = "kafka-consumer-with-fixed-number-of-threads", threads = 2)
static class MyConsumer7 implements BeanCreatedEventListener<Consumer> {
List<KafkaConsumer> kafkaConsumers = []

@Override
Consumer onCreated(BeanCreatedEvent<Consumer> event) {
if (event.bean instanceof KafkaConsumer) {
final consumer = ((KafkaConsumer) event.bean)
if (consumer.clientId.startsWith("kafka-consumer-with-fixed-number-of-threads")) {
kafkaConsumers << consumer
}
}
return event.bean
}

@Topic("words")
void consume(String sentence) {
// Do nothing
}
}

@Requires(property = 'spec.name', value = 'KafkaListenerSpec')
@KafkaListener(clientId = "kafka-consumer-with-both-thread-settings-set", threads = 10, threadsValue = '${my.thread.count}')
static class MyConsumer8 implements BeanCreatedEventListener<Consumer> {
List<KafkaConsumer> kafkaConsumers = []

@Override
Consumer onCreated(BeanCreatedEvent<Consumer> event) {
if (event.bean instanceof KafkaConsumer) {
final consumer = ((KafkaConsumer) event.bean)
if (consumer.clientId.startsWith("kafka-consumer-with-both-thread-settings-set")) {
kafkaConsumers << consumer
}
}
return event.bean
}

@Topic("words")
void consume(String sentence) {
// Do nothing
}
}

@Requires(property = 'spec.name', value = 'KafkaListenerSpec')
@KafkaListener(offsetReset = EARLIEST)
static class PojoConsumer2 {
Expand Down
10 changes: 10 additions & 0 deletions src/main/docs/guide/kafkaListener.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ The above example will create 10 link:{kafkaapi}/org/apache/kafka/clients/consum

NOTE: @KafkaListener beans are by default singleton. When using multiple threads you must either synchronize access to local state or declare the bean as `@Prototype`.

You can also make your number of threads configurable by using `threadsValue`:

.Dynamically Configuring Threads
[source,java]
----
@KafkaListener(groupId="myGroup", threadsValue = "${my.thread.count}")
----

NOTE: `threads` will be overridden by `threadsValue` if they are both set.

By default Micronaut will inspect the method signature of the method annotated with `@Topic` that will listen for `ConsumerRecord` instances and from the types infer an appropriate key and value link:{kafkaapi}/org/apache/kafka/common/serialization/Deserializer.html[Deserializer].

.Applying Configuration
Expand Down
Loading