Skip to content

Commit

Permalink
doc: Multi-language documentation for Forwarding Messages with @sendto
Browse files Browse the repository at this point in the history
…example (#827)
  • Loading branch information
wetted authored Aug 28, 2023
1 parent e75ed8e commit 7141fdc
Show file tree
Hide file tree
Showing 28 changed files with 655 additions and 93 deletions.

This file was deleted.

This file was deleted.

26 changes: 10 additions & 16 deletions src/main/docs/guide/kafkaListener/kafkaSendTo.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,20 @@ On any `@KafkaListener` method that returns a value, you can use the https://doc

The key of the original `ConsumerRecord` will be used as the key when forwarding the message.

.Committing offsets with the `KafkaConsumer` API
[source,java]
----
include::{testskafka}/consumer/sendto/ProductListener.java[tags=imports, indent=0]
.Forwarding with @SendTo

snippet::io.micronaut.kafka.docs.consumer.sendto.ProductListener[tags=method, indent=0]

include::{testskafka}/consumer/sendto/ProductListener.java[tags=method, indent=0]
----

<1> The topic subscribed to is `awesome-products`
<2> The topic to send the result to is `product-quantities`
<3> The return value is used to indicate the value to forward

You can also do the same using Reactive programming:

.Committing offsets with the `KafkaConsumer` API
[source,java]
----
include::{testskafka}/consumer/sendto/ProductListener.java[tags=reactive, indent=0]
----
.Forwarding Reactively with @SendTo

snippet::io.micronaut.kafka.docs.consumer.sendto.ProductListener[tags=reactive, indent=0]

<1> The topic subscribed to is `awesome-products`
<2> The topic to send the result to is `product-quantities`
Expand All @@ -31,12 +26,11 @@ In the reactive case the `poll` loop will continue and will not wait for the rec
To enable transactional sending of the messages you need to define `producerTransactionalId` in `@KafkaListener`.

.Transactional consumer-producer
[source,java]
----
include::{testskafka}/consumer/sendto/WordCounter.java[tags=transactional, indent=0]
----

snippet::io.micronaut.kafka.docs.consumer.sendto.WordCounter[tags=transactional, indent=0]


<1> The id of the producer to load additional config properties
<2> The transactional id that is required to enable transactional processing
<3> Enable offset strategy to commit the offsets to the transaction
<4> Consumer read messages isolation
<4> Consumer read messages isolation
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.micronaut.kafka.docs.consumer.sendto

import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import io.micronaut.kafka.docs.Product;

@Requires(property = 'spec.name', value = 'SendToProductListenerTest')
@KafkaClient('product-client')
interface ProductClient {

@Topic('sendto-products')
void send(@KafkaKey String brand, Product product)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.micronaut.kafka.docs.consumer.sendto

import groovy.util.logging.Slf4j

// tag::imports[]
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetReset
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import io.micronaut.kafka.docs.Product
import io.micronaut.messaging.annotation.SendTo
import reactor.core.publisher.Mono
// end::imports[]

@Slf4j
@Requires(property = 'spec.name', value = 'SendToProductListenerTest')
@KafkaListener(offsetReset = OffsetReset.EARLIEST)
class ProductListener {

// tag::method[]
@Topic("sendto-products") // <1>
@SendTo("product-quantities") // <2>
int receive(@KafkaKey String brand, Product product) {
log.info("Got Product - {} by {}", product.name, brand)
product.quantity // <3>
}
// end::method[]

// tag::reactive[]
@Topic("sendto-products") // <1>
@SendTo("product-quantities") // <2>
Mono<Integer> receiveProduct(@KafkaKey String brand, Mono<Product> productSingle) {
productSingle.map(product -> {
log.info("Got Product - {} by {}", product.name, brand)
product.quantity // <3>
})
}
// end::reactive[]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.micronaut.kafka.docs.consumer.sendto

import groovy.util.logging.Slf4j
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetReset
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires

@Slf4j
@Requires(property = 'spec.name', value = 'SendToProductListenerTest')
@KafkaListener(offsetReset = OffsetReset.EARLIEST)
class QuantityListener {

Integer quantity

@Topic("product-quantities")
int receive(@KafkaKey String brand, Integer quantity) {
log.info("Got Quantity - {} by {}", quantity, brand)
this.quantity = quantity
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.micronaut.kafka.docs.consumer.sendto

import io.micronaut.context.ApplicationContext
import io.micronaut.kafka.docs.Product
import spock.lang.Specification
import spock.util.concurrent.PollingConditions

class SendToProductListenerTest extends Specification {
void "test Send Product"() {
given:
ApplicationContext ctx = ApplicationContext.run(
'kafka.enabled': true, 'spec.name': 'SendToProductListenerTest'
)

when:
Product product = new Product("Blue Trainers", 5)
ProductClient client = ctx.getBean(ProductClient.class)
client.send("Nike", product)
QuantityListener listener = ctx.getBean(QuantityListener)

then:
new PollingConditions(timeout: 10).eventually {
listener.quantity == 5
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.micronaut.kafka.docs.consumer.sendto

import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetReset
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import org.slf4j.Logger

import static org.slf4j.LoggerFactory.getLogger

@Requires(property = 'spec.name', value = 'WordCounterTest')
@KafkaListener(offsetReset = OffsetReset.EARLIEST)
class WordCountListener {

private static final Logger LOG = getLogger(WordCountListener.class)

Map<String, Integer> wordCount = [:]

@Topic("my-words-count")
void receive(@KafkaKey byte[] key, Object value) {
final String word = new String(key)
final Integer count = (Integer) value
LOG.info("Got word count - {}: {}", word, count)
this.wordCount.put(word, count)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.micronaut.kafka.docs.consumer.sendto

import io.micronaut.configuration.kafka.KafkaMessage
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetReset
import io.micronaut.configuration.kafka.annotation.OffsetStrategy
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import io.micronaut.messaging.annotation.SendTo
import org.apache.kafka.common.IsolationLevel

@Requires(property = 'spec.name', value = 'WordCounterTest')
// tag::transactional[]
@KafkaListener(
offsetReset = OffsetReset.EARLIEST,
producerClientId = 'word-counter-producer', // <1>
producerTransactionalId = 'tx-word-counter-id', // <2>
offsetStrategy = OffsetStrategy.SEND_TO_TRANSACTION, // <3>
isolation = IsolationLevel.READ_COMMITTED // <4>
)
class WordCounter {

@Topic('tx-incoming-strings')
@SendTo('my-words-count')
List<KafkaMessage<byte[], Integer>> wordsCounter(String string) {
string.split("\\s+")
.groupBy()
.collect { key, instanceList ->
KafkaMessage.Builder.withBody(instanceList.size()).key(key.bytes).build()
}
}
}
// end::transactional[]
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.micronaut.kafka.docs.consumer.sendto

import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires

@Requires(property = 'spec.name', value = 'WordCounterTest')
@KafkaClient('word-counter-producer')
interface WordCounterClient {

@Topic('tx-incoming-strings')
void send(String words)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.micronaut.kafka.docs.consumer.sendto

import io.micronaut.context.ApplicationContext
import spock.lang.Specification
import spock.util.concurrent.PollingConditions

import static java.util.concurrent.TimeUnit.SECONDS

class WordCounterTest extends Specification {

void "test Word Counter"() {
given:
ApplicationContext ctx = ApplicationContext.run(
'kafka.enabled': true, 'spec.name': 'WordCounterTest'
)

when:
WordCounterClient client = ctx.getBean(WordCounterClient.class)
client.send('test to test for words')

then:
WordCountListener listener = ctx.getBean(WordCountListener.class)
new PollingConditions(timeout: 10).eventually {
listener.wordCount.size() == 4
listener.wordCount.get("test") == 2
listener.wordCount.get("to") == 1
listener.wordCount.get("for") == 1
listener.wordCount.get("words") == 1
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.micronaut.kafka.docs.consumer.sendto

import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import io.micronaut.kafka.docs.Product

@Requires(property = "spec.name", value = "SendToProductListenerTest")
@KafkaClient("product-client")
interface ProductClient {

@Topic("sendto-products")
fun send(@KafkaKey brand: String, product: Product)
}
Loading

0 comments on commit 7141fdc

Please sign in to comment.