Skip to content

Commit

Permalink
sync: add async APIs to oneshot and mpsc (tokio-rs#1211)
Browse files Browse the repository at this point in the history
Adds:

- oneshot::Sender::close
- mpsc::Receiver::recv
- mpsc::Sender::send

Also renames `poll_next` to `poll_recv`.

Refs: tokio-rs#1210
  • Loading branch information
carllerche authored Jun 27, 2019
1 parent 0af05e7 commit 32ceccb
Show file tree
Hide file tree
Showing 10 changed files with 143 additions and 41 deletions.
16 changes: 9 additions & 7 deletions tokio-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,14 @@ pub fn main(args: TokenStream, item: TokenStream) -> TokenStream {

for arg in args {
match arg {
syn::NestedMeta::Meta(syn::Meta::Word(ident)) => match ident.to_string().to_lowercase().as_str() {
"multi_thread" => runtime = RuntimeType::Multi,
"single_thread" => runtime = RuntimeType::Single,
name => panic!("Unknown attribute {} is specified", name),
},
_ => ()
syn::NestedMeta::Meta(syn::Meta::Word(ident)) => {
match ident.to_string().to_lowercase().as_str() {
"multi_thread" => runtime = RuntimeType::Multi,
"single_thread" => runtime = RuntimeType::Single,
name => panic!("Unknown attribute {} is specified", name),
}
}
_ => (),
}
}

Expand All @@ -90,7 +92,7 @@ pub fn main(args: TokenStream, item: TokenStream) -> TokenStream {
let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap();
rt.block_on(async { #body })
}
}
},
};

result.into()
Expand Down
6 changes: 3 additions & 3 deletions tokio-sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ publish = false
async-traits = ["async-sink", "futures-core-preview"]

[dependencies]
fnv = "1.0.6"
async-util = { git = "https://github.com/tokio-rs/async" }
async-sink = { git = "https://github.com/tokio-rs/async", optional = true }
fnv = "1.0.6"
futures-core-preview = { version = "0.3.0-alpha.16", optional = true }

[dev-dependencies]
async-util = { git = "https://github.com/tokio-rs/async" }
env_logger = { version = "0.5", default-features = false }
pin-utils = "0.1.0-alpha.4"
# tokio = { version = "0.2.0", path = "../tokio" }
tokio = { version = "*", path = "../tokio" }
tokio-test = { version = "0.2.0", path = "../tokio-test" }
loom = { git = "https://github.com/carllerche/loom", branch = "std-future2", features = ["futures"] }
1 change: 1 addition & 0 deletions tokio-sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
)]
#![cfg_attr(test, deny(warnings))]
#![doc(test(no_crate_inject, attr(deny(rust_2018_idioms))))]
#![feature(async_await)]

//! Asynchronous synchronization primitives.
//!
Expand Down
26 changes: 24 additions & 2 deletions tokio-sync/src/mpsc/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,14 @@ impl<T> Receiver<T> {
}

/// TODO: Dox
pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
pub async fn recv(&mut self) -> Option<T> {
use async_util::future::poll_fn;

poll_fn(|cx| self.poll_recv(cx)).await
}

/// TODO: Dox
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}

Expand All @@ -150,7 +157,7 @@ impl<T> futures_core::Stream for Receiver<T> {
type Item = T;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
Receiver::poll_next(self.get_mut(), cx)
self.get_mut().poll_recv(cx)
}
}

Expand Down Expand Up @@ -189,6 +196,21 @@ impl<T> Sender<T> {
self.chan.try_send(message)?;
Ok(())
}

/// Send a value, waiting until there is capacity.
///
/// # Examples
///
/// ```
/// unimplemented!();
/// ```
pub async fn send(&mut self, value: T) -> Result<(), SendError> {
use async_util::future::poll_fn;

poll_fn(|cx| self.poll_ready(cx)).await?;

self.try_send(value).map_err(|_| SendError(()))
}
}

#[cfg(feature = "async-traits")]
Expand Down
9 changes: 8 additions & 1 deletion tokio-sync/src/mpsc/unbounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,17 @@ impl<T> UnboundedReceiver<T> {
}

/// TODO: dox
pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}

/// TODO: Dox
pub async fn recv(&mut self) -> Option<T> {
use async_util::future::poll_fn;

poll_fn(|cx| self.poll_recv(cx)).await
}

/// Closes the receiving half of a channel, without dropping it.
///
/// This prevents any further messages from being sent on the channel while
Expand Down
19 changes: 19 additions & 0 deletions tokio-sync/src/oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,25 @@ impl<T> Sender<T> {
Pending
}

/// Wait for the associated [`Receiver`] handle to drop.
///
/// # Return
///
/// Returns a `Future` which must be awaited on.
///
/// [`Receiver`]: struct.Receiver.html
///
/// # Examples
///
/// ```
/// unimplemented!();
/// ```
pub async fn closed(&mut self) {
use async_util::future::poll_fn;

poll_fn(|cx| self.poll_close(cx)).await
}

/// Check if the associated [`Receiver`] handle has been dropped.
///
/// Unlike [`poll_close`], this function does not register a task for
Expand Down
5 changes: 3 additions & 2 deletions tokio-sync/tests/fuzz_mpsc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#![deny(warnings, rust_2018_idioms)]
#![feature(async_await)]

#[macro_use]
extern crate loom;
Expand Down Expand Up @@ -32,10 +33,10 @@ fn closing_tx() {
drop(tx);
});

let v = block_on(poll_fn(|cx| rx.poll_next(cx)));
let v = block_on(poll_fn(|cx| rx.poll_recv(cx)));
assert!(v.is_some());

let v = block_on(poll_fn(|cx| rx.poll_next(cx)));
let v = block_on(poll_fn(|cx| rx.poll_recv(cx)));
assert!(v.is_none());
});
}
1 change: 1 addition & 0 deletions tokio-sync/tests/fuzz_oneshot.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#![deny(warnings, rust_2018_idioms)]
#![feature(async_await)]

/// Unwrap a ready value or propagate `Async::Pending`.
#[macro_export]
Expand Down
81 changes: 55 additions & 26 deletions tokio-sync/tests/mpsc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#![deny(warnings, rust_2018_idioms)]
#![feature(async_await)]

use tokio_sync::mpsc;
use tokio_test::task::MockTask;
Expand Down Expand Up @@ -28,16 +29,30 @@ fn send_recv_with_buffer() {

drop(tx);

let val = assert_ready!(t2.enter(|cx| rx.poll_next(cx)));
let val = assert_ready!(t2.enter(|cx| rx.poll_recv(cx)));
assert_eq!(val, Some(1));

let val = assert_ready!(t2.enter(|cx| rx.poll_next(cx)));
let val = assert_ready!(t2.enter(|cx| rx.poll_recv(cx)));
assert_eq!(val, Some(2));

let val = assert_ready!(t2.enter(|cx| rx.poll_next(cx)));
let val = assert_ready!(t2.enter(|cx| rx.poll_recv(cx)));
assert!(val.is_none());
}

#[tokio::test]
async fn async_send_recv_with_buffer() {
let (mut tx, mut rx) = mpsc::channel(16);

tokio::spawn(async move {
assert_ok!(tx.send(1).await);
assert_ok!(tx.send(2).await);
});

assert_eq!(Some(1), rx.recv().await);
assert_eq!(Some(2), rx.recv().await);
assert_eq!(None, rx.recv().await);
}

#[test]
#[cfg(feature = "async-traits")]
fn send_sink_recv_with_buffer() {
Expand Down Expand Up @@ -65,13 +80,13 @@ fn send_sink_recv_with_buffer() {
t1.enter(|cx| {
pin_mut!(rx);

let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx));
let val = assert_ready!(rx.as_mut().poll_next(cx));
assert_eq!(val, Some(1));

let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx));
let val = assert_ready!(rx.as_mut().poll_next(cx));
assert_eq!(val, Some(2));

let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx));
let val = assert_ready!(rx.as_mut().poll_next(cx));
assert!(val.is_none());
});
}
Expand All @@ -97,15 +112,15 @@ fn start_send_past_cap() {

drop(tx1);

let val = t3.enter(|cx| assert_ready!(rx.poll_next(cx)));
let val = t3.enter(|cx| assert_ready!(rx.poll_recv(cx)));
assert!(val.is_some());

assert!(t2.is_woken());
assert!(!t1.is_woken());

drop(tx2);

let val = t3.enter(|cx| assert_ready!(rx.poll_next(cx)));
let val = t3.enter(|cx| assert_ready!(rx.poll_recv(cx)));
assert!(val.is_none());
}

Expand All @@ -125,18 +140,32 @@ fn send_recv_unbounded() {
assert_ok!(tx.try_send(1));
assert_ok!(tx.try_send(2));

let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx)));
let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
assert_eq!(val, Some(1));

let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx)));
let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
assert_eq!(val, Some(2));

drop(tx);

let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx)));
let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
assert!(val.is_none());
}

#[tokio::test]
async fn async_send_recv_unbounded() {
let (mut tx, mut rx) = mpsc::unbounded_channel();

tokio::spawn(async move {
assert_ok!(tx.try_send(1));
assert_ok!(tx.try_send(2));
});

assert_eq!(Some(1), rx.recv().await);
assert_eq!(Some(2), rx.recv().await);
assert_eq!(None, rx.recv().await);
}

#[test]
#[cfg(feature = "async-traits")]
fn sink_send_recv_unbounded() {
Expand Down Expand Up @@ -164,13 +193,13 @@ fn sink_send_recv_unbounded() {
t1.enter(|cx| {
pin_mut!(rx);

let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx));
let val = assert_ready!(rx.as_mut().poll_next(cx));
assert_eq!(val, Some(1));

let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx));
let val = assert_ready!(rx.as_mut().poll_next(cx));
assert_eq!(val, Some(2));

let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx));
let val = assert_ready!(rx.as_mut().poll_next(cx));
assert!(val.is_none());
});
}
Expand All @@ -189,7 +218,7 @@ fn no_t_bounds_buffer() {
// and sender should be Clone even though T isn't Clone
assert!(tx.clone().try_send(NoImpls).is_ok());

let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx)));
let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
assert!(val.is_some());
}

Expand All @@ -207,7 +236,7 @@ fn no_t_bounds_unbounded() {
// and sender should be Clone even though T isn't Clone
assert!(tx.clone().try_send(NoImpls).is_ok());

let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx)));
let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
assert!(val.is_some());
}

Expand All @@ -234,7 +263,7 @@ fn send_recv_buffer_limited() {

t2.enter(|cx| {
// Take the value
let val = assert_ready!(rx.poll_next(cx));
let val = assert_ready!(rx.poll_recv(cx));
assert_eq!(Some(1), val);
});

Expand All @@ -251,7 +280,7 @@ fn send_recv_buffer_limited() {

t2.enter(|cx| {
// Take the value
let val = assert_ready!(rx.poll_next(cx));
let val = assert_ready!(rx.poll_recv(cx));
assert_eq!(Some(2), val);
});

Expand All @@ -269,7 +298,7 @@ fn recv_close_gets_none_idle() {
rx.close();

t1.enter(|cx| {
let val = assert_ready!(rx.poll_next(cx));
let val = assert_ready!(rx.poll_recv(cx));
assert!(val.is_none());
assert_ready_err!(tx.poll_ready(cx));
});
Expand Down Expand Up @@ -298,7 +327,7 @@ fn recv_close_gets_none_reserved() {
assert_ready_err!(tx2.poll_ready(cx));
});

t3.enter(|cx| assert_pending!(rx.poll_next(cx)));
t3.enter(|cx| assert_pending!(rx.poll_recv(cx)));

assert!(!t1.is_woken());
assert!(!t2.is_woken());
Expand All @@ -308,10 +337,10 @@ fn recv_close_gets_none_reserved() {
assert!(t3.is_woken());

t3.enter(|cx| {
let v = assert_ready!(rx.poll_next(cx));
let v = assert_ready!(rx.poll_recv(cx));
assert_eq!(v, Some(123));

let v = assert_ready!(rx.poll_next(cx));
let v = assert_ready!(rx.poll_recv(cx));
assert!(v.is_none());
});
}
Expand All @@ -324,7 +353,7 @@ fn tx_close_gets_none() {

// Run on a task context
t1.enter(|cx| {
let v = assert_ready!(rx.poll_next(cx));
let v = assert_ready!(rx.poll_recv(cx));
assert!(v.is_none());
});
}
Expand All @@ -341,16 +370,16 @@ fn try_send_fail() {
let err = assert_err!(tx.try_send("fail"));
assert!(err.is_full());

let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx)));
let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
assert_eq!(val, Some("hello"));

assert_ok!(tx.try_send("goodbye"));
drop(tx);

let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx)));
let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
assert_eq!(val, Some("goodbye"));

let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx)));
let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
assert!(val.is_none());
}

Expand Down
Loading

0 comments on commit 32ceccb

Please sign in to comment.