From 1656d8e231903a7b84b9e2d5e3db7aeed13a2966 Mon Sep 17 00:00:00 2001 From: Rafael Bachmann Date: Thu, 17 Oct 2024 12:02:12 +0300 Subject: [PATCH] sync: add `mpsc::Receiver::blocking_recv_many` (#6867) Fixes: #6865 Co-authored-by: Rafael Bachmann --- tokio/src/sync/mpsc/bounded.rs | 10 ++++++++++ tokio/src/sync/mpsc/unbounded.rs | 10 ++++++++++ tokio/tests/sync_panic.rs | 33 ++++++++++++++++++++++++++++++++ 3 files changed, 53 insertions(+) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index a6aecf007ca..99a3f0d5c4e 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -419,6 +419,16 @@ impl Receiver { crate::future::block_on(self.recv()) } + /// Variant of [`Self::recv_many`] for blocking contexts. + /// + /// The same conditions as in [`Self::blocking_recv`] apply. + #[track_caller] + #[cfg(feature = "sync")] + #[cfg_attr(docsrs, doc(alias = "recv_many_blocking"))] + pub fn blocking_recv_many(&mut self, buffer: &mut Vec, limit: usize) -> usize { + crate::future::block_on(self.recv_many(buffer, limit)) + } + /// Closes the receiving half of a channel without dropping it. /// /// This prevents any further messages from being sent on the channel while diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index f794f4073d2..a9232dc934c 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -319,6 +319,16 @@ impl UnboundedReceiver { crate::future::block_on(self.recv()) } + /// Variant of [`Self::recv_many`] for blocking contexts. + /// + /// The same conditions as in [`Self::blocking_recv`] apply. + #[track_caller] + #[cfg(feature = "sync")] + #[cfg_attr(docsrs, doc(alias = "recv_many_blocking"))] + pub fn blocking_recv_many(&mut self, buffer: &mut Vec, limit: usize) -> usize { + crate::future::block_on(self.recv_many(buffer, limit)) + } + /// Closes the receiving half of a channel, without dropping it. /// /// This prevents any further messages from being sent on the channel while diff --git a/tokio/tests/sync_panic.rs b/tokio/tests/sync_panic.rs index 41bf0850068..c781c846bf5 100644 --- a/tokio/tests/sync_panic.rs +++ b/tokio/tests/sync_panic.rs @@ -129,6 +129,22 @@ fn mpsc_bounded_receiver_blocking_recv_panic_caller() -> Result<(), Box Result<(), Box> { + let panic_location_file = test_panic(|| { + let rt = current_thread(); + let (_tx, mut rx) = mpsc::channel::(1); + rt.block_on(async { + let _ = rx.blocking_recv(); + }); + }); + + // The panic location should be in this file + assert_eq!(&panic_location_file.unwrap(), file!()); + + Ok(()) +} + #[test] fn mpsc_bounded_sender_blocking_send_panic_caller() -> Result<(), Box> { let panic_location_file = test_panic(|| { @@ -161,6 +177,23 @@ fn mpsc_unbounded_receiver_blocking_recv_panic_caller() -> Result<(), Box Result<(), Box> { + let panic_location_file = test_panic(|| { + let rt = current_thread(); + let (_tx, mut rx) = mpsc::unbounded_channel::(); + let mut vec = vec![]; + rt.block_on(async { + let _ = rx.blocking_recv_many(&mut vec, 1); + }); + }); + + // The panic location should be in this file + assert_eq!(&panic_location_file.unwrap(), file!()); + + Ok(()) +} + #[test] fn semaphore_merge_unrelated_owned_permits() -> Result<(), Box> { let panic_location_file = test_panic(|| {