diff --git a/src/sync.rs b/src/sync.rs index 6aa42164a8b..ff7438a25a0 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -9,5 +9,7 @@ //! from one task to another. //! - [mpsc](mpsc/index.html), a multi-producer, single-consumer channel for //! sending values between tasks. +//! - [watch](watch/index.html), a single-producer, multi-consumer channel that +//! only stores the **most recently** sent value. -pub use tokio_sync::{mpsc, oneshot}; +pub use tokio_sync::{mpsc, oneshot, watch}; diff --git a/tokio-sync/Cargo.toml b/tokio-sync/Cargo.toml index 0cc800b954f..1b16d87f9ce 100644 --- a/tokio-sync/Cargo.toml +++ b/tokio-sync/Cargo.toml @@ -19,6 +19,7 @@ Synchronization utilities. categories = ["asynchronous"] [dependencies] +fnv = "1.0.6" futures = "0.1.19" [dev-dependencies] diff --git a/tokio-sync/src/lib.rs b/tokio-sync/src/lib.rs index 4cbbbc0adfd..e7c62ece2af 100644 --- a/tokio-sync/src/lib.rs +++ b/tokio-sync/src/lib.rs @@ -6,6 +6,8 @@ //! //! This crate provides primitives for synchronizing asynchronous tasks. +extern crate fnv; +#[macro_use] extern crate futures; macro_rules! debug { @@ -27,3 +29,4 @@ pub mod mpsc; pub mod oneshot; pub mod semaphore; pub mod task; +pub mod watch; diff --git a/tokio-sync/src/watch.rs b/tokio-sync/src/watch.rs new file mode 100644 index 00000000000..4c2b2972f0d --- /dev/null +++ b/tokio-sync/src/watch.rs @@ -0,0 +1,404 @@ +//! A single-producer, multi-consumer channel that only retains the *last* sent +//! value. +//! +//! This channel is useful for watching for changes to a value from multiple +//! points in the code base, for example, changes to configuration values. +//! +//! # Usage +//! +//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer +//! and sender halves of the channel. The channel is created with an initial +//! value. `Receiver::poll` will always be ready upon creation and will yield +//! either this initial value or the latest value that has been sent by +//! `Sender`. +//! +//! Calls to [`Receiver::poll`] and [`Receiver::poll_ref`] will always yield +//! the latest value. +//! +//! # Examples +//! +//! ``` +//! # extern crate futures; +//! extern crate tokio; +//! +//! use tokio::prelude::*; +//! use tokio::sync::watch; +//! +//! # tokio::run(futures::future::lazy(|| { +//! let (mut tx, rx) = watch::channel("hello"); +//! +//! tokio::spawn(rx.for_each(|value| { +//! println!("received = {:?}", value); +//! Ok(()) +//! }).map_err(|_| ())); +//! +//! tx.broadcast("world").unwrap(); +//! # Ok(()) +//! # })); +//! ``` +//! +//! # Closing +//! +//! [`Sender::poll_close`] allows the producer to detect when all [`Sender`] +//! handles have been dropped. This indicates that there is no further interest +//! in the values being produced and work can be stopped. +//! +//! # Thread safety +//! +//! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other +//! threads and can be used in a concurrent environment. Clones of [`Receiver`] +//! handles may be moved to separate threads and also used concurrently. +//! +//! [`Sender`]: struct.Sender.html +//! [`Receiver`]: struct.Receiver.html +//! [`channel`]: fn.channel.html +//! [`Sender::poll_close`]: struct.Sender.html#method.poll_close +//! [`Receiver::poll`]: struct.Receiver.html#method.poll +//! [`Receiver::poll_ref`]: struct.Receiver.html#method.poll_ref + +use fnv::FnvHashMap; +use futures::task::AtomicTask; +use futures::{Async, AsyncSink, Poll, Sink, StartSend, Stream}; + +use std::ops; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::SeqCst; +use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, Weak}; + +/// Receives values from the associated `Sender`. +/// +/// Instances are created by the [`channel`](fn.channel.html) function. +#[derive(Debug)] +pub struct Receiver { + /// Pointer to the shared state + shared: Arc>, + + /// Pointer to the watcher's internal state + inner: Arc, + + /// Watcher ID. + id: u64, + + /// Last observed version + ver: usize, +} + +/// Sends values to the associated `Receiver`. +/// +/// Instances are created by the [`channel`](fn.channel.html) function. +#[derive(Debug)] +pub struct Sender { + shared: Weak>, +} + +/// Returns a reference to the inner value +/// +/// Outstanding borrows hold a read lock on the inner value. This means that +/// long lived borrows could cause the produce half to block. It is recommended +/// to keep the borrow as short lived as possible. +#[derive(Debug)] +pub struct Ref<'a, T: 'a> { + inner: RwLockReadGuard<'a, T>, +} + +pub mod error { + //! Watch error types + + /// Error produced when receiving a value fails. + #[derive(Debug)] + pub struct RecvError { + pub(crate) _p: (), + } + + /// Error produced when sending a value fails. + #[derive(Debug)] + pub struct SendError { + pub(crate) inner: T, + } +} + +#[derive(Debug)] +struct Shared { + /// The most recent value + value: RwLock, + + /// The current version + /// + /// The lowest bit represents a "closed" state. The rest of the bits + /// represent the current version. + version: AtomicUsize, + + /// All watchers + watchers: Mutex, + + /// Task to notify when all watchers drop + cancel: AtomicTask, +} + +#[derive(Debug)] +struct Watchers { + next_id: u64, + watchers: FnvHashMap>, +} + +#[derive(Debug)] +struct WatchInner { + task: AtomicTask, +} + +const CLOSED: usize = 1; + +/// Create a new watch channel, returning the "send" and "receive" handles. +/// +/// All values sent by `Sender` will become visible to the `Receiver` handles. +/// Only the last value sent is made available to the `Receiver` half. All +/// intermediate values are dropped. +/// +/// # Examples +/// +/// ``` +/// # extern crate futures; +/// extern crate tokio; +/// +/// use tokio::prelude::*; +/// use tokio::sync::watch; +/// +/// # tokio::run(futures::future::lazy(|| { +/// let (mut tx, rx) = watch::channel("hello"); +/// +/// tokio::spawn(rx.for_each(|value| { +/// println!("received = {:?}", value); +/// Ok(()) +/// }).map_err(|_| ())); +/// +/// tx.broadcast("world").unwrap(); +/// # Ok(()) +/// # })); +/// ``` +pub fn channel(init: T) -> (Sender, Receiver) { + const INIT_ID: u64 = 0; + + let inner = Arc::new(WatchInner::new()); + + // Insert the watcher + let mut watchers = FnvHashMap::with_capacity_and_hasher(0, Default::default()); + watchers.insert(INIT_ID, inner.clone()); + + let shared = Arc::new(Shared { + value: RwLock::new(init), + version: AtomicUsize::new(2), + watchers: Mutex::new(Watchers { + next_id: INIT_ID + 1, + watchers, + }), + cancel: AtomicTask::new(), + }); + + let tx = Sender { + shared: Arc::downgrade(&shared), + }; + + let rx = Receiver { + shared, + inner, + id: INIT_ID, + ver: 0, + }; + + (tx, rx) +} + +impl Receiver { + /// Returns a reference to the most recently sent value + /// + /// Outstanding borrows hold a read lock. This means that long lived borrows + /// could cause the send half to block. It is recommended to keep the borrow + /// as short lived as possible. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::sync::watch; + /// let (_, rx) = watch::channel("hello"); + /// assert_eq!(*rx.get_ref(), "hello"); + /// ``` + pub fn get_ref(&self) -> Ref { + let inner = self.shared.value.read().unwrap(); + Ref { inner } + } + + /// Attempts to receive the latest value sent via the channel. + /// + /// If a new, unobserved, value has been sent, a reference to it is + /// returned. If no new value has been sent, then `NotReady` is returned and + /// the current task is notified once a new value is sent. + /// + /// Only the **most recent** value is returned. If the receiver is falling + /// behind the sender, intermediate values are dropped. + pub fn poll_ref(&mut self) -> Poll>, error::RecvError> { + // Make sure the task is up to date + self.inner.task.register(); + + let state = self.shared.version.load(SeqCst); + let version = state & !CLOSED; + + if version != self.ver { + // Track the latest version + self.ver = version; + + let inner = self.shared.value.read().unwrap(); + + return Ok(Some(Ref { inner }).into()); + } + + if CLOSED == state & CLOSED { + // The `Store` handle has been dropped. + return Ok(None.into()); + } + + Ok(Async::NotReady) + } +} + +impl Stream for Receiver { + type Item = T; + type Error = error::RecvError; + + fn poll(&mut self) -> Poll, error::RecvError> { + let item = try_ready!(self.poll_ref()); + Ok(Async::Ready(item.map(|v_ref| v_ref.clone()))) + } +} + +impl Clone for Receiver { + fn clone(&self) -> Self { + let inner = Arc::new(WatchInner::new()); + let shared = self.shared.clone(); + + let id = { + let mut watchers = shared.watchers.lock().unwrap(); + let id = watchers.next_id; + + watchers.next_id += 1; + watchers.watchers.insert(id, inner.clone()); + + id + }; + + let ver = self.ver; + + Receiver { + shared: shared, + inner, + id, + ver, + } + } +} + +impl Drop for Receiver { + fn drop(&mut self) { + let mut watchers = self.shared.watchers.lock().unwrap(); + watchers.watchers.remove(&self.id); + } +} + +impl WatchInner { + fn new() -> Self { + WatchInner { + task: AtomicTask::new(), + } + } +} + +impl Sender { + /// Broadcast a new value via the channel, notifying all receivers. + pub fn broadcast(&mut self, value: T) -> Result<(), error::SendError> { + let shared = match self.shared.upgrade() { + Some(shared) => shared, + // All `Watch` handles have been canceled + None => return Err(error::SendError { inner: value }), + }; + + // Replace the value + { + let mut lock = shared.value.write().unwrap(); + *lock = value; + } + + // Update the version. 2 is used so that the CLOSED bit is not set. + shared.version.fetch_add(2, SeqCst); + + // Notify all watchers + notify_all(&*shared); + + // Return the old value + Ok(()) + } + + /// Returns `Ready` when all receivers have dropped. + /// + /// This allows the producer to get notified when interest in the produced + /// values is canceled and immediately stop doing work. + pub fn poll_close(&mut self) -> Poll<(), ()> { + match self.shared.upgrade() { + Some(shared) => { + shared.cancel.register(); + Ok(Async::NotReady) + } + None => Ok(Async::Ready(())), + } + } +} + +impl Sink for Sender { + type SinkItem = T; + type SinkError = error::SendError; + + fn start_send(&mut self, item: T) -> StartSend> { + let _ = self.broadcast(item)?; + Ok(AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> Poll<(), error::SendError> { + Ok(().into()) + } +} + +/// Notify all watchers of a change +fn notify_all(shared: &Shared) { + let watchers = shared.watchers.lock().unwrap(); + + for watcher in watchers.watchers.values() { + // Notify the task + watcher.task.notify(); + } +} + +impl Drop for Sender { + fn drop(&mut self) { + if let Some(shared) = self.shared.upgrade() { + shared.version.fetch_or(CLOSED, SeqCst); + notify_all(&*shared); + } + } +} + +// ===== impl Ref ===== + +impl<'a, T: 'a> ops::Deref for Ref<'a, T> { + type Target = T; + + fn deref(&self) -> &T { + self.inner.deref() + } +} + +// ===== impl Shared ===== + +impl Drop for Shared { + fn drop(&mut self) { + self.cancel.notify(); + } +} diff --git a/tokio-sync/tests/watch.rs b/tokio-sync/tests/watch.rs new file mode 100644 index 00000000000..7e4e35b56b4 --- /dev/null +++ b/tokio-sync/tests/watch.rs @@ -0,0 +1,234 @@ +extern crate futures; +extern crate tokio_mock_task; +extern crate tokio_sync; + +use tokio_mock_task::*; +use tokio_sync::watch; + +macro_rules! assert_ready { + ($e:expr) => {{ + match $e { + Ok(futures::Async::Ready(v)) => v, + Ok(_) => panic!("not ready"), + Err(e) => panic!("error = {:?}", e), + } + }}; +} + +macro_rules! assert_not_ready { + ($e:expr) => {{ + match $e { + Ok(futures::Async::NotReady) => {} + Ok(futures::Async::Ready(v)) => panic!("ready; value = {:?}", v), + Err(e) => panic!("error = {:?}", e), + } + }}; +} + +#[test] +fn single_rx() { + let (mut tx, mut rx) = watch::channel("one"); + let mut task = MockTask::new(); + + task.enter(|| { + let v = assert_ready!(rx.poll_ref()).unwrap(); + assert_eq!(*v, "one"); + }); + + task.enter(|| assert_not_ready!(rx.poll_ref())); + + assert!(!task.is_notified()); + + tx.broadcast("two").unwrap(); + + assert!(task.is_notified()); + + task.enter(|| { + let v = assert_ready!(rx.poll_ref()).unwrap(); + assert_eq!(*v, "two"); + }); + + task.enter(|| assert_not_ready!(rx.poll_ref())); + + drop(tx); + + assert!(task.is_notified()); + + task.enter(|| { + let res = assert_ready!(rx.poll_ref()); + assert!(res.is_none()); + }); +} + +#[test] +fn stream_impl() { + use futures::Stream; + + let (mut tx, mut rx) = watch::channel("one"); + let mut task = MockTask::new(); + + task.enter(|| { + let v = assert_ready!(rx.poll()).unwrap(); + assert_eq!(v, "one"); + }); + + task.enter(|| assert_not_ready!(rx.poll())); + + assert!(!task.is_notified()); + + tx.broadcast("two").unwrap(); + + assert!(task.is_notified()); + + task.enter(|| { + let v = assert_ready!(rx.poll()).unwrap(); + assert_eq!(v, "two"); + }); + + task.enter(|| assert_not_ready!(rx.poll())); + + drop(tx); + + assert!(task.is_notified()); + + task.enter(|| { + let res = assert_ready!(rx.poll()); + assert!(res.is_none()); + }); +} + +#[test] +fn multi_rx() { + let (mut tx, mut rx1) = watch::channel("one"); + let mut rx2 = rx1.clone(); + + let mut task1 = MockTask::new(); + let mut task2 = MockTask::new(); + + task1.enter(|| { + let res = assert_ready!(rx1.poll_ref()); + assert_eq!(*res.unwrap(), "one"); + }); + + task2.enter(|| { + let res = assert_ready!(rx2.poll_ref()); + assert_eq!(*res.unwrap(), "one"); + }); + + tx.broadcast("two").unwrap(); + + assert!(task1.is_notified()); + assert!(task2.is_notified()); + + task1.enter(|| { + let res = assert_ready!(rx1.poll_ref()); + assert_eq!(*res.unwrap(), "two"); + }); + + tx.broadcast("three").unwrap(); + + assert!(task1.is_notified()); + assert!(task2.is_notified()); + + task1.enter(|| { + let res = assert_ready!(rx1.poll_ref()); + assert_eq!(*res.unwrap(), "three"); + }); + + task2.enter(|| { + let res = assert_ready!(rx2.poll_ref()); + assert_eq!(*res.unwrap(), "three"); + }); + + tx.broadcast("four").unwrap(); + + task1.enter(|| { + let res = assert_ready!(rx1.poll_ref()); + assert_eq!(*res.unwrap(), "four"); + }); + + drop(tx); + + task1.enter(|| { + let res = assert_ready!(rx1.poll_ref()); + assert!(res.is_none()); + }); + + task2.enter(|| { + let res = assert_ready!(rx2.poll_ref()); + assert_eq!(*res.unwrap(), "four"); + }); + + task2.enter(|| { + let res = assert_ready!(rx2.poll_ref()); + assert!(res.is_none()); + }); +} + +#[test] +fn rx_observes_final_value() { + // Initial value + + let (tx, mut rx) = watch::channel("one"); + let mut task = MockTask::new(); + + drop(tx); + + task.enter(|| { + let res = assert_ready!(rx.poll_ref()); + assert!(res.is_some()); + assert_eq!(*res.unwrap(), "one"); + }); + + task.enter(|| { + let res = assert_ready!(rx.poll_ref()); + assert!(res.is_none()); + }); + + // Sending a value + + let (mut tx, mut rx) = watch::channel("one"); + let mut task = MockTask::new(); + + tx.broadcast("two").unwrap(); + + task.enter(|| { + let res = assert_ready!(rx.poll_ref()); + assert!(res.is_some()); + assert_eq!(*res.unwrap(), "two"); + }); + + task.enter(|| assert_not_ready!(rx.poll_ref())); + + tx.broadcast("three").unwrap(); + drop(tx); + + assert!(task.is_notified()); + + task.enter(|| { + let res = assert_ready!(rx.poll_ref()); + assert!(res.is_some()); + assert_eq!(*res.unwrap(), "three"); + }); + + task.enter(|| { + let res = assert_ready!(rx.poll_ref()); + assert!(res.is_none()); + }); +} + +#[test] +fn poll_close() { + let (mut tx, rx) = watch::channel("one"); + let mut task = MockTask::new(); + + task.enter(|| assert_not_ready!(tx.poll_close())); + + drop(rx); + + assert!(task.is_notified()); + + task.enter(|| assert_ready!(tx.poll_close())); + + assert!(tx.broadcast("two").is_err()); +}