Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions tokio/src/sync/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,12 @@ async fn changed_impl<T>(
shared: &Shared<T>,
version: &mut Version,
) -> Result<(), error::RecvError> {
let state = shared.state.load();
let sender_has_been_dropped = state.is_closed();
if sender_has_been_dropped {
return Err(error::RecvError(()));
}

crate::trace::async_trace_leaf().await;

loop {
Expand Down
23 changes: 23 additions & 0 deletions tokio/tests/sync_watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,3 +450,26 @@ async fn sender_closed_is_cooperative() {
_ = tokio::task::yield_now() => {},
}
}

#[tokio::test]
async fn changed_errors_on_closed_channel() {
let (tx, mut rx) = watch::channel(());
tx.send(()).unwrap();

drop(tx);

rx.changed().await.expect_err(
"`changed` call returns an error IFF the channel is closed by dropping the senders.",
);
}

#[tokio::test]
async fn wait_for_errors_on_closed_channel_true_predicate() {
let (tx, mut rx) = watch::channel(());

drop(tx);

rx.wait_for(|_| true).await.expect(
"`wait_for` call does not return error even if channel is closed when predicate is true for last value.",
);
}
Loading