diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/docs/consumer/sendto/ProductListener.java b/kafka/src/test/groovy/io/micronaut/configuration/kafka/docs/consumer/sendto/ProductListener.java deleted file mode 100644 index 62e37d452..000000000 --- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/docs/consumer/sendto/ProductListener.java +++ /dev/null @@ -1,38 +0,0 @@ -package io.micronaut.configuration.kafka.docs.consumer.sendto; - -// tag::imports[] -import io.micronaut.configuration.kafka.annotation.KafkaKey; -import io.micronaut.configuration.kafka.annotation.KafkaListener; -import io.micronaut.configuration.kafka.annotation.Topic; -import io.micronaut.configuration.kafka.docs.consumer.config.Product; -import io.micronaut.messaging.annotation.SendTo; -import reactor.core.publisher.Mono; -// end::imports[] - -@KafkaListener -public class ProductListener { - - // tag::method[] - @Topic("awesome-products") // <1> - @SendTo("product-quantities") // <2> - public int receive(@KafkaKey String brand, - Product product) { - System.out.println("Got Product - " + product.getName() + " by " + brand); - - return product.getQuantity(); // <3> - } - // end::method[] - - // tag::reactive[] - @Topic("awesome-products") // <1> - @SendTo("product-quantities") // <2> - public Mono receiveProduct(@KafkaKey String brand, - Mono productSingle) { - - return productSingle.map(product -> { - System.out.println("Got Product - " + product.getName() + " by " + brand); - return product.getQuantity(); // <3> - }); - } - // end::reactive[] -} diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/docs/consumer/sendto/WordCounter.java b/kafka/src/test/groovy/io/micronaut/configuration/kafka/docs/consumer/sendto/WordCounter.java deleted file mode 100644 index 5b9cf639f..000000000 --- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/docs/consumer/sendto/WordCounter.java +++ /dev/null @@ -1,39 +0,0 @@ -package io.micronaut.configuration.kafka.docs.consumer.sendto; - -import io.micronaut.configuration.kafka.KafkaMessage; -import io.micronaut.configuration.kafka.annotation.KafkaListener; -import io.micronaut.configuration.kafka.annotation.OffsetStrategy; -import io.micronaut.configuration.kafka.annotation.Topic; -import io.micronaut.messaging.annotation.SendTo; -import org.apache.kafka.common.IsolationLevel; - -import java.util.AbstractMap; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -// tag::transactional[] -@KafkaListener(producerClientId = "word-counter-producer", // <1> - producerTransactionalId = "tx-word-counter-id", // <2> - offsetStrategy = OffsetStrategy.SEND_TO_TRANSACTION, // <3> - isolation = IsolationLevel.READ_COMMITTED // <4> -) -public class WordCounter { - - @Topic("tx-incoming-strings") - @SendTo("my-words-count") - List wordsCounter(String string) { - Map wordsCount = Stream.of(string.split(" ")) - .map(word -> new AbstractMap.SimpleEntry<>(word, 1)) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Integer::sum)); - List messages = new ArrayList<>(); - for (Map.Entry e : wordsCount.entrySet()) { - messages.add(KafkaMessage.Builder.withBody(e.getValue()).key(e.getKey()).build()); - } - return messages; - } - -} -// end::transactional[] diff --git a/src/main/docs/guide/kafkaListener/kafkaSendTo.adoc b/src/main/docs/guide/kafkaListener/kafkaSendTo.adoc index df7f7a04b..0b7309a5b 100644 --- a/src/main/docs/guide/kafkaListener/kafkaSendTo.adoc +++ b/src/main/docs/guide/kafkaListener/kafkaSendTo.adoc @@ -2,13 +2,10 @@ On any `@KafkaListener` method that returns a value, you can use the https://doc The key of the original `ConsumerRecord` will be used as the key when forwarding the message. -.Committing offsets with the `KafkaConsumer` API -[source,java] ----- -include::{testskafka}/consumer/sendto/ProductListener.java[tags=imports, indent=0] +.Forwarding with @SendTo + +snippet::io.micronaut.kafka.docs.consumer.sendto.ProductListener[tags=method, indent=0] -include::{testskafka}/consumer/sendto/ProductListener.java[tags=method, indent=0] ----- <1> The topic subscribed to is `awesome-products` <2> The topic to send the result to is `product-quantities` @@ -16,11 +13,9 @@ include::{testskafka}/consumer/sendto/ProductListener.java[tags=method, indent=0 You can also do the same using Reactive programming: -.Committing offsets with the `KafkaConsumer` API -[source,java] ----- -include::{testskafka}/consumer/sendto/ProductListener.java[tags=reactive, indent=0] ----- +.Forwarding Reactively with @SendTo + +snippet::io.micronaut.kafka.docs.consumer.sendto.ProductListener[tags=reactive, indent=0] <1> The topic subscribed to is `awesome-products` <2> The topic to send the result to is `product-quantities` @@ -31,12 +26,11 @@ In the reactive case the `poll` loop will continue and will not wait for the rec To enable transactional sending of the messages you need to define `producerTransactionalId` in `@KafkaListener`. .Transactional consumer-producer -[source,java] ----- -include::{testskafka}/consumer/sendto/WordCounter.java[tags=transactional, indent=0] ----- + +snippet::io.micronaut.kafka.docs.consumer.sendto.WordCounter[tags=transactional, indent=0] + <1> The id of the producer to load additional config properties <2> The transactional id that is required to enable transactional processing <3> Enable offset strategy to commit the offsets to the transaction -<4> Consumer read messages isolation \ No newline at end of file +<4> Consumer read messages isolation diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/sendto/ProductClient.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/sendto/ProductClient.groovy new file mode 100644 index 000000000..0d0466eae --- /dev/null +++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/sendto/ProductClient.groovy @@ -0,0 +1,15 @@ +package io.micronaut.kafka.docs.consumer.sendto + +import io.micronaut.configuration.kafka.annotation.KafkaClient +import io.micronaut.configuration.kafka.annotation.KafkaKey +import io.micronaut.configuration.kafka.annotation.Topic +import io.micronaut.context.annotation.Requires +import io.micronaut.kafka.docs.Product; + +@Requires(property = 'spec.name', value = 'SendToProductListenerTest') +@KafkaClient('product-client') +interface ProductClient { + + @Topic('sendto-products') + void send(@KafkaKey String brand, Product product) +} diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/sendto/ProductListener.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/sendto/ProductListener.groovy new file mode 100644 index 000000000..d194bdbbd --- /dev/null +++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/sendto/ProductListener.groovy @@ -0,0 +1,40 @@ +package io.micronaut.kafka.docs.consumer.sendto + +import groovy.util.logging.Slf4j + +// tag::imports[] +import io.micronaut.configuration.kafka.annotation.KafkaKey +import io.micronaut.configuration.kafka.annotation.KafkaListener +import io.micronaut.configuration.kafka.annotation.OffsetReset +import io.micronaut.configuration.kafka.annotation.Topic +import io.micronaut.context.annotation.Requires +import io.micronaut.kafka.docs.Product +import io.micronaut.messaging.annotation.SendTo +import reactor.core.publisher.Mono +// end::imports[] + +@Slf4j +@Requires(property = 'spec.name', value = 'SendToProductListenerTest') +@KafkaListener(offsetReset = OffsetReset.EARLIEST) +class ProductListener { + + // tag::method[] + @Topic("sendto-products") // <1> + @SendTo("product-quantities") // <2> + int receive(@KafkaKey String brand, Product product) { + log.info("Got Product - {} by {}", product.name, brand) + product.quantity // <3> + } + // end::method[] + + // tag::reactive[] + @Topic("sendto-products") // <1> + @SendTo("product-quantities") // <2> + Mono receiveProduct(@KafkaKey String brand, Mono productSingle) { + productSingle.map(product -> { + log.info("Got Product - {} by {}", product.name, brand) + product.quantity // <3> + }) + } + // end::reactive[] +} diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/sendto/QuantityListener.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/sendto/QuantityListener.groovy new file mode 100644 index 000000000..fc27cef75 --- /dev/null +++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/sendto/QuantityListener.groovy @@ -0,0 +1,22 @@ +package io.micronaut.kafka.docs.consumer.sendto + +import groovy.util.logging.Slf4j +import io.micronaut.configuration.kafka.annotation.KafkaKey +import io.micronaut.configuration.kafka.annotation.KafkaListener +import io.micronaut.configuration.kafka.annotation.OffsetReset +import io.micronaut.configuration.kafka.annotation.Topic +import io.micronaut.context.annotation.Requires + +@Slf4j +@Requires(property = 'spec.name', value = 'SendToProductListenerTest') +@KafkaListener(offsetReset = OffsetReset.EARLIEST) +class QuantityListener { + + Integer quantity + + @Topic("product-quantities") + int receive(@KafkaKey String brand, Integer quantity) { + log.info("Got Quantity - {} by {}", quantity, brand) + this.quantity = quantity + } +} diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/sendto/SendToProductListenerTest.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/sendto/SendToProductListenerTest.groovy new file mode 100644 index 000000000..e26a09e5d --- /dev/null +++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/sendto/SendToProductListenerTest.groovy @@ -0,0 +1,26 @@ +package io.micronaut.kafka.docs.consumer.sendto + +import io.micronaut.context.ApplicationContext +import io.micronaut.kafka.docs.Product +import spock.lang.Specification +import spock.util.concurrent.PollingConditions + +class SendToProductListenerTest extends Specification { + void "test Send Product"() { + given: + ApplicationContext ctx = ApplicationContext.run( + 'kafka.enabled': true, 'spec.name': 'SendToProductListenerTest' + ) + + when: + Product product = new Product("Blue Trainers", 5) + ProductClient client = ctx.getBean(ProductClient.class) + client.send("Nike", product) + QuantityListener listener = ctx.getBean(QuantityListener) + + then: + new PollingConditions(timeout: 10).eventually { + listener.quantity == 5 + } + } +} diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/sendto/WordCountListener.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/sendto/WordCountListener.groovy new file mode 100644 index 000000000..5f78672e8 --- /dev/null +++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/sendto/WordCountListener.groovy @@ -0,0 +1,27 @@ +package io.micronaut.kafka.docs.consumer.sendto + +import io.micronaut.configuration.kafka.annotation.KafkaKey +import io.micronaut.configuration.kafka.annotation.KafkaListener +import io.micronaut.configuration.kafka.annotation.OffsetReset +import io.micronaut.configuration.kafka.annotation.Topic +import io.micronaut.context.annotation.Requires +import org.slf4j.Logger + +import static org.slf4j.LoggerFactory.getLogger + +@Requires(property = 'spec.name', value = 'WordCounterTest') +@KafkaListener(offsetReset = OffsetReset.EARLIEST) +class WordCountListener { + + private static final Logger LOG = getLogger(WordCountListener.class) + + Map wordCount = [:] + + @Topic("my-words-count") + void receive(@KafkaKey byte[] key, Object value) { + final String word = new String(key) + final Integer count = (Integer) value + LOG.info("Got word count - {}: {}", word, count) + this.wordCount.put(word, count) + } +} diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/sendto/WordCounter.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/sendto/WordCounter.groovy new file mode 100644 index 000000000..3ca29019f --- /dev/null +++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/sendto/WordCounter.groovy @@ -0,0 +1,33 @@ +package io.micronaut.kafka.docs.consumer.sendto + +import io.micronaut.configuration.kafka.KafkaMessage +import io.micronaut.configuration.kafka.annotation.KafkaListener +import io.micronaut.configuration.kafka.annotation.OffsetReset +import io.micronaut.configuration.kafka.annotation.OffsetStrategy +import io.micronaut.configuration.kafka.annotation.Topic +import io.micronaut.context.annotation.Requires +import io.micronaut.messaging.annotation.SendTo +import org.apache.kafka.common.IsolationLevel + +@Requires(property = 'spec.name', value = 'WordCounterTest') +// tag::transactional[] +@KafkaListener( + offsetReset = OffsetReset.EARLIEST, + producerClientId = 'word-counter-producer', // <1> + producerTransactionalId = 'tx-word-counter-id', // <2> + offsetStrategy = OffsetStrategy.SEND_TO_TRANSACTION, // <3> + isolation = IsolationLevel.READ_COMMITTED // <4> +) +class WordCounter { + + @Topic('tx-incoming-strings') + @SendTo('my-words-count') + List> wordsCounter(String string) { + string.split("\\s+") + .groupBy() + .collect { key, instanceList -> + KafkaMessage.Builder.withBody(instanceList.size()).key(key.bytes).build() + } + } +} +// end::transactional[] diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/sendto/WordCounterClient.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/sendto/WordCounterClient.groovy new file mode 100644 index 000000000..ba55aafe9 --- /dev/null +++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/sendto/WordCounterClient.groovy @@ -0,0 +1,13 @@ +package io.micronaut.kafka.docs.consumer.sendto + +import io.micronaut.configuration.kafka.annotation.KafkaClient +import io.micronaut.configuration.kafka.annotation.Topic +import io.micronaut.context.annotation.Requires + +@Requires(property = 'spec.name', value = 'WordCounterTest') +@KafkaClient('word-counter-producer') +interface WordCounterClient { + + @Topic('tx-incoming-strings') + void send(String words) +} diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/sendto/WordCounterTest.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/sendto/WordCounterTest.groovy new file mode 100644 index 000000000..45266b0ef --- /dev/null +++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/sendto/WordCounterTest.groovy @@ -0,0 +1,31 @@ +package io.micronaut.kafka.docs.consumer.sendto + +import io.micronaut.context.ApplicationContext +import spock.lang.Specification +import spock.util.concurrent.PollingConditions + +import static java.util.concurrent.TimeUnit.SECONDS + +class WordCounterTest extends Specification { + + void "test Word Counter"() { + given: + ApplicationContext ctx = ApplicationContext.run( + 'kafka.enabled': true, 'spec.name': 'WordCounterTest' + ) + + when: + WordCounterClient client = ctx.getBean(WordCounterClient.class) + client.send('test to test for words') + + then: + WordCountListener listener = ctx.getBean(WordCountListener.class) + new PollingConditions(timeout: 10).eventually { + listener.wordCount.size() == 4 + listener.wordCount.get("test") == 2 + listener.wordCount.get("to") == 1 + listener.wordCount.get("for") == 1 + listener.wordCount.get("words") == 1 + } + } +} diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/sendto/ProductClient.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/sendto/ProductClient.kt new file mode 100644 index 000000000..4ba6c7b12 --- /dev/null +++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/sendto/ProductClient.kt @@ -0,0 +1,15 @@ +package io.micronaut.kafka.docs.consumer.sendto + +import io.micronaut.configuration.kafka.annotation.KafkaClient +import io.micronaut.configuration.kafka.annotation.KafkaKey +import io.micronaut.configuration.kafka.annotation.Topic +import io.micronaut.context.annotation.Requires +import io.micronaut.kafka.docs.Product + +@Requires(property = "spec.name", value = "SendToProductListenerTest") +@KafkaClient("product-client") +interface ProductClient { + + @Topic("sendto-products") + fun send(@KafkaKey brand: String, product: Product) +} diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/sendto/ProductListener.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/sendto/ProductListener.kt new file mode 100644 index 000000000..c0c92b3d5 --- /dev/null +++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/sendto/ProductListener.kt @@ -0,0 +1,44 @@ +package io.micronaut.kafka.docs.consumer.sendto + +// tag::imports[] +import io.micronaut.configuration.kafka.annotation.KafkaKey +import io.micronaut.configuration.kafka.annotation.KafkaListener +import io.micronaut.configuration.kafka.annotation.OffsetReset +import io.micronaut.configuration.kafka.annotation.Topic +import io.micronaut.context.annotation.Requires +import io.micronaut.kafka.docs.Product +import io.micronaut.kafka.docs.consumer.batch.BookListener +import io.micronaut.messaging.annotation.SendTo +import org.slf4j.LoggerFactory +import reactor.core.publisher.Mono +import java.util.function.Function +// end::imports[] + +@Requires(property = "spec.name", value = "SendToProductListenerTest") +@KafkaListener(offsetReset = OffsetReset.EARLIEST) +class ProductListener { + + companion object { + private val LOG = LoggerFactory.getLogger(BookListener::class.java) + } + + // tag::method[] + @Topic("sendto-products") // <1> + @SendTo("product-quantities") // <2> + fun receive(@KafkaKey brand: String?, product: Product): Int { + LOG.info("Got Product - {} by {}", product.name, brand) + return product.quantity // <3> + } + // end::method[] + + // tag::reactive[] + @Topic("sendto-products") // <1> + @SendTo("product-quantities") // <2> + fun receiveProduct(@KafkaKey brand: String?, productSingle: Mono): Mono { + return productSingle.map(Function { product: Product -> + LOG.info("Got Product - {} by {}", product.name, brand) + product.quantity // <3> + }) + } +// end::reactive[] +} diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/sendto/QuantityListener.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/sendto/QuantityListener.kt new file mode 100644 index 000000000..c74baf9f6 --- /dev/null +++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/sendto/QuantityListener.kt @@ -0,0 +1,30 @@ +package io.micronaut.kafka.docs.consumer.sendto + +import io.micronaut.configuration.kafka.annotation.KafkaKey +import io.micronaut.configuration.kafka.annotation.KafkaListener +import io.micronaut.configuration.kafka.annotation.OffsetReset +import io.micronaut.configuration.kafka.annotation.Topic +import io.micronaut.context.annotation.Requires +import io.micronaut.kafka.docs.Product +import io.micronaut.kafka.docs.consumer.batch.BookListener +import io.micronaut.messaging.annotation.SendTo +import org.slf4j.LoggerFactory +import reactor.core.publisher.Mono +import java.util.function.Function + +@Requires(property = "spec.name", value = "SendToProductListenerTest") +@KafkaListener(offsetReset = OffsetReset.EARLIEST) +class QuantityListener { + + companion object { + private val LOG = LoggerFactory.getLogger(BookListener::class.java) + } + + var quantity : Integer? = null + + @Topic("product-quantities") + fun receive(@KafkaKey brand: String?, quantity: Integer) { + LOG.info("Got Quantity - {} by {}", quantity, brand) + this.quantity = quantity + } +} diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/sendto/SendToProductListenerTest.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/sendto/SendToProductListenerTest.kt new file mode 100644 index 000000000..76d3541a1 --- /dev/null +++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/sendto/SendToProductListenerTest.kt @@ -0,0 +1,27 @@ +package io.micronaut.kafka.docs.consumer.sendto + +import io.micronaut.context.ApplicationContext +import io.micronaut.kafka.docs.Product +import org.awaitility.Awaitility +import org.junit.jupiter.api.Test +import java.util.Map +import java.util.concurrent.TimeUnit + +class SendToProductListenerTest { + + @Test + fun testSendProduct() { + ApplicationContext.run( + Map.of("kafka.enabled", "true", "spec.name", "SendToProductListenerTest") + ).use { ctx -> + val product = Product("Blue Trainers", 5) + val client = ctx.getBean(ProductClient::class.java) + client.send("Nike", product) + val listener = ctx.getBean(QuantityListener::class.java) + Awaitility.await().atMost(10, TimeUnit.SECONDS).until { + listener.quantity != null && + listener.quantity!!.toInt() == 5 + } + } + } +} diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/sendto/WordCountListener.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/sendto/WordCountListener.kt new file mode 100644 index 000000000..4eeb26c0b --- /dev/null +++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/sendto/WordCountListener.kt @@ -0,0 +1,23 @@ +package io.micronaut.kafka.docs.consumer.sendto + +import io.micronaut.configuration.kafka.annotation.* +import io.micronaut.context.annotation.Requires +import org.apache.kafka.common.IsolationLevel +import org.slf4j.LoggerFactory + +@Requires(property = "spec.name", value = "WordCounterTest") +@KafkaListener(offsetReset = OffsetReset.EARLIEST) +class WordCountListener { + + private val LOG = LoggerFactory.getLogger(WordCountListener::class.java) + + var wordCount: MutableMap = HashMap() + + @Topic("my-words-count") + fun receive(@KafkaKey key: ByteArray?, value: Any) { + val word = String(key!!) + val count = value as Int + LOG.info("Got word count - {}: {}", word, count) + wordCount[word] = count + } +} diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/sendto/WordCounter.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/sendto/WordCounter.kt new file mode 100644 index 000000000..b3ae02271 --- /dev/null +++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/sendto/WordCounter.kt @@ -0,0 +1,30 @@ +package io.micronaut.kafka.docs.consumer.sendto + +import io.micronaut.configuration.kafka.KafkaMessage +import io.micronaut.configuration.kafka.annotation.KafkaListener +import io.micronaut.configuration.kafka.annotation.OffsetReset +import io.micronaut.configuration.kafka.annotation.OffsetStrategy +import io.micronaut.configuration.kafka.annotation.Topic +import io.micronaut.context.annotation.Requires +import io.micronaut.messaging.annotation.SendTo +import org.apache.kafka.common.IsolationLevel + +@Requires(property = "spec.name", value = "WordCounterTest") +// tag::transactional[] +@KafkaListener( + offsetReset = OffsetReset.EARLIEST, + producerClientId = "word-counter-producer", // <1> + producerTransactionalId = "tx-word-counter-id", // <2> + offsetStrategy = OffsetStrategy.SEND_TO_TRANSACTION, // <3> + isolation = IsolationLevel.READ_COMMITTED // <4> +) +class WordCounter { + + @Topic("tx-incoming-strings") + @SendTo("my-words-count") + fun wordsCounter(string: String) = string + .split(Regex("\\s+")) + .groupBy { it } + .map { KafkaMessage.Builder.withBody(it.value.size).key(it.key.toByteArray()).build() } +} +// end::transactional[] diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/sendto/WordCounterClient.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/sendto/WordCounterClient.kt new file mode 100644 index 000000000..b754eb9a3 --- /dev/null +++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/sendto/WordCounterClient.kt @@ -0,0 +1,14 @@ +package io.micronaut.kafka.docs.consumer.sendto + +import io.micronaut.configuration.kafka.KafkaMessage +import io.micronaut.configuration.kafka.annotation.KafkaClient +import io.micronaut.configuration.kafka.annotation.Topic +import io.micronaut.context.annotation.Requires + +@Requires(property = "spec.name", value = "WordCounterTest") +@KafkaClient("word-counter-producer") +interface WordCounterClient { + + @Topic("tx-incoming-strings") + fun send(words: String): List> +} diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/sendto/WordCounterTest.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/sendto/WordCounterTest.kt new file mode 100644 index 000000000..a57269204 --- /dev/null +++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/sendto/WordCounterTest.kt @@ -0,0 +1,28 @@ +package io.micronaut.kafka.docs.consumer.sendto + +import io.micronaut.context.ApplicationContext +import org.awaitility.Awaitility +import org.junit.jupiter.api.Test +import java.util.Map +import java.util.concurrent.TimeUnit + +class WordCounterTest { + + @Test + fun testWordCounter() { + ApplicationContext.run( + Map.of("kafka.enabled", "true", "spec.name", "WordCounterTest") + ).use { ctx -> + val client = ctx.getBean(WordCounterClient::class.java) + client.send("test to test for words") + val listener: WordCountListener = ctx.getBean(WordCountListener::class.java) + Awaitility.await().atMost(10, TimeUnit.SECONDS).until { + listener.wordCount.size == 4 && + listener.wordCount.get("test") == 2 && + listener.wordCount.get("to") == 1 && + listener.wordCount.get("for") == 1 && + listener.wordCount.get("words") == 1 + } + } + } +} diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/sendto/ProductClient.java b/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/sendto/ProductClient.java new file mode 100644 index 000000000..b49bd7401 --- /dev/null +++ b/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/sendto/ProductClient.java @@ -0,0 +1,15 @@ +package io.micronaut.kafka.docs.consumer.sendto; + +import io.micronaut.configuration.kafka.annotation.KafkaClient; +import io.micronaut.configuration.kafka.annotation.KafkaKey; +import io.micronaut.configuration.kafka.annotation.Topic; +import io.micronaut.context.annotation.Requires; +import io.micronaut.kafka.docs.Product; + +@Requires(property = "spec.name", value = "SendToProductListenerTest") +@KafkaClient("product-client") +public interface ProductClient { + + @Topic("sendto-products") + void send(@KafkaKey String brand, Product product); +} diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/sendto/ProductListener.java b/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/sendto/ProductListener.java new file mode 100644 index 000000000..400cd0482 --- /dev/null +++ b/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/sendto/ProductListener.java @@ -0,0 +1,44 @@ +package io.micronaut.kafka.docs.consumer.sendto; + +// tag::imports[] +import io.micronaut.configuration.kafka.annotation.KafkaKey; +import io.micronaut.configuration.kafka.annotation.KafkaListener; +import io.micronaut.configuration.kafka.annotation.OffsetReset; +import io.micronaut.configuration.kafka.annotation.Topic; +import io.micronaut.context.annotation.Requires; +import io.micronaut.kafka.docs.Product; +import io.micronaut.kafka.docs.consumer.batch.BookListener; +import io.micronaut.messaging.annotation.SendTo; +import org.slf4j.Logger; +import reactor.core.publisher.Mono; + +import static org.slf4j.LoggerFactory.getLogger; +// end::imports[] + +@Requires(property = "spec.name", value = "SendToProductListenerTest") +@KafkaListener(offsetReset = OffsetReset.EARLIEST) +public class ProductListener { + private static final Logger LOG = getLogger(BookListener.class); + + // tag::method[] + @Topic("sendto-products") // <1> + @SendTo("product-quantities") // <2> + public int receive(@KafkaKey String brand, Product product) { + LOG.info("Got Product - {} by {}", product.name(), brand); + return product.quantity(); // <3> + } + // end::method[] + + // tag::reactive[] + @Topic("sendto-products") // <1> + @SendTo("product-quantities") // <2> + public Mono receiveProduct(@KafkaKey String brand, + Mono productSingle) { + + return productSingle.map(product -> { + LOG.info("Got Product - {} by {}", product.name(), brand); + return product.quantity(); // <3> + }); + } + // end::reactive[] +} diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/sendto/QuantityListener.java b/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/sendto/QuantityListener.java new file mode 100644 index 000000000..3c1de7fe3 --- /dev/null +++ b/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/sendto/QuantityListener.java @@ -0,0 +1,25 @@ +package io.micronaut.kafka.docs.consumer.sendto; + +import io.micronaut.configuration.kafka.annotation.KafkaKey; +import io.micronaut.configuration.kafka.annotation.KafkaListener; +import io.micronaut.configuration.kafka.annotation.OffsetReset; +import io.micronaut.configuration.kafka.annotation.Topic; +import io.micronaut.context.annotation.Requires; +import org.slf4j.Logger; + +import static org.slf4j.LoggerFactory.getLogger; + +@Requires(property = "spec.name", value = "SendToProductListenerTest") +@KafkaListener(offsetReset = OffsetReset.EARLIEST) +public class QuantityListener { + + private static final Logger LOG = getLogger(QuantityListener.class); + + Integer quantity; + + @Topic("product-quantities") + public void receive(@KafkaKey String brand, Integer quantity) { + LOG.info("Got quantity - {} by {}", quantity, brand); + this.quantity = quantity; + } +} diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/sendto/SendToProductListenerTest.java b/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/sendto/SendToProductListenerTest.java new file mode 100644 index 000000000..37d6c7829 --- /dev/null +++ b/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/sendto/SendToProductListenerTest.java @@ -0,0 +1,29 @@ +package io.micronaut.kafka.docs.consumer.sendto; + +import io.micronaut.context.ApplicationContext; +import io.micronaut.kafka.docs.Product; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.awaitility.Awaitility.await; + +class SendToProductListenerTest { + + @Test + void testSendProduct() { + try (ApplicationContext ctx = ApplicationContext.run( + Map.of("kafka.enabled", "true", "spec.name", "SendToProductListenerTest") + )) { + Product product = new Product("Blue Trainers", 5); + ProductClient client = ctx.getBean(ProductClient.class); + client.send("Nike", product); + QuantityListener listener = ctx.getBean(QuantityListener.class); + await().atMost(10, SECONDS).until(() -> + listener.quantity != null && + listener.quantity == 5 + ); + } + } +} diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/sendto/WordCountListener.java b/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/sendto/WordCountListener.java new file mode 100644 index 000000000..636335062 --- /dev/null +++ b/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/sendto/WordCountListener.java @@ -0,0 +1,30 @@ +package io.micronaut.kafka.docs.consumer.sendto; + +import io.micronaut.configuration.kafka.annotation.KafkaKey; +import io.micronaut.configuration.kafka.annotation.KafkaListener; +import io.micronaut.configuration.kafka.annotation.OffsetReset; +import io.micronaut.configuration.kafka.annotation.Topic; +import io.micronaut.context.annotation.Requires; +import org.slf4j.Logger; + +import java.util.HashMap; +import java.util.Map; + +import static org.slf4j.LoggerFactory.getLogger; + +@Requires(property = "spec.name", value = "WordCounterTest") +@KafkaListener(offsetReset = OffsetReset.EARLIEST) +public class WordCountListener { + + private static final Logger LOG = getLogger(WordCountListener.class); + + Map wordCount = new HashMap<>(); + + @Topic("my-words-count") + public void receive(@KafkaKey byte[] key, Object value) { + final String word = new String(key); + final Integer count = (Integer) value; + LOG.info("Got word count - {}: {}", word, count); + this.wordCount.put(word, count); + } +} diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/sendto/WordCounter.java b/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/sendto/WordCounter.java new file mode 100644 index 000000000..cb8b65814 --- /dev/null +++ b/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/sendto/WordCounter.java @@ -0,0 +1,39 @@ +package io.micronaut.kafka.docs.consumer.sendto; + +import io.micronaut.configuration.kafka.KafkaMessage; +import io.micronaut.configuration.kafka.annotation.KafkaListener; +import io.micronaut.configuration.kafka.annotation.OffsetReset; +import io.micronaut.configuration.kafka.annotation.OffsetStrategy; +import io.micronaut.configuration.kafka.annotation.Topic; +import io.micronaut.context.annotation.Requires; +import io.micronaut.messaging.annotation.SendTo; +import org.apache.kafka.common.IsolationLevel; + +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@Requires(property = "spec.name", value = "WordCounterTest") +// tag::transactional[] +@KafkaListener( + offsetReset = OffsetReset.EARLIEST, + producerClientId = "word-counter-producer", // <1> + producerTransactionalId = "tx-word-counter-id", // <2> + offsetStrategy = OffsetStrategy.SEND_TO_TRANSACTION, // <3> + isolation = IsolationLevel.READ_COMMITTED // <4> +) +public class WordCounter { + + @Topic("tx-incoming-strings") + @SendTo("my-words-count") + List> wordsCounter(String string) { + return Stream.of(string.split("\\s+")) + .collect(Collectors.groupingBy(Function.identity(), Collectors.summingInt(i -> 1))) + .entrySet() + .stream() + .map(e -> KafkaMessage.Builder.withBody(e.getValue()).key(e.getKey().getBytes()).build()) + .toList(); + } +} +// end::transactional[] diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/sendto/WordCounterClient.java b/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/sendto/WordCounterClient.java new file mode 100644 index 000000000..aa1fbecaf --- /dev/null +++ b/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/sendto/WordCounterClient.java @@ -0,0 +1,13 @@ +package io.micronaut.kafka.docs.consumer.sendto; + +import io.micronaut.configuration.kafka.annotation.KafkaClient; +import io.micronaut.configuration.kafka.annotation.Topic; +import io.micronaut.context.annotation.Requires; + +@Requires(property = "spec.name", value = "WordCounterTest") +@KafkaClient("word-counter-producer") +public interface WordCounterClient { + + @Topic("tx-incoming-strings") + void send(String words); +} diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/sendto/WordCounterTest.java b/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/sendto/WordCounterTest.java new file mode 100644 index 000000000..07cbf097f --- /dev/null +++ b/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/sendto/WordCounterTest.java @@ -0,0 +1,31 @@ +package io.micronaut.kafka.docs.consumer.sendto; + +import io.micronaut.context.ApplicationContext; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; + +class WordCounterTest { + + @Test + void testWordCounter() { + try (ApplicationContext ctx = ApplicationContext.run( + Map.of("kafka.enabled", "true", "spec.name", "WordCounterTest") + )) { + WordCounterClient client = ctx.getBean(WordCounterClient.class); + client.send("test to test for words"); + WordCountListener listener = ctx.getBean(WordCountListener.class); + await().atMost(10, SECONDS).until(() -> + listener.wordCount.size() == 4 && + listener.wordCount.get("test") == 2 && + listener.wordCount.get("to") == 1 && + listener.wordCount.get("for") == 1 && + listener.wordCount.get("words") == 1 + ); + } + } +} diff --git a/test-suite/src/test/resources/logback.xml b/test-suite/src/test/resources/logback.xml index 31ce8cd88..eed6dc507 100644 --- a/test-suite/src/test/resources/logback.xml +++ b/test-suite/src/test/resources/logback.xml @@ -15,4 +15,5 @@ +