Skip to content

Commit 355e4c1

Browse files
committed
add test to ensure that no pending msgs are kept in the channel after rx was dropped
1 parent fe51bd7 commit 355e4c1

File tree

1 file changed

+63
-0
lines changed

1 file changed

+63
-0
lines changed

tokio-util/tests/mpsc.rs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
11
use futures::future::poll_fn;
2+
use std::ops::Drop;
3+
use std::sync::atomic::{
4+
AtomicUsize,
5+
Ordering::{Acquire, Release},
6+
};
7+
use std::time::Duration;
8+
use tokio::join;
29
use tokio::sync::mpsc::{self, channel};
310
use tokio::sync::oneshot;
11+
use tokio::time;
412
use tokio_test::task::spawn;
513
use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok};
614
use tokio_util::sync::PollSender;
@@ -393,3 +401,58 @@ async fn actor_weak_sender() {
393401

394402
let _ = actor_handle.await;
395403
}
404+
405+
static NUM_DROPPED: AtomicUsize = AtomicUsize::new(0);
406+
407+
#[derive(Debug)]
408+
struct Msg;
409+
410+
impl Drop for Msg {
411+
fn drop(&mut self) {
412+
NUM_DROPPED.fetch_add(1, Release);
413+
}
414+
}
415+
416+
// Tests that no pending messages are put onto the channel after `Rx` was
417+
// dropped.
418+
//
419+
// Note: After the introduction of `WeakSender`, which internally
420+
// used `Arc` and doesn't call a drop of the channel after the last strong
421+
// `Sender` was dropped while more than one `WeakSender` remains, we want to
422+
// ensure that no messages are kept in the channel, which were sent after
423+
// the receiver was dropped.
424+
#[tokio::test(start_paused = true)]
425+
async fn test_msgs_dropped_on_rx_drop() {
426+
fn ms(millis: u64) -> Duration {
427+
Duration::from_millis(millis)
428+
}
429+
430+
let (tx, mut rx) = mpsc::channel(3);
431+
432+
let rx_handle = tokio::spawn(async move {
433+
let _ = rx.recv().await.unwrap();
434+
let _ = rx.recv().await.unwrap();
435+
time::sleep(ms(1)).await;
436+
drop(rx);
437+
438+
time::advance(ms(1)).await;
439+
});
440+
441+
let tx_handle = tokio::spawn(async move {
442+
let _ = tx.send(Msg {}).await.unwrap();
443+
let _ = tx.send(Msg {}).await.unwrap();
444+
445+
// This msg will be pending and should be dropped when `rx` is dropped
446+
let _ = tx.send(Msg {}).await.unwrap();
447+
time::advance(ms(1)).await;
448+
449+
// This msg will not be put onto `Tx` list anymore, since `Rx` is closed.
450+
time::sleep(ms(1)).await;
451+
let _ = tx.send(Msg {}).await.unwrap();
452+
453+
// Ensure that third message isn't put onto the channel anymore
454+
assert_eq!(NUM_DROPPED.load(Acquire), 4);
455+
});
456+
457+
let (_, _) = join!(rx_handle, tx_handle);
458+
}

0 commit comments

Comments
 (0)