Can't use stream() from within spawn? #642

Closed
mindreader opened this issue Dec 27, 2023 · 1 comment

mindreader commented Dec 27, 2023

I feel like I must have some deep misunderstanding, but why doesn't this work? I've tried everything I can think of.

```rust
use futures::stream::TryStreamExt;
use futures::Future;
use rdkafka::consumer::DefaultConsumerContext;
use rdkafka::consumer::StreamConsumer;
use rdkafka::error::KafkaError;

#[tokio::main]
async fn main() {
    // mainrun().await; // works
    tokio::spawn(mainrun()); // this doesn't work, and I have no idea why.
}

async fn mainrun() -> Result<(), KafkaError> {
    let config = rdkafka::config::ClientConfig::new();
    let x: StreamConsumer = config
        .create::<StreamConsumer<DefaultConsumerContext>>()
        .unwrap();

    x.stream()
        .map_ok(|_msg| async { Ok::<_, KafkaError>(()) })
        .try_buffered(2)
        .try_collect()
        .await
}
```

Error:

```text
error: implementation of `FnOnce` is not general enough
  --> foo.rs:10:5
   |
10 |     tokio::spawn(mainrun()); // this doesn't work, and I have no idea why.
   |     ^^^^^^^^^^^^^^^^^^^^^^^ implementation of `FnOnce` is not general enough
   |
   = note: closure with signature `fn(BorrowedMessage<'0>) -> {async block@foo.rs:20:24: 20:57}` must implement `FnOnce<(BorrowedMessage<'1>,)>`, for any two lifetimes `'0` and `'1`...
   = note: ...but it actually implements `FnOnce<(BorrowedMessage<'_>,)>`

error: implementation of `Stream` is not general enough
  --> foo.rs:10:5
   |
10 |     tokio::spawn(mainrun()); // this doesn't work, and I have no idea why.
   |     ^^^^^^^^^^^^^^^^^^^^^^^ implementation of `Stream` is not general enough
   |
   = note: `Stream` would have to be implemented for the type `MessageStream<'0, DefaultConsumerContext>`, for any lifetime `'0`...
   = note: ...but `Stream` is actually implemented for the type `MessageStream<'1, DefaultConsumerContext>`, for some specific lifetime `'1`
```

Ultimately I'd like to split a consumer into partition queues that each run on a different thread and process their messages in parallel. But as soon as I use `try_buffered` on the stream, I can't call `stream()` inside a spawned task, even with `Arc`s; it only compiles if I run it directly in `main` without spawning.
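The fan-out described above (one queue per partition, each drained on its own thread) can be sketched without Kafka or tokio. This is a std-only analogue under stated assumptions: `Msg` and `process_by_partition` are hypothetical names, and `std::sync::mpsc` channels stand in for rdkafka's partition queues. It does not use or reproduce rdkafka's API.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for a Kafka message: only the fields this sketch needs.
struct Msg {
    partition: i32,
    payload: String,
}

/// Routes each message to its partition's channel and drains every channel
/// on its own worker thread. Returns the payloads each worker saw, in arrival
/// order. Messages for partitions not listed in `partitions` are dropped.
fn process_by_partition(msgs: Vec<Msg>, partitions: &[i32]) -> HashMap<i32, Vec<String>> {
    let mut senders = HashMap::new();
    let mut workers = Vec::new();

    for &p in partitions {
        let (tx, rx) = mpsc::channel::<Msg>();
        senders.insert(p, tx);
        // The worker owns its receiver, so the closure is 'static and can be spawned.
        let handle = thread::spawn(move || rx.into_iter().map(|m| m.payload).collect::<Vec<String>>());
        workers.push((p, handle));
    }

    // Consumer side: dispatch every message to its partition queue.
    for m in msgs {
        if let Some(tx) = senders.get(&m.partition) {
            tx.send(m).expect("worker exited early");
        }
    }
    // Dropping the senders closes the channels, so each worker's loop ends.
    drop(senders);

    workers
        .into_iter()
        .map(|(p, h)| (p, h.join().expect("worker panicked")))
        .collect()
}

fn main() {
    let msgs: Vec<Msg> = (0..9)
        .map(|i| Msg { partition: i % 3, payload: format!("m{i}") })
        .collect();
    let out = process_by_partition(msgs, &[0, 1, 2]);
    for p in 0..3 {
        println!("partition {p}: {:?}", out[&p]);
    }
}
```

Each worker owns everything it touches, which is why `thread::spawn` accepts it; the trouble in the issue comes from the spawned future borrowing from the consumer instead.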

mindreader (Author) commented:

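I finally figured it out, holy moly. The answer was to stop relying on async sugar and instead write a plain `fn` that returns `impl Future + 'a`, naming the lifetime explicitly and annotating the closure parameter as `BorrowedMessage<'a>`. That lets the compiler know that the consumer (or the `StreamPartitionQueue`) outlives the messages the async blocks are built from.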
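A std-only sketch of the same lifetime shape, assuming hypothetical `Queue` and `BorrowedMsg` types in place of `StreamPartitionQueue` and `BorrowedMessage`. It only illustrates the pattern of naming `'a` on the method and on the closure argument, so the returned value visibly borrows from `self`; it does not reproduce the async `Send`/`spawn` error itself, which needs tokio and futures.

```rust
// Hypothetical stand-ins: `Queue` owns the raw payloads, and `BorrowedMsg<'a>`
// borrows from it, the way rdkafka's `BorrowedMessage<'a>` borrows from its consumer.
struct Queue {
    payloads: Vec<String>,
}

struct BorrowedMsg<'a> {
    payload: &'a str,
}

impl Queue {
    // Analogue of `stream()`: every item borrows from `self`.
    fn stream(&self) -> impl Iterator<Item = BorrowedMsg<'_>> + '_ {
        self.payloads.iter().map(|p| BorrowedMsg { payload: p.as_str() })
    }

    // Naming `'a` ties the closure argument and the returned iterator to the
    // same borrow of `self`, mirroring `fn run(&'a self) -> impl Future + 'a`.
    fn run<'a>(&'a self) -> impl Iterator<Item = usize> + 'a {
        self.stream().map(|msg: BorrowedMsg<'a>| msg.payload.len())
    }
}

fn main() {
    let queue = Queue { payloads: vec!["a".to_string(), "bcd".to_string()] };
    // The thread owns `queue` and borrows it only inside its own closure.
    let handle = std::thread::spawn(move || queue.run().sum::<usize>());
    println!("{}", handle.join().unwrap()); // prints 4
}
```

The spawned closure owning `queue` and borrowing it internally plays the same role as `async move { queue.run().await }` in the answer's `main`: the borrow never escapes the task, so the task itself is `'static`.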

This is not tested in production, but it might be a useful example program for anyone interested in something similar.

```rust
use futures::stream::TryStreamExt;
use futures::Future;
use rdkafka::consumer::stream_consumer::StreamPartitionQueue;
use rdkafka::consumer::DefaultConsumerContext;
use rdkafka::consumer::StreamConsumer;
use rdkafka::error::KafkaError;
use rdkafka::message::BorrowedMessage;
use std::sync::Arc;

#[derive(Clone)]
struct App {
    consumer: Arc<StreamConsumer<DefaultConsumerContext>>,
}

struct PartitionApp {
    queue: StreamPartitionQueue<DefaultConsumerContext>,
}

impl PartitionApp {
    fn new(queue: StreamPartitionQueue<DefaultConsumerContext>) -> PartitionApp {
        PartitionApp { queue }
    }

    // The explicit `'a` ties the stream, the closure argument, and the returned
    // future to the borrow of `self`, instead of leaving the compiler to infer
    // a closure that must work for every lifetime.
    fn run<'a>(&'a self) -> impl Future<Output = Result<(), KafkaError>> + 'a {
        self.queue
            .stream()
            .map_ok(|msg: BorrowedMessage<'a>| async move {
                println!("msg: {:?}", msg);
                Ok(())
            })
            // Keep up to 20 message futures in flight; results stay in order.
            .try_buffered(20)
            .try_collect()
    }
}

impl App {
    fn new() -> App {
        // Placeholder config: a working consumer also needs at least
        // `bootstrap.servers` and a subscription or assignment for the topic.
        let config = rdkafka::config::ClientConfig::new();
        let consumer: StreamConsumer = config
            .create::<StreamConsumer<DefaultConsumerContext>>()
            .unwrap();
        let consumer = Arc::new(consumer);
        App { consumer }
    }

    fn split(&self, topic: &str, partition: i32) -> PartitionApp {
        let queue = self.consumer.split_partition_queue(topic, partition).unwrap();
        PartitionApp::new(queue)
    }
}

#[tokio::main]
async fn main() {
    let app = App::new();

    // Each task owns its `PartitionApp`; `run()` borrows it only inside the
    // `async move` block, so the spawned future is `'static`.
    let f = |partition: i32| {
        let queue = app.split("topic", partition);
        async move { queue.run().await }
    };

    let x = tokio::spawn(f(0));
    let y = tokio::spawn(f(1));
    let z = tokio::spawn(f(2));

    let _res = futures::join!(x, y, z);
}
```
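`try_buffered(20)` keeps up to 20 message futures in flight and yields their results in the stream's order, and `try_collect` stops at the first `Err`. To make those semantics concrete without tokio, here is a hypothetical std-only analogue, `try_buffered_collect`, that uses one thread per job in place of futures. It illustrates the behavior only; it is not how the futures crate implements it.

```rust
use std::collections::VecDeque;
use std::sync::Arc;
use std::thread::{self, JoinHandle};

/// Hypothetical analogue of `.map_ok(f).try_buffered(n).try_collect()`:
/// keeps at most `n` jobs in flight, returns results in input order, and
/// stops at the first `Err` in input order. Threads stand in for futures.
fn try_buffered_collect<T, R, E, F>(items: Vec<T>, n: usize, f: F) -> Result<Vec<R>, E>
where
    T: Send + 'static,
    R: Send + 'static,
    E: Send + 'static,
    F: Fn(T) -> Result<R, E> + Send + Sync + 'static,
{
    assert!(n > 0, "buffer size must be at least 1");
    let f = Arc::new(f);
    let mut input = items.into_iter();
    let mut in_flight: VecDeque<JoinHandle<Result<R, E>>> = VecDeque::new();
    let mut out = Vec::new();

    loop {
        // Top the window back up to `n` running jobs.
        while in_flight.len() < n {
            let Some(item) = input.next() else { break };
            let f = Arc::clone(&f);
            in_flight.push_back(thread::spawn(move || (*f)(item)));
        }
        // Wait on the oldest job so results come out in input order.
        match in_flight.pop_front() {
            // `?` returns early on the first error; jobs still running are
            // detached here, whereas dropping futures would cancel them.
            Some(job) => out.push(job.join().expect("job panicked")?),
            None => return Ok(out),
        }
    }
}

fn main() {
    let ok: Result<Vec<i32>, String> = try_buffered_collect(vec![1, 2, 3, 4], 2, |x: i32| Ok(x * 10));
    println!("{:?}", ok); // prints Ok([10, 20, 30, 40])
    let err: Result<Vec<i32>, String> =
        try_buffered_collect(vec![1, 2, 3], 2, |x: i32| if x == 2 { Err(format!("bad {x}")) } else { Ok(x) });
    println!("{:?}", err); // prints Err("bad 2")
}
```

A new job starts only once the oldest result has been taken, so one slow message holds back its successors; if per-partition ordering does not matter, the futures crate also offers an unordered variant (`try_buffer_unordered`).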
