Add support to infer kafka key and value deserializer types when batch is enabled #598

Merged
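With @KafkaListener(batch = true), the listener's body argument is now unwrapped to its component type up front, so both the key and the value deserializer are resolved against the element type of the received collection; previously only the value branch unwrapped the component type, as the second hunk below shows. In practice this lets a batch listener receive a List<ConsumerRecord<String, Book>> and have the String keys and Book values deserialized to the correct types. A minimal Java sketch of such a listener follows; the class name, topic name, and Book payload type are placeholders modelled on the test spec in this PR rather than code taken from it.

import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.Topic;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.ArrayList;
import java.util.List;

// Sketch: a batch listener whose String key and Book value deserializers
// are inferred from the ConsumerRecord type arguments after this change.
@KafkaListener(batch = true, offsetReset = OffsetReset.EARLIEST)
class BookBatchListener {

    final List<String> keys = new ArrayList<>();
    final List<Book> books = new ArrayList<>();

    @Topic("KafkaBatchListenerSpec-consumer-records") // placeholder topic name, mirroring the spec
    void receive(List<ConsumerRecord<String, Book>> records) {
        for (ConsumerRecord<String, Book> record : records) {
            keys.add(record.key());
            books.add(record.value());
        }
    }

    // Placeholder payload type; the spec uses a @Serdeable Book POJO.
    static class Book {
    }
}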
@@ -919,7 +919,11 @@ private static Argument<?> findBodyArgument(ExecutableMethod<?, ?> method) {
private void configureDeserializers(final ExecutableMethod<?, ?> method, final DefaultKafkaConsumerConfiguration consumerConfiguration) {
final Properties properties = consumerConfiguration.getConfig();
// figure out the Key deserializer
-final Argument<?> bodyArgument = findBodyArgument(method);
+boolean batch = method.isTrue(KafkaListener.class, "batch");
+
+Argument<?> tempBodyArg = findBodyArgument(method);
+
+final Argument<?> bodyArgument = batch && tempBodyArg != null ? getComponentType(tempBodyArg) : tempBodyArg;

if (!properties.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) && !consumerConfiguration.getKeyDeserializer().isPresent()) {
final Optional<Argument<?>> keyArgument = Arrays.stream(method.getArguments())
@@ -956,8 +960,7 @@ private void configureDeserializers(final ExecutableMethod<?, ?> method, final DefaultKafkaConsumerConfiguration consumerConfiguration) {
consumerConfiguration.setValueDeserializer(new StringDeserializer());
}
} else {
-final boolean batch = method.isTrue(KafkaListener.class, "batch");
-consumerConfiguration.setValueDeserializer(serdeRegistry.pickDeserializer(batch ? getComponentType(bodyArgument) : bodyArgument));
+consumerConfiguration.setValueDeserializer(serdeRegistry.pickDeserializer(bodyArgument));
}
}
}
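The change above moves the batch check and the component-type unwrapping to the top of configureDeserializers, so the key and value branches both operate on the already-unwrapped bodyArgument (the value branch now calls serdeRegistry.pickDeserializer(bodyArgument) directly). The sketch below is a hypothetical illustration of that unwrapping using plain JDK reflection, not the project's Argument-based getComponentType helper: starting from a parameter declared as List<ConsumerRecord<String, Book>>, peel off the List to reach ConsumerRecord and read its key and value type arguments.

import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Hypothetical illustration only: the same unwrapping idea expressed with JDK reflection.
class BatchTypeInferenceSketch {

    // Placeholder payload type standing in for the spec's Book POJO.
    static class Book {
    }

    // Example batch listener signature to inspect.
    void receive(List<ConsumerRecord<String, Book>> records) {
    }

    public static void main(String[] args) throws Exception {
        Method method = BatchTypeInferenceSketch.class.getDeclaredMethod("receive", List.class);

        // Declared parameter type: List<ConsumerRecord<String, Book>>
        ParameterizedType listType = (ParameterizedType) method.getGenericParameterTypes()[0];

        // Unwrap the List's element type: ConsumerRecord<String, Book>
        ParameterizedType recordType = (ParameterizedType) listType.getActualTypeArguments()[0];

        // The key and value deserializer types can then be inferred from K and V.
        Type keyType = recordType.getActualTypeArguments()[0];   // java.lang.String
        Type valueType = recordType.getActualTypeArguments()[1]; // Book

        System.out.println("key type   = " + keyType);
        System.out.println("value type = " + valueType);
    }
}

The KafkaBatchListenerSpec changes below add a round-trip test covering exactly this List<ConsumerRecord<String, Book>> case.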
@@ -4,10 +4,12 @@ import groovy.transform.EqualsAndHashCode
import groovy.transform.ToString
import io.micronaut.configuration.kafka.AbstractKafkaContainerSpec
import io.micronaut.context.annotation.Requires
import io.micronaut.messaging.annotation.MessageBody
import io.micronaut.messaging.annotation.MessageHeader
import io.micronaut.messaging.annotation.SendTo
import io.micronaut.serde.annotation.Serdeable
import io.reactivex.Flowable
import org.apache.kafka.clients.consumer.ConsumerRecord
import reactor.core.publisher.Flux
import spock.lang.Retry

@@ -19,6 +21,7 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec {

public static final String BOOKS_TOPIC = 'KafkaBatchListenerSpec-books'
public static final String BOOKS_LIST_TOPIC = 'KafkaBatchListenerSpec-books-list'
public static final String BOOK_CONSUMER_RECORD_LIST_TOPIC = 'KafkaBatchListenerSpec-consumer-records'
public static final String BOOKS_HEADERS_TOPIC = 'KafkaBatchListenerSpec-books-headers'
public static final String BOOKS_FLUX_TOPIC = 'KafkaBatchListenerSpec-books-flux'
public static final String BOOKS_FLOWABLE_TOPIC = 'KafkaBatchListenerSpec-books-flowable'
@@ -204,6 +207,24 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec {
}
}

void "test keys and values deserialized to the correct type when receiving a batch of ConsumerRecord"() {
given:
MyBatchClient myBatchClient = context.getBean(MyBatchClient)
BookListener bookListener = context.getBean(BookListener)
bookListener.books?.clear()
bookListener.keys?.clear()

when:
myBatchClient.sendToReceiveAsConsumerRecord("book-1", new Book(title: "The Flowable"))
myBatchClient.sendToReceiveAsConsumerRecord("book-2", new Book(title: "The Shining"))

then:
conditions.eventually {
bookListener.books == [new Book(title: "The Flowable"), new Book(title: "The Shining")]
bookListener.keys == ["book-1", "book-2"]
}
}

@Requires(property = 'spec.name', value = 'KafkaBatchListenerSpec')
@KafkaClient(batch = true)
@Topic(KafkaBatchListenerSpec.BOOKS_TOPIC)
@@ -236,13 +257,18 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec {

@Topic(KafkaBatchListenerSpec.BOOKS_FLOWABLE_TOPIC)
void sendBooksFlowable(Flowable<Book> books)

@Topic(KafkaBatchListenerSpec.BOOK_CONSUMER_RECORD_LIST_TOPIC)
void sendToReceiveAsConsumerRecord(@KafkaKey String key, @MessageBody Book book)

}

@Requires(property = 'spec.name', value = 'KafkaBatchListenerSpec')
@KafkaListener(batch = true, offsetReset = EARLIEST)
@Topic(KafkaBatchListenerSpec.BOOKS_TOPIC)
static class BookListener {
List<Book> books = []
List<String> keys = []
List<String> headers = []

@Topic(KafkaBatchListenerSpec.BOOKS_LIST_TOPIC)
@@ -298,6 +324,12 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec {
void receiveFlowable(Flowable<Book> books) {
this.books.addAll books.toList().blockingGet()
}

@Topic(KafkaBatchListenerSpec.BOOK_CONSUMER_RECORD_LIST_TOPIC)
void receiveConsumerRecords(List<ConsumerRecord<String, Book>> books) {
this.keys.addAll(books.collect { it.key() })
this.books.addAll(books.collect { it.value() })
}
}

@Requires(property = 'spec.name', value = 'KafkaBatchListenerSpec')