Support for ConsumerRecords container type on batch processing
Bind arguments of the ConsumerRecords type.

Consider the ConsumerRecords type when looking for the "body argument", so that the correct key and value deserializers are configured.
rorueda authored Jan 12, 2024
1 parent 5081698 commit eb56c23
Showing 7 changed files with 146 additions and 12 deletions.
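In short, this change lets a batch listener bind the entire batch as a single `ConsumerRecords` argument. A minimal sketch of the kind of listener this enables (the topic name and `Book` value type mirror the examples added in this commit):

```java
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

@KafkaListener(batch = true)
class BookBatchListener {

    @Topic("all-the-books")
    void receive(ConsumerRecords<String, Book> consumerRecords) {
        // the generic key/value types of ConsumerRecords determine which deserializers are configured
        for (ConsumerRecord<String, Book> record : consumerRecords) {
            // each record's key() and value() are already deserialized to String and Book
        }
    }
}
```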
@@ -59,6 +59,11 @@ public BatchConsumerRecordsBinderRegistry(ConsumerRecordBinderRegistry consumerR
@Override
public <T> Optional<ArgumentBinder<T, ConsumerRecords<?, ?>>> findArgumentBinder(Argument<T> argument) {
Class<T> argType = argument.getType();

if (ConsumerRecords.class.equals(argType)) {
return Optional.of((context, consumerRecords) -> () -> (Optional<T>) Optional.of(consumerRecords));
}

if (Iterable.class.isAssignableFrom(argType) || argType.isArray() || Publishers.isConvertibleToPublisher(argType)) {
Argument<?> batchType = argument.getFirstTypeVariable().orElse(Argument.OBJECT_ARGUMENT);
List bound = new ArrayList();
@@ -74,7 +79,6 @@ public BatchConsumerRecordsBinderRegistry(ConsumerRecordBinderRegistry consumerR
if (result.isPresentAndSatisfied()) {
bound.add(result.get());
}

});
}
return () -> {
@@ -517,7 +517,7 @@ private static Optional<ConsumerRebalanceListener> getConsumerRebalanceListener(

private static Argument<?> findBodyArgument(ExecutableMethod<?, ?> method) {
return Arrays.stream(method.getArguments())
.filter(arg -> arg.getType() == ConsumerRecord.class || arg.getAnnotationMetadata().hasAnnotation(MessageBody.class))
.filter(arg -> isConsumerRecord(arg) || arg.getAnnotationMetadata().hasAnnotation(MessageBody.class))
.findFirst()
.orElseGet(() -> Arrays.stream(method.getArguments())
.filter(arg -> !arg.getAnnotationMetadata().hasStereotype(Bindable.class)
@@ -528,7 +528,12 @@ private static Argument<?> findBodyArgument(ExecutableMethod<?, ?> method) {

private static Argument<?> findBodyArgument(boolean batch, ExecutableMethod<?, ?> method) {
final Argument<?> tempBodyArg = findBodyArgument(method);
return batch && tempBodyArg != null ? getComponentType(tempBodyArg) : tempBodyArg;

if (batch && tempBodyArg != null) {
return isConsumerRecord(tempBodyArg) ? tempBodyArg : getComponentType(tempBodyArg);
}

return tempBodyArg;
}

private static boolean isLastArgumentOfSuspendedMethod(Argument<?> argument, ExecutableMethod<?, ?> method) {
@@ -578,11 +583,13 @@ private void configureValueDeserializer(Argument<?> bodyArgument, DefaultKafkaCo
}

private static boolean isConsumerRecord(@NonNull Argument<?> body) {
return ConsumerRecord.class.isAssignableFrom(body.getType());
return ConsumerRecord.class.isAssignableFrom(body.getType()) ||
ConsumerRecords.class.isAssignableFrom(body.getType());
}

private static Argument<?> getComponentType(final Argument<?> argument) {
final Class<?> argumentType = argument.getType();

return argumentType.isArray()
? Argument.of(argumentType.getComponentType())
: argument.getFirstTypeVariable().orElse(argument);
@@ -9,6 +9,7 @@ import io.micronaut.messaging.annotation.MessageHeader
import io.micronaut.messaging.annotation.SendTo
import io.micronaut.serde.annotation.Serdeable
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import reactor.core.publisher.Flux
import spock.lang.Retry

@@ -20,7 +21,8 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec {

public static final String BOOKS_TOPIC = 'KafkaBatchListenerSpec-books'
public static final String BOOKS_LIST_TOPIC = 'KafkaBatchListenerSpec-books-list'
public static final String BOOK_CONSUMER_RECORD_LIST_TOPIC = 'KafkaBatchListenerSpec-consumer-records'
public static final String BOOK_CONSUMER_RECORD_LIST_TOPIC = 'KafkaBatchListenerSpec-consumer-record-list'
public static final String BOOK_CONSUMER_RECORDS_TOPIC = 'KafkaBatchListenerSpec-consumer-records'
public static final String BOOKS_HEADERS_TOPIC = 'KafkaBatchListenerSpec-books-headers'
public static final String BOOKS_FLUX_TOPIC = 'KafkaBatchListenerSpec-books-flux'
public static final String BOOKS_FORWARD_LIST_TOPIC = 'KafkaBatchListenerSpec-books-forward-list'
@@ -169,16 +171,34 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec {
}
}

void "test keys and values deserialized to the correct type when receiving a batch of ConsumerRecord"() {
void "test keys and values deserialized to the correct type when receiving a batch with a list of ConsumerRecord"() {
given:
MyBatchClient myBatchClient = context.getBean(MyBatchClient)
BookListener bookListener = context.getBean(BookListener)
bookListener.books?.clear()
bookListener.keys?.clear()

when:
myBatchClient.sendToReceiveAsConsumerRecord("book-1", new Book(title: "The Flowable"))
myBatchClient.sendToReceiveAsConsumerRecord("book-2", new Book(title: "The Shining"))
myBatchClient.sendToReceiveAsListOfConsumerRecord("book-1", new Book(title: "The Flowable"))
myBatchClient.sendToReceiveAsListOfConsumerRecord("book-2", new Book(title: "The Shining"))

then:
conditions.eventually {
bookListener.books == [new Book(title: "The Flowable"), new Book(title: "The Shining")]
bookListener.keys == ["book-1", "book-2"]
}
}

void "test keys and values deserialized to the correct type when receiving a batch of ConsumerRecords"() {
given:
MyBatchClient myBatchClient = context.getBean(MyBatchClient)
BookListener bookListener = context.getBean(BookListener)
bookListener.books?.clear()
bookListener.keys?.clear()

when:
myBatchClient.sendToReceiveAsConsumerRecords("book-1", new Book(title: "The Flowable"))
myBatchClient.sendToReceiveAsConsumerRecords("book-2", new Book(title: "The Shining"))

then:
conditions.eventually {
@@ -215,7 +235,10 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec {
void sendBooksFlux(Flux<Book> books)

@Topic(KafkaBatchListenerSpec.BOOK_CONSUMER_RECORD_LIST_TOPIC)
void sendToReceiveAsConsumerRecord(@KafkaKey String key, @MessageBody Book book)
void sendToReceiveAsListOfConsumerRecord(@KafkaKey String key, @MessageBody Book book)

@Topic(KafkaBatchListenerSpec.BOOK_CONSUMER_RECORDS_TOPIC)
void sendToReceiveAsConsumerRecords(@KafkaKey String key, @MessageBody Book book)

}

@@ -270,9 +293,15 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec {
}

@Topic(KafkaBatchListenerSpec.BOOK_CONSUMER_RECORD_LIST_TOPIC)
void receiveConsumerRecords(List<ConsumerRecord<String, Book>> books) {
this.keys.addAll(books.collect { it.key() })
this.books.addAll(books.collect { it.value() })
void receiveListOfConsumerRecord(List<ConsumerRecord<String, Book>> consumerRecords) {
this.keys.addAll(consumerRecords.collect { it.key() })
this.books.addAll(consumerRecords.collect { it.value() })
}

@Topic(KafkaBatchListenerSpec.BOOK_CONSUMER_RECORDS_TOPIC)
void receiveConsumerRecords(ConsumerRecords<String, Book> consumerRecords) {
this.keys.addAll(consumerRecords.collect { it.key() })
this.books.addAll(consumerRecords.collect { it.value() })
}
}

17 changes: 17 additions & 0 deletions src/main/docs/guide/kafkaListener/kafkaListenerBatch.adoc
@@ -42,6 +42,23 @@ snippet::io.micronaut.kafka.docs.consumer.batch.manual.BookListener[tags=method,

This example is fairly trivial in that it commits offsets after processing each record in a batch, but you could, for example, commit only after every 10 or 100 records, or after whatever number makes sense for your application.
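
For illustration only, a sketch of committing once every 10 records, reusing the shape of the manual example above (the threshold of 10 and the metadata string are arbitrary choices):

[source,java]
----
@KafkaListener(offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED, batch = true)
@Topic("all-the-books")
void receive(List<ConsumerRecord<String, Book>> records, Consumer<?, ?> kafkaConsumer) {
    int processed = 0;
    for (ConsumerRecord<String, Book> record : records) {
        Book book = record.value(); // process the book
        if (++processed % 10 == 0) { // commit after every 10th record
            kafkaConsumer.commitSync(Collections.singletonMap(
                new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1, "my metadata")
            ));
        }
    }
}
----

Note that in this sketch any records processed since the last commit are left to be committed with a later batch.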

== Receiving ConsumerRecords

When batch processing is enabled, you can receive the entire `ConsumerRecords` object that is being listened to. In this case, specify appropriate generic types for the key and value of the `ConsumerRecords` so that Micronaut can pick the correct deserializer for each.

This is useful when you need to process or commit the records by partition, as the `ConsumerRecords` object already groups records by partition:

.Commit only once for each partition
snippet::io.micronaut.kafka.docs.consumer.batch.manual.BookListener[tags=consumerRecords, indent=0]

<1> Committing offsets automatically is disabled
<2> The method receives the batch of books as a `ConsumerRecords` holder object
<3> Each partition is iterated over
<4> Each record for the partition is processed
<5> The last read offset for the partition is stored
<6> The offset is committed once for each partition

== Reactive Batch Processing

Batch listeners also support defining reactive types (Reactor `Flux` or RxJava rx:Flowable[]) as the method argument.
@@ -6,6 +6,7 @@ import io.micronaut.context.annotation.Requires
import io.micronaut.kafka.docs.consumer.batch.Book
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

@@ -38,4 +39,27 @@ class BookListener {
}
}
// end::method[]

// tag::consumerRecords[]
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED, batch = true) // <1>
@Topic("all-the-books")
void receiveConsumerRecords(ConsumerRecords<String, Book> consumerRecords, Consumer kafkaConsumer) { // <2>
for (TopicPartition partition : consumerRecords.partitions()) { // <3>
long offset = Long.MIN_VALUE;
// process partition records
for (ConsumerRecord<String, Book> record : consumerRecords.records(partition)) { // <4>
// process the book
Book book = record.value();
// keep last offset
offset = record.offset(); // <5>
}

// commit partition offset
kafkaConsumer.commitSync(Collections.singletonMap( // <6>
partition,
new OffsetAndMetadata(offset + 1, "my metadata")
));
}
}
// end::consumerRecords[]
}
@@ -9,6 +9,7 @@ import io.micronaut.context.annotation.Requires
import io.micronaut.kafka.docs.consumer.batch.Book
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

@@ -41,5 +42,33 @@ internal class BookListener {
}
}
// end::method[]

// tag::consumerRecords[]
@KafkaListener(offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED, batch = true) // <1>
@Topic("all-the-books")
fun receiveConsumerRecords(consumerRecords: ConsumerRecords<String?, Book?>, kafkaConsumer: Consumer<*, *>) { // <2>
for (partition in consumerRecords.partitions()) { // <3>
var offset = Long.MIN_VALUE
// process partition records
for (record in consumerRecords.records(partition)) { // <4>
// process the book
val book = record.value()
// keep last offset
offset = record.offset() // <5>
}

// commit partition offset
kafkaConsumer.commitSync(
Collections.singletonMap( // <6>
partition,
OffsetAndMetadata(offset + 1, "my metadata")
)
)
}
}
// end::consumerRecords[]
}

@@ -6,6 +6,7 @@
import io.micronaut.kafka.docs.consumer.batch.Book;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

@@ -42,4 +43,27 @@ public void receive(List<ConsumerRecord<String, Book>> records, Consumer kafkaCo
}
}
// end::method[]

// tag::consumerRecords[]
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED, batch = true) // <1>
@Topic("all-the-books")
public void receiveConsumerRecords(ConsumerRecords<String, Book> consumerRecords, Consumer kafkaConsumer) { // <2>
for (TopicPartition partition : consumerRecords.partitions()) { // <3>
long offset = Long.MIN_VALUE;
// process partition records
for (ConsumerRecord<String, Book> record : consumerRecords.records(partition)) { // <4>
// process the book
Book book = record.value();
// keep last offset
offset = record.offset(); // <5>
}

// commit partition offset
kafkaConsumer.commitSync(Collections.singletonMap( // <6>
partition,
new OffsetAndMetadata(offset + 1, "my metadata")
));
}
}
// end::consumerRecords[]
}
