Skip to content

Commit 037119c

Browse files
authored
Merge pull request #478 from portgasd666/master
Add Future::join and Future::try_join
2 parents 548733e + f04b6f6 commit 037119c

File tree

4 files changed

+230
-8
lines changed

4 files changed

+230
-8
lines changed

src/future/future/join.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
use std::pin::Pin;
2+
3+
use async_macros::MaybeDone;
4+
use pin_project_lite::pin_project;
5+
6+
use crate::task::{Context, Poll};
7+
use std::future::Future;
8+
9+
pin_project! {
10+
#[allow(missing_docs)]
11+
#[allow(missing_debug_implementations)]
12+
pub struct Join<L, R>
13+
where
14+
L: Future,
15+
R: Future<Output = L::Output>
16+
{
17+
#[pin] left: MaybeDone<L>,
18+
#[pin] right: MaybeDone<R>,
19+
}
20+
}
21+
22+
impl<L, R> Join<L, R>
23+
where
24+
L: Future,
25+
R: Future<Output = L::Output>,
26+
{
27+
pub(crate) fn new(left: L, right: R) -> Self {
28+
Self {
29+
left: MaybeDone::new(left),
30+
right: MaybeDone::new(right),
31+
}
32+
}
33+
}
34+
35+
impl<L, R> Future for Join<L, R>
36+
where
37+
L: Future,
38+
R: Future<Output = L::Output>,
39+
{
40+
type Output = (L::Output, R::Output);
41+
42+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
43+
let this = self.project();
44+
45+
let mut left = this.left;
46+
let mut right = this.right;
47+
48+
if Future::poll(Pin::new(&mut left), cx).is_ready() {
49+
if right.as_ref().output().is_some() {
50+
return Poll::Ready((left.take().unwrap(), right.take().unwrap()));
51+
}
52+
}
53+
54+
if Future::poll(Pin::new(&mut right), cx).is_ready() {
55+
if left.as_ref().output().is_some() {
56+
return Poll::Ready((left.take().unwrap(), right.take().unwrap()));
57+
}
58+
}
59+
60+
Poll::Pending
61+
}
62+
}

src/future/future/mod.rs

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ cfg_unstable! {
33
mod flatten;
44
mod race;
55
mod try_race;
6+
mod join;
7+
mod try_join;
68

79
use std::time::Duration;
810

@@ -11,6 +13,8 @@ cfg_unstable! {
1113
use crate::future::IntoFuture;
1214
use race::Race;
1315
use try_race::TryRace;
16+
use join::Join;
17+
use try_join::TryJoin;
1418
}
1519

1620
extension_trait! {
@@ -264,6 +268,90 @@ extension_trait! {
264268
{
265269
TryRace::new(self, other)
266270
}
271+
272+
#[doc = r#"
273+
Waits for two similarly-typed futures to complete.
274+
275+
Awaits multiple futures simultaneously, returning the output of the
276+
futures once both complete.
277+
278+
This function returns a new future which polls both futures
279+
concurrently.
280+
281+
# Examples
282+
283+
```
284+
# async_std::task::block_on(async {
285+
use async_std::prelude::*;
286+
use async_std::future;
287+
288+
let a = future::ready(1u8);
289+
let b = future::ready(2u8);
290+
291+
let f = a.join(b);
292+
assert_eq!(f.await, (1u8, 2u8));
293+
# });
294+
```
295+
"#]
296+
#[cfg(any(feature = "unstable", feature = "docs"))]
297+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
298+
fn join<F>(
299+
self,
300+
other: F
301+
) -> impl Future<Output = (<Self as std::future::Future>::Output, <F as std::future::Future>::Output)> [Join<Self, F>]
302+
where
303+
Self: std::future::Future + Sized,
304+
F: std::future::Future<Output = <Self as std::future::Future>::Output>,
305+
{
306+
Join::new(self, other)
307+
}
308+
309+
#[doc = r#"
310+
Waits for two similarly-typed fallible futures to complete.
311+
312+
Awaits multiple futures simultaneously, returning all results once
313+
complete.
314+
315+
`try_join` is similar to [`join`], but returns an error immediately
316+
if a future resolves to an error.
317+
318+
[`join`]: #method.join
319+
320+
# Examples
321+
322+
```
323+
# fn main() -> std::io::Result<()> { async_std::task::block_on(async {
324+
#
325+
use async_std::prelude::*;
326+
use async_std::future;
327+
328+
let a = future::ready(Err("Error"));
329+
let b = future::ready(Ok(1u8));
330+
331+
let f = a.try_join(b);
332+
assert_eq!(f.await, Err("Error"));
333+
334+
let a = future::ready(Ok::<u8, String>(1u8));
335+
let b = future::ready(Ok::<u8, String>(2u8));
336+
337+
let f = a.try_join(b);
338+
assert_eq!(f.await, Ok((1u8, 2u8)));
339+
#
340+
# Ok(()) }) }
341+
```
342+
"#]
343+
#[cfg(any(feature = "unstable", feature = "docs"))]
344+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
345+
fn try_join<F, T, E>(
346+
self,
347+
other: F
348+
) -> impl Future<Output = Result<(T, T), E>> [TryJoin<Self, F>]
349+
where
350+
Self: std::future::Future<Output = Result<T, E>> + Sized,
351+
F: std::future::Future<Output = <Self as std::future::Future>::Output>,
352+
{
353+
TryJoin::new(self, other)
354+
}
267355
}
268356

269357
impl<F: Future + Unpin + ?Sized> Future for Box<F> {

src/future/future/try_join.rs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
use std::pin::Pin;
2+
3+
use async_macros::MaybeDone;
4+
use pin_project_lite::pin_project;
5+
6+
use crate::task::{Context, Poll};
7+
use std::future::Future;
8+
9+
pin_project! {
10+
#[allow(missing_docs)]
11+
#[allow(missing_debug_implementations)]
12+
pub struct TryJoin<L, R>
13+
where
14+
L: Future,
15+
R: Future<Output = L::Output>
16+
{
17+
#[pin] left: MaybeDone<L>,
18+
#[pin] right: MaybeDone<R>,
19+
}
20+
}
21+
22+
impl<L, R> TryJoin<L, R>
23+
where
24+
L: Future,
25+
R: Future<Output = L::Output>,
26+
{
27+
pub(crate) fn new(left: L, right: R) -> Self {
28+
Self {
29+
left: MaybeDone::new(left),
30+
right: MaybeDone::new(right),
31+
}
32+
}
33+
}
34+
35+
impl<L, R, T, E> Future for TryJoin<L, R>
36+
where
37+
L: Future<Output = Result<T, E>>,
38+
R: Future<Output = L::Output>,
39+
{
40+
type Output = Result<(T, T), E>;
41+
42+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
43+
let this = self.project();
44+
45+
let mut left = this.left;
46+
let mut right = this.right;
47+
48+
if Future::poll(Pin::new(&mut left), cx).is_ready() {
49+
if left.as_ref().output().unwrap().is_err() {
50+
return Poll::Ready(Err(left.take().unwrap().err().unwrap()));
51+
} else if right.as_ref().output().is_some() {
52+
return Poll::Ready(Ok((
53+
left.take().unwrap().ok().unwrap(),
54+
right.take().unwrap().ok().unwrap(),
55+
)));
56+
}
57+
}
58+
59+
if Future::poll(Pin::new(&mut right), cx).is_ready() {
60+
if right.as_ref().output().unwrap().is_err() {
61+
return Poll::Ready(Err(right.take().unwrap().err().unwrap()));
62+
} else if left.as_ref().output().is_some() {
63+
return Poll::Ready(Ok((
64+
left.take().unwrap().ok().unwrap(),
65+
right.take().unwrap().ok().unwrap(),
66+
)));
67+
}
68+
}
69+
70+
Poll::Pending
71+
}
72+
}

src/future/mod.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,17 @@
88
//! operations converts multiple future into a single future that returns the
99
//! first output.
1010
//!
11-
//! For operating on futures the following macros can be used:
11+
//! For operating on futures the following functions can be used:
1212
//!
1313
//! | Name | Return signature | When does it return? |
1414
//! | --- | --- | --- |
15-
//! | [`future::join!`] | `(T1, T2)` | Wait for all to complete
15+
//! | [`Future::join`] | `(T1, T2)` | Wait for all to complete
1616
//! | [`Future::race`] | `T` | Return on first value
1717
//!
1818
//! ## Fallible Futures Concurrency
1919
//!
2020
//! For operating on futures that return `Result` additional `try_` variants of
21-
//! the macros mentioned before can be used. These macros are aware of `Result`,
21+
//! the functions mentioned before can be used. These functions are aware of `Result`,
2222
//! and will behave slightly differently from their base variants.
2323
//!
2424
//! In the case of `try_join`, if any of the futures returns `Err` all
@@ -30,19 +30,19 @@
3030
//! means `try_race` will keep going until any one of the futures returns
3131
//! `Ok`, or _all_ futures have returned `Err`.
3232
//!
33-
//! However sometimes it can be useful to use the base variants of the macros
33+
//! However sometimes it can be useful to use the base variants of the functions
3434
//! even on futures that return `Result`. Here is an overview of operations that
3535
//! work on `Result`, and their respective semantics:
3636
//!
3737
//! | Name | Return signature | When does it return? |
3838
//! | --- | --- | --- |
39-
//! | [`future::join!`] | `(Result<T, E>, Result<T, E>)` | Wait for all to complete
40-
//! | [`future::try_join!`] | `Result<(T1, T2), E>` | Return on first `Err`, wait for all to complete
39+
//! | [`Future::join`] | `(Result<T, E>, Result<T, E>)` | Wait for all to complete
40+
//! | [`Future::try_join`] | `Result<(T1, T2), E>` | Return on first `Err`, wait for all to complete
4141
//! | [`Future::race`] | `Result<T, E>` | Return on first value
4242
//! | [`Future::try_race`] | `Result<T, E>` | Return on first `Ok`, reject on last Err
4343
//!
44-
//! [`future::join!`]: macro.join.html
45-
//! [`future::try_join!`]: macro.try_join.html
44+
//! [`Future::join`]: trait.Future.html#method.join
45+
//! [`Future::try_join`]: trait.Future.html#method.try_join
4646
//! [`Future::race`]: trait.Future.html#method.race
4747
//! [`Future::try_race`]: trait.Future.html#method.try_race
4848

0 commit comments

Comments
 (0)