Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement core::future::join! #91645

Merged
merged 5 commits into from
Dec 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 147 additions & 0 deletions library/core/src/future/join.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
#![allow(unused_imports)] // items are used by the macro

use crate::cell::UnsafeCell;
use crate::future::{poll_fn, Future};
use crate::mem;
use crate::pin::Pin;
use crate::task::{Context, Poll};

/// Polls multiple futures simultaneously, returning a tuple
/// of all results once complete.
///
/// While `join!(a, b)` is similar to `(a.await, b.await)`,
/// `join!` polls both futures concurrently and is therefore more efficient.
///
/// # Examples
///
/// ```
/// #![feature(future_join, future_poll_fn)]
///
/// use std::future::join;
///
/// async fn one() -> usize { 1 }
/// async fn two() -> usize { 2 }
///
/// # let _ = async {
/// let x = join!(one(), two()).await;
/// assert_eq!(x, (1, 2));
/// # };
/// ```
///
/// `join!` is variadic, so you can pass any number of futures:
///
/// ```
/// #![feature(future_join, future_poll_fn)]
///
/// use std::future::join;
///
/// async fn one() -> usize { 1 }
/// async fn two() -> usize { 2 }
/// async fn three() -> usize { 3 }
///
/// # let _ = async {
/// let x = join!(one(), two(), three()).await;
/// assert_eq!(x, (1, 2, 3));
/// # };
/// ```
#[unstable(feature = "future_join", issue = "91642")]
pub macro join {
( $($fut:expr),* $(,)?) => {
join! { @count: (), @futures: {}, @rest: ($($fut,)*) }
},
// Recurse until we have the position of each future in the tuple
(
// A token for each future that has been expanded: "_ _ _"
@count: ($($count:tt)*),
// Futures and their positions in the tuple: "{ a => (_), b => (_ _)) }"
@futures: { $($fut:tt)* },
// Take a future from @rest to expand
@rest: ($current:expr, $($rest:tt)*)
) => {
join! {
@count: ($($count)* _),
@futures: { $($fut)* $current => ($($count)*), },
@rest: ($($rest)*)
}
},
// Now generate the output future
(
@count: ($($count:tt)*),
@futures: {
$( $(@$f:tt)? $fut:expr => ( $($pos:tt)* ), )*
},
@rest: ()
) => {
async move {
let mut futures = ( $( MaybeDone::Future($fut), )* );

poll_fn(move |cx| {
let mut done = true;

$(
let ( $($pos,)* fut, .. ) = &mut futures;

// SAFETY: The futures are never moved
done &= unsafe { Pin::new_unchecked(fut).poll(cx).is_ready() };
)*

if done {
// Extract all the outputs
Poll::Ready(($({
let ( $($pos,)* fut, .. ) = &mut futures;

fut.take_output().unwrap()
}),*))
} else {
Poll::Pending
}
}).await
}
}
}

/// Future used by `join!` that stores it's output to
/// be later taken and doesn't panic when polled after ready.
///
/// This type is public in a private module for use by the macro.
#[allow(missing_debug_implementations)]
#[unstable(feature = "future_join", issue = "91642")]
pub enum MaybeDone<F: Future> {
Future(F),
Done(F::Output),
Took,
}

#[unstable(feature = "future_join", issue = "91642")]
impl<F: Future> MaybeDone<F> {
pub fn take_output(&mut self) -> Option<F::Output> {
match &*self {
MaybeDone::Done(_) => match mem::replace(self, Self::Took) {
MaybeDone::Done(val) => Some(val),
_ => unreachable!(),
},
_ => None,
}
}
}

#[unstable(feature = "future_join", issue = "91642")]
impl<F: Future> Future for MaybeDone<F> {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// SAFETY: pinning in structural for `f`
unsafe {
match self.as_mut().get_unchecked_mut() {
MaybeDone::Future(f) => match Pin::new_unchecked(f).poll(cx) {
Poll::Ready(val) => self.set(Self::Done(val)),
Poll::Pending => return Poll::Pending,
},
MaybeDone::Done(_) => {}
MaybeDone::Took => unreachable!(),
}
}

Poll::Ready(())
}
}
4 changes: 4 additions & 0 deletions library/core/src/future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@ use crate::{

mod future;
mod into_future;
mod join;
mod pending;
mod poll_fn;
mod ready;

#[stable(feature = "futures_api", since = "1.36.0")]
pub use self::future::Future;

#[unstable(feature = "future_join", issue = "91642")]
pub use self::join::join;

#[unstable(feature = "into_future", issue = "67644")]
pub use into_future::IntoFuture;

Expand Down
85 changes: 85 additions & 0 deletions library/core/tests/future.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use std::future::{join, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake};
use std::thread;

struct PollN {
val: usize,
polled: usize,
num: usize,
}

impl Future for PollN {
type Output = usize;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.polled += 1;

if self.polled == self.num {
return Poll::Ready(self.val);
}

cx.waker().wake_by_ref();
Poll::Pending
}
}

fn poll_n(val: usize, num: usize) -> PollN {
PollN { val, num, polled: 0 }
}

#[test]
fn test_join() {
block_on(async move {
let x = join!(async { 0 }).await;
assert_eq!(x, 0);

let x = join!(async { 0 }, async { 1 }).await;
assert_eq!(x, (0, 1));

let x = join!(async { 0 }, async { 1 }, async { 2 }).await;
assert_eq!(x, (0, 1, 2));

let x = join!(
poll_n(0, 1),
poll_n(1, 5),
poll_n(2, 2),
poll_n(3, 1),
poll_n(4, 2),
poll_n(5, 3),
poll_n(6, 4),
poll_n(7, 1)
)
.await;
assert_eq!(x, (0, 1, 2, 3, 4, 5, 6, 7));

let y = String::new();
let x = join!(async {
println!("{}", &y);
1
})
.await;
assert_eq!(x, 1);
});
}

fn block_on(fut: impl Future) {
struct Waker;
impl Wake for Waker {
fn wake(self: Arc<Self>) {
thread::current().unpark()
}
}

let waker = Arc::new(Waker).into();
let mut cx = Context::from_waker(&waker);
let mut fut = Box::pin(fut);

loop {
match fut.as_mut().poll(&mut cx) {
Poll::Ready(_) => break,
Poll::Pending => thread::park(),
}
}
}
3 changes: 3 additions & 0 deletions library/core/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
#![feature(flt2dec)]
#![feature(fmt_internals)]
#![feature(float_minimum_maximum)]
#![feature(future_join)]
#![feature(future_poll_fn)]
#![feature(array_from_fn)]
#![feature(hashmap_internals)]
#![feature(try_find)]
Expand Down Expand Up @@ -94,6 +96,7 @@ mod clone;
mod cmp;
mod const_ptr;
mod fmt;
mod future;
mod hash;
mod intrinsics;
mod iter;
Expand Down