Skip to content

Commit 3d17488

Browse files
authored
sync: fix mpsc bug related to closing the channel (#3215)
When closing a channel, it is possible to get into an invalid state when outstanding permits release capacity back to the channel.
1 parent c63057e commit 3d17488

File tree

2 files changed

+28
-0
lines changed

2 files changed

+28
-0
lines changed

tokio/src/sync/semaphore_ll.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -917,6 +917,10 @@ impl Waiter {
917917
let mut curr = WaiterState(self.state.load(Acquire));
918918

919919
loop {
920+
if curr.is_closed() {
921+
return 0;
922+
}
923+
920924
if !curr.is_queued() {
921925
assert_eq!(0, curr.permits_to_acquire());
922926
}

tokio/tests/sync_mpsc.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,3 +512,27 @@ fn ready_close_cancel_bounded() {
512512

513513
assert!(recv.is_woken());
514514
}
515+
516+
#[tokio::test]
517+
async fn permit_available_not_acquired_close() {
518+
use futures::future::poll_fn;
519+
520+
let (mut tx1, mut rx) = mpsc::channel::<()>(1);
521+
let mut tx2 = tx1.clone();
522+
523+
{
524+
let mut ready = task::spawn(poll_fn(|cx| tx1.poll_ready(cx)));
525+
assert_ready_ok!(ready.poll());
526+
}
527+
528+
let mut ready = task::spawn(poll_fn(|cx| tx2.poll_ready(cx)));
529+
assert_pending!(ready.poll());
530+
531+
rx.close();
532+
533+
drop(tx1);
534+
assert!(ready.is_woken());
535+
536+
drop(tx2);
537+
assert!(rx.recv().await.is_none());
538+
}

0 commit comments

Comments
 (0)