
Streams can prevent any other futures from being scheduled again #869

Closed
@sdroege

Description

This is copied over from tokio-rs/tokio#207.
Please see the code below for a contrived example of the problem, and a workaround.

The problem here is that a single stream that keeps producing items will block a whole executor thread forever, instead of allowing other scheduled futures to run after each item is produced, effectively causing starvation. This happens because Stream::for_each is basically an infinite loop as long as items can be produced; breaking out of the loop with NotReady after each item (though how best to wake the future up again afterwards is an open question) would solve this problem.

In practice this means that, e.g., a few fast TCP connections handled with for_each can starve slower TCP connections. In my specific use case of a Stream around a UdpSocket, any single socket can completely occupy a thread (as long as packets arrive fast enough) and prevent other sockets with a slower packet rate from ever being scheduled again. Note that fast/slow is relative here: it depends on the processing time of each stream item and how quickly new items arrive.

Is this expected behaviour, and is one expected to implement a custom "scheduler" around e.g. Stream::into_future that does round-robin scheduling of all "equal" streams?
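To make the scheduling difference concrete, here is a minimal, executor-free sketch (std only, not the actual futures/tokio API; the task queue and item counters are purely illustrative). A "greedy" driver keeps polling the same source while it has items ready, the way for_each loops internally; a "yielding" driver takes one item and re-queues the task, which is what returning NotReady-and-notify after each item amounts to:

```rust
use std::collections::VecDeque;

// A task yields Some(item) while it has work left, None when finished.
type Task = Box<dyn FnMut() -> Option<u32>>;

// A toy "stream" that produces its id `items` times.
fn make_counter(id: u32, items: u32) -> Task {
    let mut left = items;
    Box::new(move || {
        if left == 0 {
            None
        } else {
            left -= 1;
            Some(id)
        }
    })
}

// Greedy: drain each task completely before looking at the next one.
// This mirrors Stream::for_each looping while items are ready.
fn run_greedy(mut tasks: VecDeque<Task>) -> Vec<u32> {
    let mut log = Vec::new();
    while let Some(mut t) = tasks.pop_front() {
        while let Some(item) = t() {
            log.push(item);
        }
    }
    log
}

// Yielding: take one item, then move the task to the back of the queue,
// i.e. return NotReady and notify after each item.
fn run_yielding(mut tasks: VecDeque<Task>) -> Vec<u32> {
    let mut log = Vec::new();
    while let Some(mut t) = tasks.pop_front() {
        if let Some(item) = t() {
            log.push(item);
            tasks.push_back(t); // re-schedule instead of looping
        }
    }
    log
}

fn main() {
    let greedy = run_greedy(VecDeque::from(vec![make_counter(1, 3), make_counter(2, 3)]));
    let fair = run_yielding(VecDeque::from(vec![make_counter(1, 3), make_counter(2, 3)]));
    println!("greedy:   {:?}", greedy); // [1, 1, 1, 2, 2, 2]
    println!("yielding: {:?}", fair); // [1, 2, 1, 2, 1, 2]
}
```

With an infinite source like stream::repeat, the greedy driver never reaches the second task at all, which is exactly the starvation described above.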

extern crate futures;
extern crate tokio;
extern crate tokio_reactor;

use futures::stream;
use futures::{Future, Stream};
use tokio::executor::thread_pool;
use tokio::reactor;

fn main() {
    let reactor = reactor::Reactor::new().unwrap().background().unwrap();

    let handle = reactor.handle().clone();

    let mut pool_builder = thread_pool::Builder::new();
    pool_builder.around_worker(move |w, enter| {
        ::tokio_reactor::with_default(&handle, enter, |_| {
            w.run();
        });
    });

    // Important to have 1 thread here, otherwise
    // both streams would just block two threads
    // forever.
    pool_builder.pool_size(1);
    let pool = pool_builder.build();

    pool.spawn(stream::repeat(1).for_each(|n| {
        println!("{}", n);

        Ok(())
    }));

    pool.spawn(stream::repeat(2).for_each(|n| {
        println!("{}", n);

        Ok(())
    }));

    pool.shutdown_on_idle()
        .and_then(|_| reactor.shutdown_on_idle())
        .wait()
        .unwrap();
}

A workaround is the YieldOnce future below: after each stream item it returns NotReady once, notifies the task, and only completes on the next poll, giving other futures a chance to run in between. While that works around the problem, the default behaviour here seems like a potential footgun that people will only notice once it's too late.

extern crate futures;
extern crate tokio;
extern crate tokio_reactor;

use futures::stream;
use futures::task;
use futures::{Future, Stream, Poll, Async};
use tokio::executor::thread_pool;
use tokio::reactor;

// Completes on the second poll; the first poll returns NotReady and
// re-notifies the task, yielding the thread to other scheduled futures.
struct YieldOnce(Option<()>);

impl YieldOnce {
    fn new() -> Self {
        YieldOnce(None)
    }
}

impl Future for YieldOnce {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<(), ()> {
        if self.0.take().is_some() {
            Ok(Async::Ready(()))
        } else {
            self.0 = Some(());
            task::current().notify();
            Ok(Async::NotReady)
        }
    }
}

fn main() {
    let reactor = reactor::Reactor::new().unwrap().background().unwrap();

    let handle = reactor.handle().clone();

    let mut pool_builder = thread_pool::Builder::new();
    pool_builder.around_worker(move |w, enter| {
        ::tokio_reactor::with_default(&handle, enter, |_| {
            w.run();
        });
    });

    // Important to have 1 thread here, otherwise
    // both streams would just block two threads
    // forever.
    pool_builder.pool_size(1);
    let pool = pool_builder.build();

    pool.spawn(stream::repeat(1).for_each(|n| {
        println!("{}", n);

        task::current().notify();

        YieldOnce::new()
    }));

    pool.spawn(stream::repeat(2).for_each(|n| {
        println!("{}", n);

        task::current().notify();

        YieldOnce::new()
    }));

    pool.shutdown_on_idle()
        .and_then(|_| reactor.shutdown_on_idle())
        .wait()
        .unwrap();
}

@carllerche said in the tokio ticket: "IMO, the combinators should handle yielding internally."
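One way combinators (or the executor) could yield internally is a per-activation budget: each task may produce at most N items before it is forced back into the queue. The sketch below is std-only and purely illustrative (BUDGET, the queue shape, and make_counter are assumptions of this toy, not the futures/tokio design):

```rust
use std::collections::VecDeque;

// A task yields Some(item) while it has work left, None when finished.
type Task = Box<dyn FnMut() -> Option<u32>>;

// A toy "stream" that produces its id `items` times.
fn make_counter(id: u32, items: u32) -> Task {
    let mut left = items;
    Box::new(move || {
        if left == 0 {
            None
        } else {
            left -= 1;
            Some(id)
        }
    })
}

// Illustrative: how many items a task may produce before it must yield.
const BUDGET: u32 = 2;

// Budgeted: poll a task for at most BUDGET items per activation, then
// re-queue it, so no single source can monopolize the thread while
// still amortizing the scheduling overhead over several items.
fn run_budgeted(mut tasks: VecDeque<Task>) -> Vec<u32> {
    let mut log = Vec::new();
    while let Some(mut t) = tasks.pop_front() {
        let mut used = 0;
        let mut done = false;
        while used < BUDGET {
            match t() {
                Some(item) => {
                    log.push(item);
                    used += 1;
                }
                None => {
                    done = true;
                    break;
                }
            }
        }
        if !done {
            tasks.push_back(t); // budget spent: yield to the other tasks
        }
    }
    log
}

fn main() {
    let log = run_budgeted(VecDeque::from(vec![make_counter(1, 3), make_counter(2, 3)]));
    println!("budgeted: {:?}", log); // [1, 1, 2, 2, 1, 2]
}
```

With BUDGET = 1 this degenerates into the yield-after-every-item behaviour; larger budgets trade fairness granularity for fewer task switches.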
