Can't use stream() from within spawn? #642
Comments
I finally figured it out, holy moly. The answer was to stop using the `async fn` sugar and introduce an explicit lifetime, which tells the compiler that the consumer or stream partition queue outlives the messages the async blocks are built from. This is not tested in prod, but it might be a good example program if anyone else is interested in something like this.

use futures::stream::TryStreamExt;
use futures::Future;
use rdkafka::consumer::DefaultConsumerContext;
use rdkafka::consumer::StreamConsumer;
use rdkafka::consumer::stream_consumer::StreamPartitionQueue;
use rdkafka::message::BorrowedMessage;
use rdkafka::error::KafkaError;
use std::sync::Arc;
use futures::stream::StreamExt;

#[derive(Clone)]
struct App {
    consumer: Arc<StreamConsumer<DefaultConsumerContext>>,
}

struct PartitionApp {
    queue: StreamPartitionQueue<DefaultConsumerContext>,
}

impl<'a> PartitionApp {
    fn new(queue: StreamPartitionQueue<DefaultConsumerContext>) -> PartitionApp {
        PartitionApp { queue }
    }

    // Not an `async fn`: the returned future is explicitly tied to `'a`,
    // the borrow of `self` that the messages also borrow from.
    fn run(&'a self) -> impl Future<Output = Result<(), KafkaError>> + 'a {
        self.queue
            .stream()
            .map_ok(|msg: BorrowedMessage<'a>| async move {
                println!("msg: {:?}", msg);
                Ok(())
            })
            // Process up to 20 messages concurrently.
            .try_buffered(20)
            .try_collect()
    }
}

impl App {
    fn new() -> App {
        let config = rdkafka::config::ClientConfig::new();
        let consumer: StreamConsumer = config
            .create::<StreamConsumer<DefaultConsumerContext>>()
            .unwrap();
        let consumer = Arc::new(consumer);
        App { consumer }
    }

    fn split(&self, topic: &str, partition: i32) -> PartitionApp {
        let queue = self.consumer.split_partition_queue(topic, partition).unwrap();
        PartitionApp::new(queue)
    }
}

#[tokio::main]
async fn main() {
    let app = App::new();
    let f = |partition: i32| {
        let queue = app.split("topic", partition);
        // The async block takes ownership of the partition queue, so the
        // spawned task does not borrow anything from `main`.
        async move {
            queue.run().await
        }
    };
    let x = tokio::spawn(f(0));
    let y = tokio::spawn(f(1));
    let z = tokio::spawn(f(2));
    let _res = futures::join!(x, y, z);
}
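In case it helps anyone reading later, here is a minimal std-only sketch of the "explicit lifetime instead of async sugar" idea, with no rdkafka or tokio involved. `Queue`, `total_len`, `NoopWake`, `block_on`, and `demo` are hypothetical names made up for this illustration; `Queue` only stands in for something that owns the data its messages borrow from. Assumes the 2018 edition or later for `async` blocks.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for a partition queue: it owns the data that
// the "messages" borrow from.
struct Queue {
    messages: Vec<String>,
}

impl Queue {
    // Hand-written form of `async fn total_len(&self) -> usize`.
    // The `+ 'a` bound says the future may borrow from `self` and
    // therefore must not outlive it.
    fn total_len<'a>(&'a self) -> impl Future<Output = usize> + 'a {
        async move {
            let mut total = 0;
            for msg in &self.messages {
                // Each `msg` is a `&'a String` borrowed from the queue.
                total += msg.len();
            }
            total
        }
    }
}

// Waker that does nothing; enough for futures that are ready immediately.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Tiny polling loop so the sketch needs no async runtime.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn demo() -> usize {
    let queue = Queue {
        messages: vec!["a".to_string(), "bcd".to_string()],
    };
    // `queue` outlives the future, so the borrow is valid.
    block_on(queue.total_len())
}
```

Calling `demo()` drives the borrowed future to completion while `queue` is still alive. Returning that future from a function that drops `queue` first would be rejected by the borrow checker, which is the constraint the `'a` bound makes visible.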
I feel like I must have some deep misunderstanding, but why doesn't this work? I've tried everything I can think of.
Error:
Ultimately I'd like to split a consumer into partition queues that each run on a different thread and process them in parallel, but I can't call `stream()` with `buffered` on it, even with `Arc`s, unless I start it on the main thread.
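For context on the goal above: `tokio::spawn`, like `std::thread::spawn`, requires the spawned task to be `'static`, so the task has to own whatever it borrows from. A rough std-only sketch of that ownership pattern follows. `Consumer`, `PartitionQueue`, `split`, and `run_partitions` are hypothetical stand-ins for illustration, not rdkafka types, and plain threads are used instead of async tasks.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical stand-ins, not rdkafka types.
struct Consumer {
    topic: String,
}

struct PartitionQueue {
    // Each queue keeps the shared consumer alive through an `Arc`.
    consumer: Arc<Consumer>,
    partition: i32,
}

// Stand-in for splitting one partition off the shared consumer.
fn split(consumer: &Arc<Consumer>, partition: i32) -> PartitionQueue {
    PartitionQueue {
        consumer: Arc::clone(consumer),
        partition,
    }
}

impl PartitionQueue {
    // Borrows `self` while it works; stands in for draining a stream.
    fn run(&self) -> String {
        format!("{}-{}", self.consumer.topic, self.partition)
    }
}

fn run_partitions(partitions: &[i32]) -> Vec<String> {
    let consumer = Arc::new(Consumer {
        topic: "topic".to_string(),
    });
    let handles: Vec<_> = partitions
        .iter()
        .map(|&p| {
            let queue = split(&consumer, p);
            // `move` hands ownership of `queue` to the thread, so the
            // borrow taken by `run` lives entirely inside the closure.
            thread::spawn(move || queue.run())
        })
        .collect();
    // Join in spawn order so results line up with `partitions`.
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

Each thread owns its `PartitionQueue`, and the borrow only happens inside the thread, so nothing in the spawned closure refers back to the spawning scope.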