Skip to content

Commit 3004a32

Browse files
committed
address review
1 parent fa8bdf9 commit 3004a32

File tree

3 files changed

+21
-36
lines changed

3 files changed

+21
-36
lines changed

tokio/src/sync/mpsc/bounded.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1033,9 +1033,6 @@ impl<T> Sender<T> {
10331033
/// channel were dropped and only `WeakSender` instances remain,
10341034
/// the channel is closed.
10351035
pub fn downgrade(&self) -> WeakSender<T> {
1036-
// Note: If this is the last `Sender` instance we want to close the
1037-
// channel when downgrading, so it's important to move into `self` here.
1038-
10391036
WeakSender {
10401037
chan: self.chan.downgrade(),
10411038
}

tokio/src/sync/mpsc/chan.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ pub(crate) trait Semaphore {
4747
fn is_closed(&self) -> bool;
4848
}
4949

50-
pub(crate) struct Chan<T, S> {
50+
pub(super) struct Chan<T, S> {
5151
/// Notifies all tasks listening for the receiver being dropped.
5252
notify_rx_closed: Notify,
5353

tokio/tests/sync_mpsc.rs

Lines changed: 20 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,14 @@ use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test;
1010
#[cfg(not(all(target_arch = "wasm32", not(target_os = "wasi"))))]
1111
use tokio::test as maybe_tokio_test;
1212

13-
use tokio::join;
1413
use tokio::sync::mpsc::error::{TryRecvError, TrySendError};
1514
use tokio::sync::mpsc::{self, channel};
1615
use tokio::sync::oneshot;
17-
use tokio::time;
1816
use tokio_test::*;
1917

2018
use std::sync::atomic::AtomicUsize;
2119
use std::sync::atomic::Ordering::{Acquire, Release};
2220
use std::sync::Arc;
23-
use std::time::Duration;
2421

2522
#[cfg(not(target_arch = "wasm32"))]
2623
mod support {
@@ -838,47 +835,36 @@ impl Drop for Msg {
838835
// `Sender` was dropped while more than one `WeakSender` remains, we want to
839836
// ensure that no messages are kept in the channel, which were sent after
840837
// the receiver was dropped.
841-
#[tokio::test(start_paused = true)]
838+
#[tokio::test]
842839
async fn test_msgs_dropped_on_rx_drop() {
843-
fn ms(millis: u64) -> Duration {
844-
Duration::from_millis(millis)
845-
}
846-
847840
let (tx, mut rx) = mpsc::channel(3);
848841

849-
let rx_handle = tokio::spawn(async move {
850-
let _ = rx.recv().await.unwrap();
851-
let _ = rx.recv().await.unwrap();
852-
time::sleep(ms(1)).await;
853-
drop(rx);
842+
let _ = tx.send(Msg {}).await.unwrap();
843+
let _ = tx.send(Msg {}).await.unwrap();
854844

855-
time::advance(ms(1)).await;
856-
});
845+
// This msg will be pending and should be dropped when `rx` is dropped
846+
let sent_fut = tx.send(Msg {});
857847

858-
let tx_handle = tokio::spawn(async move {
859-
let _ = tx.send(Msg {}).await.unwrap();
860-
let _ = tx.send(Msg {}).await.unwrap();
848+
let _ = rx.recv().await.unwrap();
849+
let _ = rx.recv().await.unwrap();
861850

862-
// This msg will be pending and should be dropped when `rx` is dropped
863-
let _ = tx.send(Msg {}).await.unwrap();
864-
time::advance(ms(1)).await;
851+
let _ = sent_fut.await.unwrap();
865852

866-
// This msg will not be put onto `Tx` list anymore, since `Rx` is closed.
867-
time::sleep(ms(1)).await;
868-
assert!(tx.send(Msg {}).await.is_err());
853+
drop(rx);
869854

870-
// Ensure that third message isn't put onto the channel anymore
871-
assert_eq!(NUM_DROPPED.load(Acquire), 4);
872-
});
855+
assert_eq!(NUM_DROPPED.load(Acquire), 3);
873856

874-
let (_, _) = join!(rx_handle, tx_handle);
857+
// This msg will not be put onto `Tx` list anymore, since `Rx` is closed.
858+
assert!(tx.send(Msg {}).await.is_err());
859+
860+
assert_eq!(NUM_DROPPED.load(Acquire), 4);
875861
}
876862

877863
// Tests that a `WeakSender` is upgradeable when other `Sender`s exist.
878864
#[tokio::test]
879865
async fn downgrade_upgrade_sender_success() {
880866
let (tx, _rx) = mpsc::channel::<i32>(1);
881-
let weak_tx = tx.clone().downgrade();
867+
let weak_tx = tx.downgrade();
882868
assert!(weak_tx.upgrade().is_some());
883869
}
884870

@@ -897,6 +883,7 @@ async fn downgrade_upgrade_sender_failure() {
897883
async fn downgrade_drop_upgrade() {
898884
let (tx, _rx) = mpsc::channel::<i32>(1);
899885

886+
// the cloned `Tx` is dropped right away
900887
let weak_tx = tx.clone().downgrade();
901888
drop(tx);
902889
assert!(weak_tx.upgrade().is_none());
@@ -907,7 +894,7 @@ async fn downgrade_drop_upgrade() {
907894
#[tokio::test]
908895
async fn downgrade_get_permit_upgrade_no_senders() {
909896
let (tx, _rx) = mpsc::channel::<i32>(1);
910-
let weak_tx = tx.clone().downgrade();
897+
let weak_tx = tx.downgrade();
911898
let _permit = tx.reserve_owned().await.unwrap();
912899
assert!(weak_tx.upgrade().is_some());
913900
}
@@ -920,12 +907,13 @@ async fn downgrade_upgrade_get_permit_no_senders() {
920907
let tx2 = tx.clone();
921908
let _permit = tx.reserve_owned().await.unwrap();
922909
let weak_tx = tx2.downgrade();
910+
drop(tx2);
923911
assert!(weak_tx.upgrade().is_some());
924912
}
925913

926-
// Tests that `Clone` of `WeakSender` doesn't decrement `tx_count`.
914+
// Tests that `downgrade` does not change the `tx_count` of the channel.
927915
#[tokio::test]
928-
async fn test_weak_sender_clone() {
916+
async fn test_tx_count_weak_sender() {
929917
let (tx, _rx) = mpsc::channel::<i32>(1);
930918
let tx_weak = tx.downgrade();
931919
let tx_weak2 = tx.downgrade();

0 commit comments

Comments
 (0)