Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 56 additions & 36 deletions examples/asynchronous_processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::time::Duration;
use clap::{Arg, Command};
use futures::stream::FuturesUnordered;
use futures::{StreamExt, TryStreamExt};
use log::info;
use log::{error, info, warn};

use rdkafka::config::ClientConfig;
use rdkafka::consumer::stream_consumer::StreamConsumer;
Expand Down Expand Up @@ -58,6 +58,8 @@ async fn run_async_processor(
input_topic: String,
output_topic: String,
) {
// Count how many time we tried to connect to kafka
let mut attempt = 0;
// Create the `StreamConsumer`, to receive the messages from the topic in form of a `Stream`.
let consumer: StreamConsumer = ClientConfig::new()
.set("group.id", &group_id)
Expand All @@ -80,42 +82,57 @@ async fn run_async_processor(
.expect("Producer creation error");

// Create the outer pipeline on the message stream.
let stream_processor = consumer.stream().try_for_each(|borrowed_message| {
let producer = producer.clone();
let output_topic = output_topic.to_string();
async move {
// Process each message
record_borrowed_message_receipt(&borrowed_message).await;
// Borrowed messages can't outlive the consumer they are received from, so they need to
// be owned in order to be sent to a separate thread.
let owned_message = borrowed_message.detach();
record_owned_message_receipt(&owned_message).await;
tokio::spawn(async move {
// The body of this block will be executed on the main thread pool,
// but we perform `expensive_computation` on a separate thread pool
// for CPU-intensive tasks via `tokio::task::spawn_blocking`.
let computation_result =
tokio::task::spawn_blocking(|| expensive_computation(owned_message))
.await
.expect("failed to wait for expensive computation");
let produce_future = producer.send(
FutureRecord::to(&output_topic)
.key("some key")
.payload(&computation_result),
Duration::from_secs(0),
);
match produce_future.await {
Ok(delivery) => println!("Sent: {:?}", delivery),
Err((e, _)) => println!("Error: {:?}", e),
while attempt < 10 {
let stream_processor = consumer.stream().try_for_each(|borrowed_message| {
let producer = producer.clone();
let output_topic = output_topic.to_string();
async move {
// Process each message
record_borrowed_message_receipt(&borrowed_message).await;
// Borrowed messages can't outlive the consumer they are received from, so they need to
// be owned in order to be sent to a separate thread.
let owned_message = borrowed_message.detach();
record_owned_message_receipt(&owned_message).await;
tokio::spawn(async move {
// The body of this block will be executed on the main thread pool,
// but we perform `expensive_computation` on a separate thread pool
// for CPU-intensive tasks via `tokio::task::spawn_blocking`.
let computation_result =
tokio::task::spawn_blocking(|| expensive_computation(owned_message))
.await
.expect("failed to wait for expensive computation");
let produce_future = producer.send(
FutureRecord::to(&output_topic)
.key("some key")
.payload(&computation_result),
Duration::from_secs(0),
);
match produce_future.await {
Ok(delivery) => println!("Sent: {:?}", delivery),
Err((e, _)) => println!("Error: {:?}", e),
}
});
Ok(())
}
});

info!("starting event loop");
let stream_result = stream_processor.await;
match stream_result {
Ok(_) => {
info!("connected");
}
Err(err) => {
if attempt + 1 < 10 {
warn!("connect failed: {:?}; retrying", err);
attempt += 1;
} else {
error!("connect failed after 10 attempts: {:?}", err);
panic!("cannot connect to Kafka");
}
});
Ok(())
}
}
});

info!("Starting event loop");
stream_processor.await.expect("stream processing failed");
info!("Stream processing terminated");
}
}

#[tokio::main]
Expand All @@ -128,7 +145,10 @@ async fn main() {
.short('b')
.long("brokers")
.help("Broker list in kafka format")
.default_value("localhost:9092"),
.default_value("localhost:9092"), // NOTE: Using 'localhost' can be unreliable: it may resolve to IPv6 (::1) or IPv4 (127.0.0.1)
// depending on OS/DNS/gai.conf, and IPv6 may be unreachable in some setups (e.g., Docker or disabled IPv6).
// When running locally, prefer specifying the exact loopback IP to avoid resolution/stack surprises,
// e.g., 127.0.0.1:9092 (IPv4) or [::1]:9092 (IPv6).
)
.arg(
Arg::new("group-id")
Expand Down