Skip to content

ForEach, Fold, and similar stream combinators can run saturated without returning from poll #1957

Open
@mzabaluev

Description

@mzabaluev

The combinators that poll a stream for exhaustion in a loop have a problem that's already been raised in #869: if the upstream consecutively returns Ready for a long time, the loop never breaks and the combinator's poll never returns for that long, starving other pending operations in the task from being polled.

To illustrate how this can be a problem for other code, consider this simple adapter for making futures cancellable:

use futures::channel::oneshot::{self, Canceled};
use futures::prelude::*;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::ready;
use pin_utils::unsafe_pinned;

struct CancelHandle(oneshot::Sender<()>);

#[derive(Debug)]
struct AlreadyDropped;

impl CancelHandle {
    fn cancel(self) -> Result<(), AlreadyDropped> {
        self.0.send(()).map_err(|()| AlreadyDropped)
    }
}

struct Cancelable<F> {
    op: F,
    stop_rx: oneshot::Receiver<()>,
}

impl<F> Cancelable<F> {
    unsafe_pinned!(op: F);
    unsafe_pinned!(stop_rx: oneshot::Receiver<()>);
}

impl<F: Unpin> Unpin for Cancelable<F> {}

impl<F> Future for Cancelable<F>
where
    F: Future,
{
    type Output = Result<F::Output, Canceled>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.as_mut().stop_rx().poll(cx) {
            Poll::Pending => {
                let output = ready!(self.as_mut().op().poll(cx));
                Ok(output).into()
            }
            Poll::Ready(_res) => Err(Canceled).into(),
        }
    }
}

fn make_cancelable<F>(op: F) -> (Cancelable<F>, CancelHandle) {
    let (stop_tx, stop_rx) = oneshot::channel();
    let fut = Cancelable { op, stop_rx };
    let handle = CancelHandle(stop_tx);
    (fut, handle)
}

It looks rather useful and intuitive, but this contrived example hangs with a busy-looping thread rather than canceling the task:

fn main() {
    let mut a = 0;
    let (fut, stop_handle) = make_cancelable(
        stream::repeat(1).for_each(move |n| {
            a += n;
            future::ready(())
        })
    );
    let mut executor = ThreadPool::new().unwrap();
    let res_handle = executor.spawn_with_handle(fut).unwrap();
    thread::sleep(Duration::from_millis(1));
    stop_handle.cancel().unwrap();
    let res = executor.run(res_handle);
    assert!(res.is_err());
}

In non-contrived usage with real streams, too, a ForEach with an always-ready processing closure will delay cancellation for as long as the stream yields items.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions