Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 85 additions & 13 deletions tokio/src/sync/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
//! The [`Receiver`] half provides an asynchronous [`changed`] method. This
//! method is ready when a new, *unseen* value is sent via the [`Sender`] half.
//!
//! * [`Receiver::changed()`] returns `Ok(())` on receiving a new value, or
//! `Err(`[`error::RecvError`]`)` if all [`Sender`]s have been dropped.
//! * [`Receiver::changed()`] returns:
//! * `Ok(())` on receiving a new value.
//! * `Err(`[`RecvError`](error::RecvError)`)` if the
//! channel has been closed __AND__ the current value is *seen*.
//! * If the current value is *unseen* when calling [`changed`], then
//! [`changed`] will return immediately. If the current value is *seen*, then
//! it will sleep until either a new message is sent via the [`Sender`] half,
Expand All @@ -42,7 +44,28 @@
//! The current value at the time the [`Receiver`] is created is considered
//! *seen*.
//!
//! ## `borrow_and_update` versus `borrow`
//! ## [`changed`] versus [`has_changed`]
//!
//! The [`Receiver`] half provides two methods for checking for changes
//! in the channel, [`has_changed`] and [`changed`].
//!
//! * [`has_changed`] is a *synchronous* method that checks whether the current
//! value is seen or not and returns a boolean. This method does __not__ mark the
//! value as seen.
//!
//! * [`changed`] is an *asynchronous* method that will return once an unseen
//! value is in the channel. This method does mark the value as seen.
//!
//! Note there are two behavioral differences on when these two methods return
//! an error.
//!
//! - [`has_changed`] errors if and only if the channel is closed.
//! - [`changed`] errors if the channel has been closed __AND__
//! the current value is seen.
//!
//! See the example below that shows how these methods have different fallibility.
//!
//! ## [`borrow_and_update`] versus [`borrow`]
//!
//! If the receiver intends to await notifications from [`changed`] in a loop,
//! [`Receiver::borrow_and_update()`] should be preferred over
Expand Down Expand Up @@ -84,6 +107,31 @@
//! # }
//! ```
//!
//! Difference on fallibility of [`changed`] versus [`has_changed`].
//! ```
//! use tokio::sync::watch;
//!
//! #[tokio::main]
//! async fn main() {
//! let (tx, mut rx) = watch::channel("hello");
//! tx.send("goodbye").unwrap();
//! drop(tx);
//!
//! // `has_changed` does not mark the value as seen and errors
//! // since the channel is closed.
//! assert!(rx.has_changed().is_err());
//!
//! // `changed` returns Ok since the value is not already marked as seen
//! // even if the channel is closed.
//! assert!(rx.changed().await.is_ok());
//!
//! // The `changed` call above marks the value as seen.
//! // The next `changed` call now returns an error as the channel is closed
//! // AND the current value is seen.
//! assert!(rx.changed().await.is_err());
//! }
//! ```
//!
//! # Closing
//!
//! [`Sender::is_closed`] and [`Sender::closed`] allow the producer to detect
Expand All @@ -102,6 +150,9 @@
//! [`Sender`]: crate::sync::watch::Sender
//! [`Receiver`]: crate::sync::watch::Receiver
//! [`changed`]: crate::sync::watch::Receiver::changed
//! [`has_changed`]: crate::sync::watch::Receiver::has_changed
//! [`borrow`]: crate::sync::watch::Receiver::borrow
//! [`borrow_and_update`]: crate::sync::watch::Receiver::borrow_and_update
//! [`Receiver::changed()`]: crate::sync::watch::Receiver::changed
//! [`Receiver::borrow()`]: crate::sync::watch::Receiver::borrow
//! [`Receiver::borrow_and_update()`]:
Expand Down Expand Up @@ -637,15 +688,20 @@ impl<T> Receiver<T> {
}

/// Checks if this channel contains a message that this receiver has not yet
/// seen. The new value is not marked as seen.
/// seen. The current value will not be marked as seen.
///
/// Although this method is called `has_changed`, it does not check
/// messages for equality, so this call will return true even if the current
/// message is equal to the previous message.
///
/// # Errors
///
/// Although this method is called `has_changed`, it does not check new
/// messages for equality, so this call will return true even if the new
/// message is equal to the old message.
/// Returns a [`RecvError`](error::RecvError) if and only if the channel has been closed.
///
/// Returns an error if the channel has been closed.
/// # Examples
///
/// ## Basic usage
///
/// ```
/// use tokio::sync::watch;
///
Expand All @@ -660,9 +716,22 @@ impl<T> Receiver<T> {
///
/// // The value has been marked as seen
/// assert!(!rx.has_changed().unwrap());
/// }
/// ```
///
/// ## Closed channel example
///
/// ```
/// use tokio::sync::watch;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, rx) = watch::channel("hello");
/// tx.send("goodbye").unwrap();
///
/// drop(tx);
/// // The `tx` handle has been dropped
///
/// // The channel is closed
/// assert!(rx.has_changed().is_err());
/// }
/// ```
Expand Down Expand Up @@ -701,19 +770,22 @@ impl<T> Receiver<T> {
self.version = current_version;
}

/// Waits for a change notification, then marks the newest value as seen.
/// Waits for a change notification, then marks the current value as seen.
///
/// If the newest value in the channel has not yet been marked seen when
/// If the current value in the channel has not yet been marked seen when
/// this method is called, the method marks that value seen and returns
/// immediately. If the newest value has already been marked seen, then the
/// method sleeps until a new message is sent by a [`Sender`] connected to
/// this `Receiver`, or until all [`Sender`]s are dropped.
///
/// This method returns an error if and only if all [`Sender`]s are dropped.
///
/// For more information, see
/// [*Change notifications*](self#change-notifications) in the module-level documentation.
///
/// # Errors
///
/// Returns a [`RecvError`](error::RecvError) if the channel has been closed __AND__
/// the current value is seen.
///
/// # Cancel safety
///
/// This method is cancel safe. If you use it as the event in a
Expand Down
53 changes: 53 additions & 0 deletions tokio/tests/sync_watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,3 +450,56 @@ async fn sender_closed_is_cooperative() {
_ = tokio::task::yield_now() => {},
}
}

#[tokio::test]
async fn changed_succeeds_on_closed_channel_with_unseen_value() {
let (tx, mut rx) = watch::channel("A");
tx.send("B").unwrap();

drop(tx);

rx.changed()
.await
.expect("should not return error as long as the current value is not seen");
}

#[tokio::test]
async fn changed_errors_on_closed_channel_with_seen_value() {
let (tx, mut rx) = watch::channel("A");
drop(tx);

rx.changed()
.await
.expect_err("should return error if the tx is closed and the current value is seen");
}

#[test]
fn has_changed_errors_on_closed_channel_with_unseen_value() {
let (tx, rx) = watch::channel("A");
tx.send("B").unwrap();

drop(tx);

rx.has_changed()
.expect_err("`has_changed` returns an error if and only if channel is closed. Even if the current value is not seen.");
}

#[test]
fn has_changed_errors_on_closed_channel_with_seen_value() {
let (tx, rx) = watch::channel("A");
drop(tx);

rx.has_changed()
.expect_err("`has_changed` returns an error if and only if channel is closed.");
}

#[tokio::test]
async fn wait_for_errors_on_closed_channel_true_predicate() {
let (tx, mut rx) = watch::channel("A");
tx.send("B").unwrap();
drop(tx);

rx.wait_for(|_| true).await.expect(
"`wait_for` call does not return error even if channel is closed when predicate is true for last value.",
);
}
Loading