Skip to content

Commit

Permalink
add FuturesUnordered::into_iter, make iter_pin_ref public (#2423)
Browse files Browse the repository at this point in the history
  • Loading branch information
ibraheemdev authored and taiki-e committed May 10, 2021
1 parent aa39d14 commit 48d65c3
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 3 deletions.
46 changes: 46 additions & 0 deletions futures-util/src/stream/futures_unordered/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,55 @@ pub struct IterPinRef<'a, Fut> {
/// Immutable iterator over all the futures in the unordered set.
pub struct Iter<'a, Fut: Unpin>(pub(super) IterPinRef<'a, Fut>);

#[derive(Debug)]
/// Owned iterator over all futures in the unordered set.
pub struct IntoIter<Fut: Unpin> {
pub(super) len: usize,
pub(super) inner: FuturesUnordered<Fut>,
}

impl<Fut: Unpin> Iterator for IntoIter<Fut> {
type Item = Fut;

fn next(&mut self) -> Option<Fut> {
// `head_all` can be accessed directly and we don't need to spin on
// `Task::next_all` since we have exclusive access to the set.
let task = self.inner.head_all.get_mut();

if (*task).is_null() {
return None;
}

unsafe {
// Moving out of the future is safe because it is `Unpin`
let future = (*(**task).future.get()).take().unwrap();

// Mutable access to a previously shared `FuturesUnordered` implies
// that the other threads already released the object before the
// current thread acquired it, so relaxed ordering can be used and
// valid `next_all` checks can be skipped.
let next = (**task).next_all.load(Relaxed);
*task = next;
self.len -= 1;
Some(future)
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
(self.len, Some(self.len))
}
}

impl<Fut: Unpin> ExactSizeIterator for IntoIter<Fut> {}

impl<'a, Fut> Iterator for IterPinMut<'a, Fut> {
type Item = Pin<&'a mut Fut>;

fn next(&mut self) -> Option<Pin<&'a mut Fut>> {
if self.task.is_null() {
return None;
}

unsafe {
let future = (*(*self.task).future.get()).as_mut().unwrap();

Expand Down Expand Up @@ -78,6 +120,7 @@ impl<'a, Fut> Iterator for IterPinRef<'a, Fut> {
if self.task.is_null() {
return None;
}

unsafe {
let future = (*(*self.task).future.get()).as_ref().unwrap();

Expand Down Expand Up @@ -120,3 +163,6 @@ unsafe impl<Fut: Sync> Sync for IterPinRef<'_, Fut> {}

unsafe impl<Fut: Send> Send for IterPinMut<'_, Fut> {}
unsafe impl<Fut: Sync> Sync for IterPinMut<'_, Fut> {}

unsafe impl<Fut: Send + Unpin> Send for IntoIter<Fut> {}
unsafe impl<Fut: Sync + Unpin> Sync for IntoIter<Fut> {}
39 changes: 36 additions & 3 deletions futures-util/src/stream/futures_unordered/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
mod abort;

mod iter;
pub use self::iter::{Iter, IterMut, IterPinMut, IterPinRef};
pub use self::iter::{IntoIter, Iter, IterMut, IterPinMut, IterPinRef};

mod task;
use self::task::Task;
Expand Down Expand Up @@ -194,10 +194,11 @@ impl<Fut> FuturesUnordered<Fut> {
}

/// Returns an iterator that allows inspecting each future in the set.
fn iter_pin_ref(self: Pin<&Self>) -> IterPinRef<'_, Fut> {
pub fn iter_pin_ref(self: Pin<&Self>) -> IterPinRef<'_, Fut> {
let (task, len) = self.atomic_load_head_and_len_all();
let pending_next_all = self.pending_next_all();

IterPinRef { task, len, pending_next_all: self.pending_next_all(), _marker: PhantomData }
IterPinRef { task, len, pending_next_all, _marker: PhantomData }
}

/// Returns an iterator that allows modifying each future in the set.
Expand Down Expand Up @@ -581,6 +582,38 @@ impl<Fut> Drop for FuturesUnordered<Fut> {
}
}

impl<'a, Fut: Unpin> IntoIterator for &'a FuturesUnordered<Fut> {
type Item = &'a Fut;
type IntoIter = Iter<'a, Fut>;

fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}

impl<'a, Fut: Unpin> IntoIterator for &'a mut FuturesUnordered<Fut> {
type Item = &'a mut Fut;
type IntoIter = IterMut<'a, Fut>;

fn into_iter(self) -> Self::IntoIter {
self.iter_mut()
}
}

impl<Fut: Unpin> IntoIterator for FuturesUnordered<Fut> {
type Item = Fut;
type IntoIter = IntoIter<Fut>;

fn into_iter(mut self) -> Self::IntoIter {
// `head_all` can be accessed directly and we don't need to spin on
// `Task::next_all` since we have exclusive access to the set.
let task = *self.head_all.get_mut();
let len = if task.is_null() { 0 } else { unsafe { *(*task).len_all.get() } };

IntoIter { len, inner: self }
}
}

impl<Fut> FromIterator<Fut> for FuturesUnordered<Fut> {
fn from_iter<I>(iter: I) -> Self
where
Expand Down
7 changes: 7 additions & 0 deletions futures/tests/auto_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1827,6 +1827,13 @@ pub mod stream {
assert_impl!(futures_unordered::IterPinRef<()>: Sync);
assert_not_impl!(futures_unordered::IterPinRef<*const ()>: Sync);
assert_impl!(futures_unordered::IterPinRef<PhantomPinned>: Unpin);

assert_impl!(futures_unordered::IntoIter<()>: Send);
assert_not_impl!(futures_unordered::IntoIter<*const ()>: Send);
assert_impl!(futures_unordered::IntoIter<()>: Sync);
assert_not_impl!(futures_unordered::IntoIter<*const ()>: Sync);
// The definition of futures_unordered::IntoIter has `Fut: Unpin` bounds.
// assert_not_impl!(futures_unordered::IntoIter<PhantomPinned>: Unpin);
}

/// Assert Send/Sync/Unpin for all public types in `futures::task`.
Expand Down
45 changes: 45 additions & 0 deletions futures/tests/stream_futures_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,51 @@ fn iter_len() {
assert!(iter.next().is_none());
}

#[test]
fn into_iter_cancel() {
let (a_tx, a_rx) = oneshot::channel::<i32>();
let (b_tx, b_rx) = oneshot::channel::<i32>();
let (c_tx, c_rx) = oneshot::channel::<i32>();

let stream = vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesUnordered<_>>();

let stream = stream
.into_iter()
.map(|mut rx| {
rx.close();
rx
})
.collect::<FuturesUnordered<_>>();

let mut iter = block_on_stream(stream);

assert!(a_tx.is_canceled());
assert!(b_tx.is_canceled());
assert!(c_tx.is_canceled());

assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled)));
assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled)));
assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled)));
assert_eq!(iter.next(), None);
}

#[test]
fn into_iter_len() {
let stream = vec![future::pending::<()>(), future::pending::<()>(), future::pending::<()>()]
.into_iter()
.collect::<FuturesUnordered<_>>();

let mut into_iter = stream.into_iter();
assert_eq!(into_iter.len(), 3);
assert!(into_iter.next().is_some());
assert_eq!(into_iter.len(), 2);
assert!(into_iter.next().is_some());
assert_eq!(into_iter.len(), 1);
assert!(into_iter.next().is_some());
assert_eq!(into_iter.len(), 0);
assert!(into_iter.next().is_none());
}

#[test]
fn futures_not_moved_after_poll() {
// Future that will be ready after being polled twice,
Expand Down

0 comments on commit 48d65c3

Please sign in to comment.