-
Notifications
You must be signed in to change notification settings - Fork 655
Description
This is a moved issue from rust-lang/libs-team#427 (comment).
Proposal
Problem statement
crossbeam provides a WaitGroup that is good for synchronizing a fork-join task.
However, it's a sync version, meaning that it's not suitable to use in async
/await
context.
I made a tidy and solid async WaitGroup
implementation and wonder if it's suitable to contribute to the std lib (e.g., std::sync
) or futures-util
or anywhere that is (de-facto) standard so that the ecosystem can collaborate together instead of make one own each time.
Motivating examples or use cases
My motivating scenario is during the server starting, wait for all its components ready. For example,
let wg = WaitGroup::new();
let wg_clone = wg.clone();
// may repeat for different components
runtime.spawn(async move {
// ... initializing ...
drop(wg_clone);
// accepting connections
});
wg.await;
Solution sketch
The implementation is tidy, and can be even inlined here:
#[derive(Clone)]
pub struct WaitGroup {
inner: Arc<Inner>,
}
pub struct WaitGroupFuture {
inner: Weak<Inner>,
}
impl WaitGroupFuture {
/// Gets the number of active workers.
pub fn workers(&self) -> usize {
Weak::strong_count(&self.inner)
}
}
struct Inner {
waker: AtomicWaker,
}
impl Drop for Inner {
fn drop(&mut self) {
self.waker.wake();
}
}
impl WaitGroup {
pub fn new() -> Self {
Self {
inner: Arc::new(Inner {
waker: AtomicWaker::new(),
}),
}
}
/// Gets the number of active workers.
pub fn workers(&self) -> usize {
Arc::strong_count(&self.inner) - 1
}
}
impl Default for WaitGroup {
fn default() -> Self {
Self::new()
}
}
impl IntoFuture for WaitGroup {
type Output = ();
type IntoFuture = WaitGroupFuture;
fn into_future(self) -> Self::IntoFuture {
WaitGroupFuture {
inner: Arc::downgrade(&self.inner),
}
}
}
impl Future for WaitGroupFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.inner.upgrade() {
Some(inner) => {
inner.waker.register(cx.waker());
Poll::Pending
}
None => Poll::Ready(()),
}
}
}
#[cfg(test)]
mod test {
use pollster::FutureExt;
use super::*;
use crate::test_runtime;
#[test]
fn test_wait_group_match() {
let wg = WaitGroup::new();
for _ in 0..100 {
let w = wg.clone();
let _drop = test_runtime().spawn(async move {
drop(w);
});
}
wg.into_future().block_on();
}
#[test]
fn test_wait_group_timeout() {
let wg = WaitGroup::new();
let _wg_clone = wg.clone();
test_runtime().block_on(async move {
tokio::select! {
_ = wg => panic!("wait group should timeout"),
_ = tokio::time::sleep(tokio::time::Duration::from_secs(1)) => {}
}
});
}
}
Alternatives
The main argument is whether it should be in the std lib, or anywhere suitable for the most Rust developers to collaborate on.
Links and related work
This is somehow derivative from waitgroup-rs.