Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi-language documentation for Forwarding Messages with @SendTo example #827

Merged
merged 12 commits into from
Aug 28, 2023
Merged

This file was deleted.

22 changes: 8 additions & 14 deletions src/main/docs/guide/kafkaListener/kafkaSendTo.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@ On any `@KafkaListener` method that returns a value, you can use the https://doc
The key of the original `ConsumerRecord` will be used as the key when forwarding the message.

.Committing offsets with the `KafkaConsumer` API
guillermocalvo marked this conversation as resolved.
Show resolved Hide resolved
[source,java]
----
include::{testskafka}/consumer/sendto/ProductListener.java[tags=imports, indent=0]

include::{testskafka}/consumer/sendto/ProductListener.java[tags=method, indent=0]
----
snippet::io.micronaut.kafka.docs.consumer.sendto.ProductListener[tags=method, indent=0]


<1> The topic subscribed to is `awesome-products`
<2> The topic to send the result to is `product-quantities`
Expand All @@ -17,10 +14,8 @@ include::{testskafka}/consumer/sendto/ProductListener.java[tags=method, indent=0
You can also do the same using Reactive programming:

.Committing offsets with the `KafkaConsumer` API
guillermocalvo marked this conversation as resolved.
Show resolved Hide resolved
[source,java]
----
include::{testskafka}/consumer/sendto/ProductListener.java[tags=reactive, indent=0]
----

snippet::io.micronaut.kafka.docs.consumer.sendto.ProductListener[tags=reactive, indent=0]

<1> The topic subscribed to is `awesome-products`
<2> The topic to send the result to is `product-quantities`
Expand All @@ -31,12 +26,11 @@ In the reactive case the `poll` loop will continue and will not wait for the rec
To enable transactional sending of the messages you need to define `producerTransactionalId` in `@KafkaListener`.

.Transactional consumer-producer
[source,java]
----
include::{testskafka}/consumer/sendto/WordCounter.java[tags=transactional, indent=0]
----

snippet::io.micronaut.kafka.docs.consumer.sendto.WordCounter[tags=transactional, indent=0]


<1> The id of the producer to load additional config properties
<2> The transactional id that is required to enable transactional processing
<3> Enable offset strategy to commit the offsets to the transaction
<4> Consumer read messages isolation
<4> Consumer read messages isolation
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.micronaut.kafka.docs.consumer.sendto

import io.micronaut.serde.annotation.Serdeable

@Serdeable
class Product {

private String name;
private int quantity;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.micronaut.kafka.docs.consumer.sendto

import groovy.util.logging.Slf4j

// tag::imports[]
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import io.micronaut.messaging.annotation.SendTo
import reactor.core.publisher.Mono
// end::imports[]

@Slf4j
@Requires(property = 'spec.name', value = 'ProductListenerTest')
@KafkaListener
wetted marked this conversation as resolved.
Show resolved Hide resolved
class ProductListener {

// tag::method[]
@Topic("awesome-products") // <1>
@SendTo("product-quantities") // <2>
int receive(@KafkaKey String brand, Product product) {
log.info("Got Product - {} by {}", product.name, brand)
product.quantity // <3>
}
// end::method[]

// tag::reactive[]
@Topic("awesome-products") // <1>
@SendTo("product-quantities") // <2>
Mono<Integer> receiveProduct(@KafkaKey String brand, Mono<Product> productSingle) {
productSingle.map(product -> {
log.info("Got Product - {} by {}", product.name, brand)
return product.quantity // <3>
})
}
// end::reactive[]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.micronaut.kafka.docs.consumer.sendto

import io.micronaut.configuration.kafka.KafkaMessage
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetStrategy
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import io.micronaut.messaging.annotation.SendTo
import org.apache.kafka.common.IsolationLevel

@Requires(property = 'spec.name', value = 'WordCounterTest')
// tag::transactional[]
@KafkaListener(
wetted marked this conversation as resolved.
Show resolved Hide resolved
producerClientId = 'word-counter-producer', // <1>
producerTransactionalId = 'tx-word-counter-id', // <2>
offsetStrategy = OffsetStrategy.SEND_TO_TRANSACTION, // <3>
isolation = IsolationLevel.READ_COMMITTED // <4>
)
class WordCounter {

@Topic('tx-incoming-strings')
@SendTo('my-words-count')
List<KafkaMessage<String, Integer>> wordsCounter(String string) {
string.split("\\s+")
.groupBy()
.collect { key, instanceList ->
KafkaMessage.Builder.withBody(instanceList.size()).key(key).build()
}
}
}
// end::transactional[]
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.micronaut.kafka.docs.consumer.sendto

import io.micronaut.serde.annotation.Serdeable

@Serdeable
data class Product(val name: String, val quantity: Int)
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.micronaut.kafka.docs.consumer.sendto

// tag::imports[]
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import io.micronaut.kafka.docs.consumer.batch.BookListener
import io.micronaut.messaging.annotation.SendTo
import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono
import java.util.function.Function
// end::imports[]

@Requires(property = "spec.name", value = "ProductListenerTest")
@KafkaListener
wetted marked this conversation as resolved.
Show resolved Hide resolved
class ProductListener {

companion object {
private val LOG = LoggerFactory.getLogger(BookListener::class.java)
}

// tag::method[]
@Topic("awesome-products") // <1>
@SendTo("product-quantities") // <2>
fun receive(@KafkaKey brand: String?, product: Product): Int {
LOG.info("Got Product - {} by {}", product.name, brand)
return product.quantity // <3>
}
// end::method[]

// tag::reactive[]
@Topic("awesome-products") // <1>
@SendTo("product-quantities") // <2>
fun receiveProduct(@KafkaKey brand: String?, productSingle: Mono<Product>): Mono<Int> {
return productSingle.map(Function<Product, Int> { product: Product ->
LOG.info("Got Product - {} by {}", product.name, brand)
product.quantity // <3>
})
}
// end::reactive[]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.micronaut.kafka.docs.consumer.sendto

import io.micronaut.configuration.kafka.KafkaMessage
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetStrategy
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import io.micronaut.messaging.annotation.SendTo
import org.apache.kafka.common.IsolationLevel

@Requires(property = "spec.name", value = "WordCounterTest")
// tag::transactional[]
@KafkaListener(
wetted marked this conversation as resolved.
Show resolved Hide resolved
producerClientId = "word-counter-producer",
producerTransactionalId = "tx-word-counter-id",
offsetStrategy = OffsetStrategy.SEND_TO_TRANSACTION,
isolation = IsolationLevel.READ_COMMITTED
)
class WordCounter {

@Topic("tx-incoming-strings")
@SendTo("my-words-count")
fun wordsCounter(string: String) = string
.split(Regex("\\s+"))
.groupBy { it }
.map { KafkaMessage.Builder.withBody<String, Int>(it.value.size).key(it.key).build() }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.micronaut.kafka.docs.consumer.sendto;

import io.micronaut.serde.annotation.Serdeable;

@Serdeable
public class Product {

private String name;
private int quantity;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public int getQuantity() {
return quantity;
}

public void setQuantity(int quantity) {
this.quantity = quantity;
}
}
Original file line number Diff line number Diff line change
@@ -1,24 +1,28 @@
package io.micronaut.configuration.kafka.docs.consumer.sendto;
package io.micronaut.kafka.docs.consumer.sendto;

// tag::imports[]
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.configuration.kafka.docs.consumer.config.Product;
import io.micronaut.context.annotation.Requires;
import io.micronaut.kafka.docs.consumer.batch.BookListener;
import io.micronaut.messaging.annotation.SendTo;
import org.slf4j.Logger;
import reactor.core.publisher.Mono;

import static org.slf4j.LoggerFactory.getLogger;
// end::imports[]

@Requires(property = "spec.name", value = "ProductListenerTest")
@KafkaListener
wetted marked this conversation as resolved.
Show resolved Hide resolved
public class ProductListener {
private static final Logger LOG = getLogger(BookListener.class);

// tag::method[]
@Topic("awesome-products") // <1>
@SendTo("product-quantities") // <2>
public int receive(@KafkaKey String brand,
Product product) {
System.out.println("Got Product - " + product.getName() + " by " + brand);

public int receive(@KafkaKey String brand, Product product) {
LOG.info("Got Product - {} by {}", product.getName(), brand);
return product.getQuantity(); // <3>
}
// end::method[]
Expand All @@ -30,7 +34,7 @@ public Mono<Integer> receiveProduct(@KafkaKey String brand,
Mono<Product> productSingle) {

return productSingle.map(product -> {
System.out.println("Got Product - " + product.getName() + " by " + brand);
LOG.info("Got Product - {} by {}", product.getName(), brand);
return product.getQuantity(); // <3>
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.micronaut.kafka.docs.consumer.sendto;

import io.micronaut.configuration.kafka.KafkaMessage;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetStrategy;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;
import io.micronaut.messaging.annotation.SendTo;
import org.apache.kafka.common.IsolationLevel;

import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Requires(property = "spec.name", value = "WordCounterTest")
// tag::transactional[]
@KafkaListener(
wetted marked this conversation as resolved.
Show resolved Hide resolved
producerClientId = "word-counter-producer", // <1>
producerTransactionalId = "tx-word-counter-id", // <2>
offsetStrategy = OffsetStrategy.SEND_TO_TRANSACTION, // <3>
isolation = IsolationLevel.READ_COMMITTED // <4>
)
public class WordCounter {

@Topic("tx-incoming-strings")
@SendTo("my-words-count")
List<KafkaMessage<String, Integer>> wordsCounter(String string) {
return Stream.of(string.split("\\s+"))
.collect(Collectors.groupingBy(Function.identity(), Collectors.summingInt(i -> 1)))
.entrySet()
.stream()
.map(e -> KafkaMessage.Builder.<String, Integer>withBody(e.getValue()).key(e.getKey()).build())
.toList();
}
}
// end::transactional[]