Skip to content

Commit dc159c9

Browse files
committed
Add for_each_concurrent and try_for_each_concurrent
1 parent 8892e5c commit dc159c9

File tree

4 files changed

+333
-0
lines changed

4 files changed

+333
-0
lines changed
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
use crate::stream::{FuturesUnordered, StreamExt};
2+
use core::marker::Unpin;
3+
use core::mem::PinMut;
4+
use core::num::NonZeroUsize;
5+
use futures_core::future::Future;
6+
use futures_core::stream::Stream;
7+
use futures_core::task::{self, Poll};
8+
use pin_utils::{unsafe_pinned, unsafe_unpinned};
9+
10+
/// A stream combinator which executes a unit closure over each item on a
11+
/// stream concurrently.
12+
///
13+
/// This structure is returned by the
14+
/// [`StreamExt::for_each_concurrent`](super::StreamExt::for_each_concurrent)
15+
/// method.
16+
#[derive(Debug)]
17+
#[must_use = "streams do nothing unless polled"]
18+
pub struct ForEachConcurrent<St, Fut, F> {
19+
stream: Option<St>,
20+
f: F,
21+
futures: FuturesUnordered<Fut>,
22+
limit: Option<NonZeroUsize>,
23+
}
24+
25+
impl<St, Fut, F> Unpin for ForEachConcurrent<St, Fut, F>
26+
where St: Unpin,
27+
Fut: Unpin,
28+
{}
29+
30+
impl<St, Fut, F> ForEachConcurrent<St, Fut, F>
31+
where St: Stream,
32+
F: FnMut(St::Item) -> Fut,
33+
Fut: Future<Output = ()>,
34+
{
35+
unsafe_pinned!(stream: Option<St>);
36+
unsafe_unpinned!(f: F);
37+
unsafe_unpinned!(futures: FuturesUnordered<Fut>);
38+
unsafe_unpinned!(limit: Option<NonZeroUsize>);
39+
40+
pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> ForEachConcurrent<St, Fut, F> {
41+
ForEachConcurrent {
42+
stream: Some(stream),
43+
// Note: `limit` = 0 gets ignored.
44+
limit: limit.and_then(NonZeroUsize::new),
45+
f,
46+
futures: FuturesUnordered::new(),
47+
}
48+
}
49+
}
50+
51+
impl<St, Fut, F> Future for ForEachConcurrent<St, Fut, F>
52+
where St: Stream,
53+
F: FnMut(St::Item) -> Fut,
54+
Fut: Future<Output = ()>,
55+
{
56+
type Output = ();
57+
58+
fn poll(mut self: PinMut<Self>, cx: &mut task::Context) -> Poll<()> {
59+
loop {
60+
let mut made_progress_this_iter = false;
61+
62+
// Try and pull an item from the stream
63+
let current_len = self.futures().len();
64+
// Check if we've already created a number of futures greater than `limit`
65+
if self.limit().map(|limit| limit.get() > current_len).unwrap_or(true) {
66+
let mut stream_completed = false;
67+
let elem = if let Some(stream) = self.stream().as_pin_mut() {
68+
match stream.poll_next(cx) {
69+
Poll::Ready(Some(elem)) => {
70+
made_progress_this_iter = true;
71+
Some(elem)
72+
},
73+
Poll::Ready(None) => {
74+
stream_completed = true;
75+
None
76+
}
77+
Poll::Pending => None,
78+
}
79+
} else {
80+
None
81+
};
82+
if stream_completed {
83+
PinMut::set(self.stream(), None);
84+
}
85+
if let Some(elem) = elem {
86+
let next_future = (self.f())(elem);
87+
self.futures().push(next_future);
88+
}
89+
}
90+
91+
match self.futures().poll_next_unpin(cx) {
92+
Poll::Ready(Some(())) => made_progress_this_iter = true,
93+
Poll::Ready(None) => {
94+
if self.stream().is_none() {
95+
return Poll::Ready(())
96+
}
97+
},
98+
Poll::Pending => {}
99+
}
100+
101+
if !made_progress_this_iter {
102+
return Poll::Pending;
103+
}
104+
}
105+
}
106+
}

futures-util/src/stream/mod.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,9 @@ if_std! {
111111
mod collect;
112112
pub use self::collect::Collect;
113113

114+
mod for_each_concurrent;
115+
pub use self::for_each_concurrent::ForEachConcurrent;
116+
114117
mod futures_ordered;
115118
pub use self::futures_ordered::{futures_ordered, FuturesOrdered};
116119

@@ -552,6 +555,65 @@ pub trait StreamExt: Stream {
552555
ForEach::new(self, f)
553556
}
554557

558+
/// Runs this stream to completion, executing the provided asynchronous
559+
/// closure for each element on the stream concurrently as elements become
560+
/// available.
561+
///
562+
/// This is similar to [`StreamExt::for_each`], but the futures
563+
/// produced by the closure are run concurrently (but not in parallel--
564+
/// this combinator does not introduce any threads).
565+
///
566+
/// The closure provided will be called for each item this stream produces,
567+
/// yielding a future. That future will then be executed to completion
568+
/// concurrently with the other futures produced by the closure.
569+
///
570+
/// The first argument is an optional limit on the number of concurrent
571+
/// futures. If this limit is not `None`, no more than `limit` futures
572+
/// will be run concurrently. The `limit` argument is of type
573+
/// `Into<Option<usize>>`, and so can be provided as either `None`,
574+
/// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
575+
/// no limit at all, and will have the same result as passing in `None`.
576+
///
577+
/// This method is only available when the `std` feature of this
578+
/// library is activated, and it is activated by default.
579+
///
580+
/// # Examples
581+
///
582+
/// ```
583+
/// #![feature(async_await, await_macro)]
584+
/// # futures::executor::block_on(async {
585+
/// use futures::channel::oneshot;
586+
/// use futures::stream::{self, StreamExt};
587+
///
588+
/// let (tx1, rx1) = oneshot::channel();
589+
/// let (tx2, rx2) = oneshot::channel();
590+
/// let (tx3, rx3) = oneshot::channel();
591+
///
592+
/// let fut = stream::iter(vec![rx1, rx2, rx3]).for_each_concurrent(
593+
/// /* limit */ 2,
594+
/// async move |rx| {
595+
/// await!(rx).unwrap();
596+
/// }
597+
/// );
598+
/// tx1.send(()).unwrap();
599+
/// tx2.send(()).unwrap();
600+
/// tx3.send(()).unwrap();
601+
/// await!(fut);
602+
/// # })
603+
/// ```
604+
#[cfg(feature = "std")]
605+
fn for_each_concurrent<Fut, F>(
606+
self,
607+
limit: impl Into<Option<usize>>,
608+
f: F,
609+
) -> ForEachConcurrent<Self, Fut, F>
610+
where F: FnMut(Self::Item) -> Fut,
611+
Fut: Future<Output = ()>,
612+
Self: Sized,
613+
{
614+
ForEachConcurrent::new(self, limit.into(), f)
615+
}
616+
555617
/// Creates a new stream of at most `n` items of the underlying stream.
556618
///
557619
/// Once `n` items have been yielded from this stream then it will always

futures-util/src/try_stream/mod.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ if_std! {
4646

4747
mod try_collect;
4848
pub use self::try_collect::TryCollect;
49+
50+
mod try_for_each_concurrent;
51+
pub use self::try_for_each_concurrent::TryForEachConcurrent;
52+
use futures_core::future::Future;
4953
}
5054

5155
impl<S: TryStream> TryStreamExt for S {}
@@ -255,6 +259,61 @@ pub trait TryStreamExt: TryStream {
255259
TrySkipWhile::new(self, f)
256260
}
257261

262+
/// Attempts to run this stream to completion, executing the provided asynchronous
263+
/// closure for each element on the stream concurrently as elements become
264+
/// available, exiting as soon as an error occurs.
265+
///
266+
/// This is similar to
267+
/// [`StreamExt::for_each_concurrent`](super::StreamExt::for_each_concurrent),
268+
/// but will resolve to an error immediately if the underlying stream or the provided
269+
/// closure return an error.
270+
///
271+
/// This method is only available when the `std` feature of this
272+
/// library is activated, and it is activated by default.
273+
///
274+
/// # Examples
275+
///
276+
/// ```
277+
/// #![feature(async_await, await_macro)]
278+
/// # futures::executor::block_on(async {
279+
/// use futures::channel::oneshot;
280+
/// use futures::stream::{self, StreamExt, TryStreamExt};
281+
///
282+
/// let (tx1, rx1) = oneshot::channel();
283+
/// let (tx2, rx2) = oneshot::channel();
284+
/// let (_tx3, rx3) = oneshot::channel();
285+
///
286+
/// let stream = stream::iter(vec![rx1, rx2, rx3]);
287+
/// let fut = stream.map(Ok).try_for_each_concurrent(
288+
/// /* limit */ 2,
289+
/// async move |rx| {
290+
/// let res: Result<(), oneshot::Canceled> = await!(rx);
291+
/// res
292+
/// }
293+
/// );
294+
///
295+
/// tx1.send(()).unwrap();
296+
/// // Drop the second sender so that `rx2` resolves to `Canceled`.
297+
/// drop(tx2);
298+
///
299+
/// // The final result is an error because the second future
300+
/// // resulted in an error.
301+
/// assert_eq!(Err(oneshot::Canceled), await!(fut));
302+
/// # })
303+
/// ```
304+
#[cfg(feature = "std")]
305+
fn try_for_each_concurrent<Fut, F>(
306+
self,
307+
limit: impl Into<Option<usize>>,
308+
f: F,
309+
) -> TryForEachConcurrent<Self, Fut, F>
310+
where F: FnMut(Self::Ok) -> Fut,
311+
Fut: Future<Output = Result<(), Self::Error>>,
312+
Self: Sized,
313+
{
314+
TryForEachConcurrent::new(self, limit.into(), f)
315+
}
316+
258317
/// Attempt to Collect all of the values of this stream into a vector,
259318
/// returning a future representing the result of that computation.
260319
///
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
use crate::stream::{FuturesUnordered, StreamExt};
2+
use core::marker::Unpin;
3+
use core::mem::PinMut;
4+
use core::num::NonZeroUsize;
5+
use futures_core::future::Future;
6+
use futures_core::stream::TryStream;
7+
use futures_core::task::{self, Poll};
8+
use pin_utils::{unsafe_pinned, unsafe_unpinned};
9+
10+
/// A stream combinator which executes a unit closure over each item on a
11+
/// stream concurrently.
12+
///
13+
/// This structure is returned by the
14+
/// [`TryStreamExt::try_for_each_concurrent`](super::TryStreamExt::try_for_each_concurrent)
15+
/// method.
16+
#[derive(Debug)]
17+
#[must_use = "streams do nothing unless polled"]
18+
pub struct TryForEachConcurrent<St, Fut, F> {
19+
stream: Option<St>,
20+
f: F,
21+
futures: FuturesUnordered<Fut>,
22+
limit: Option<NonZeroUsize>,
23+
}
24+
25+
impl<St, Fut, F> Unpin for TryForEachConcurrent<St, Fut, F>
26+
where St: Unpin,
27+
Fut: Unpin,
28+
{}
29+
30+
impl<St, Fut, F> TryForEachConcurrent<St, Fut, F>
31+
where St: TryStream,
32+
F: FnMut(St::Ok) -> Fut,
33+
Fut: Future<Output = Result<(), St::Error>>,
34+
{
35+
unsafe_pinned!(stream: Option<St>);
36+
unsafe_unpinned!(f: F);
37+
unsafe_unpinned!(futures: FuturesUnordered<Fut>);
38+
unsafe_unpinned!(limit: Option<NonZeroUsize>);
39+
40+
pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> TryForEachConcurrent<St, Fut, F> {
41+
TryForEachConcurrent {
42+
stream: Some(stream),
43+
// Note: `limit` = 0 gets ignored.
44+
limit: limit.and_then(NonZeroUsize::new),
45+
f,
46+
futures: FuturesUnordered::new(),
47+
}
48+
}
49+
}
50+
51+
impl<St, Fut, F> Future for TryForEachConcurrent<St, Fut, F>
52+
where St: TryStream,
53+
F: FnMut(St::Ok) -> Fut,
54+
Fut: Future<Output = Result<(), St::Error>>,
55+
{
56+
type Output = Result<(), St::Error>;
57+
58+
fn poll(mut self: PinMut<Self>, cx: &mut task::Context) -> Poll<Self::Output> {
59+
loop {
60+
let mut made_progress_this_iter = false;
61+
62+
// Try and pull an item from the stream
63+
let current_len = self.futures().len();
64+
// Check if we've already created a number of futures greater than `limit`
65+
if self.limit().map(|limit| limit.get() > current_len).unwrap_or(true) {
66+
let mut stream_completed = false;
67+
let elem = if let Some(stream) = self.stream().as_pin_mut() {
68+
match stream.try_poll_next(cx)? {
69+
Poll::Ready(Some(elem)) => {
70+
made_progress_this_iter = true;
71+
Some(elem)
72+
},
73+
Poll::Ready(None) => {
74+
stream_completed = true;
75+
None
76+
}
77+
Poll::Pending => None,
78+
}
79+
} else {
80+
None
81+
};
82+
if stream_completed {
83+
PinMut::set(self.stream(), None);
84+
}
85+
if let Some(elem) = elem {
86+
let next_future = (self.f())(elem);
87+
self.futures().push(next_future);
88+
}
89+
}
90+
91+
match self.futures().poll_next_unpin(cx)? {
92+
Poll::Ready(Some(())) => made_progress_this_iter = true,
93+
Poll::Ready(None) => {
94+
if self.stream().is_none() {
95+
return Poll::Ready(Ok(()))
96+
}
97+
},
98+
Poll::Pending => {}
99+
}
100+
101+
if !made_progress_this_iter {
102+
return Poll::Pending;
103+
}
104+
}
105+
}
106+
}

0 commit comments

Comments
 (0)