Async WaitGroup #2880

@tisonkun

This issue was moved from rust-lang/libs-team#427 (comment).

Proposal

Problem statement

crossbeam provides a WaitGroup that works well for synchronizing fork-join tasks.

However, it is a synchronous (blocking) implementation, so it is not suitable for use in an async/await context.

I wrote a small, solid async WaitGroup implementation and wonder whether it would be suitable to contribute to the standard library (e.g., std::sync), to futures-util, or to any other (de facto) standard crate, so that the ecosystem can collaborate on one implementation instead of everyone writing their own each time.

Motivating examples or use cases

My motivating scenario is waiting, during server startup, for all of the server's components to become ready. For example:

let wg = WaitGroup::new();
let wg_clone = wg.clone();

// may repeat for different components
runtime.spawn(async move {
  // ... initializing ...
  drop(wg_clone);
  // accepting connections
});

wg.await;
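
The example relies on each clone counting as one outstanding worker and each drop counting as one completion. A minimal sketch of that bookkeeping, using only `Arc` reference counts from the standard library, might look like the following. `Token` is a hypothetical stand-in for `WaitGroup`, not part of the proposal.

```rust
use std::sync::Arc;

// Hypothetical stand-in for `WaitGroup`: cloning adds a worker, dropping removes one.
#[derive(Clone)]
struct Token(Arc<()>);

impl Token {
    // Number of outstanding clones, not counting this handle itself.
    fn workers(&self) -> usize {
        Arc::strong_count(&self.0) - 1
    }
}

fn main() {
    let token = Token(Arc::new(()));
    assert_eq!(token.workers(), 0);

    // One clone per component that still has to initialize.
    let a = token.clone();
    let b = token.clone();
    assert_eq!(token.workers(), 2);

    drop(a); // first component is ready
    assert_eq!(token.workers(), 1);

    drop(b); // all components are ready
    assert_eq!(token.workers(), 0);
}
```

Awaiting the group then amounts to waiting until the count of outstanding clones reaches zero.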

Solution sketch

The implementation is small enough to inline here:

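use std::future::{Future, IntoFuture};
use std::pin::Pin;
use std::sync::{Arc, Weak};
use std::task::{Context, Poll};

// Assumption: `AtomicWaker` from futures-util; the atomic-waker crate provides an equivalent type.
use futures_util::task::AtomicWaker;

#[derive(Clone)]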
pub struct WaitGroup {
    inner: Arc<Inner>,
}

pub struct WaitGroupFuture {
    inner: Weak<Inner>,
}

impl WaitGroupFuture {
    /// Gets the number of active workers.
    pub fn workers(&self) -> usize {
        Weak::strong_count(&self.inner)
    }
}

struct Inner {
    waker: AtomicWaker,
}

impl Drop for Inner {
    fn drop(&mut self) {
        self.waker.wake();
    }
}

impl WaitGroup {
    pub fn new() -> Self {
        Self {
            inner: Arc::new(Inner {
                waker: AtomicWaker::new(),
            }),
        }
    }

    /// Gets the number of active workers.
    pub fn workers(&self) -> usize {
        Arc::strong_count(&self.inner) - 1
    }
}

impl Default for WaitGroup {
    fn default() -> Self {
        Self::new()
    }
}

impl IntoFuture for WaitGroup {
    type Output = ();

    type IntoFuture = WaitGroupFuture;

    fn into_future(self) -> Self::IntoFuture {
        WaitGroupFuture {
            inner: Arc::downgrade(&self.inner),
        }
    }
}

impl Future for WaitGroupFuture {
    type Output = ();

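    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.inner.upgrade() {
            // Workers are still alive: register the waker so that `Inner::drop` can wake this task.
            Some(inner) => {
                inner.waker.register(cx.waker());
                Poll::Pending
            }
            // Every strong reference is gone, so all workers have finished.
            None => Poll::Ready(()),
        }
    }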
}

#[cfg(test)]
mod test {
    use pollster::FutureExt;

    use super::*;
    use crate::test_runtime;

    #[test]
    fn test_wait_group_match() {
        let wg = WaitGroup::new();

        for _ in 0..100 {
            let w = wg.clone();
            let _drop = test_runtime().spawn(async move {
                drop(w);
            });
        }

        wg.into_future().block_on();
    }

    #[test]
    fn test_wait_group_timeout() {
        let wg = WaitGroup::new();
        let _wg_clone = wg.clone();
        test_runtime().block_on(async move {
            tokio::select! {
                _ = wg => panic!("wait group should timeout"),
                _ = tokio::time::sleep(tokio::time::Duration::from_secs(1)) => {}
            }
        });
    }
}
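
Why the `poll`/`Drop` pair above should not miss a wake-up can be sketched with the standard library alone. This is an illustration under stated assumptions, not the proposed implementation: a `Mutex<Option<Waker>>` stands in for `AtomicWaker`, and `WaitFuture`, `ThreadWaker`, `block_on`, and `wait_for_workers` are hypothetical names introduced for the demo.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, Weak};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;

// Assumption: a mutex-guarded slot replaces `AtomicWaker` for this demo.
struct Inner {
    waker: Mutex<Option<Waker>>,
}

impl Drop for Inner {
    // Runs once the last strong reference is gone: wake the waiting task, if any.
    fn drop(&mut self) {
        if let Ok(slot) = self.waker.get_mut() {
            if let Some(waker) = slot.take() {
                waker.wake();
            }
        }
    }
}

struct WaitFuture {
    inner: Weak<Inner>,
}

impl Future for WaitFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        match self.inner.upgrade() {
            Some(inner) => {
                *inner.waker.lock().unwrap() = Some(cx.waker().clone());
                Poll::Pending
                // `inner` is dropped here. If it was the last strong reference,
                // `Inner::drop` runs now and wakes the waker stored just above.
            }
            None => Poll::Ready(()),
        }
    }
}

// Waker that unparks the thread running `block_on`.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Tiny single-future executor: poll, then park until woken.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        thread::park();
    }
}

// Spawns `n` worker threads, waits for all of them, and returns how many finished.
fn wait_for_workers(n: usize) -> usize {
    let group = Arc::new(Inner { waker: Mutex::new(None) });
    let done = Arc::new(AtomicUsize::new(0));
    for _ in 0..n {
        let worker = Arc::clone(&group);
        let done = Arc::clone(&done);
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            done.fetch_add(1, Ordering::SeqCst);
            drop(worker); // signal completion
        });
    }
    let fut = WaitFuture { inner: Arc::downgrade(&group) };
    drop(group); // mirrors `into_future(self)` consuming the original handle
    block_on(fut);
    done.load(Ordering::SeqCst)
}

fn main() {
    assert_eq!(wait_for_workers(4), 4);
}
```

The key point is that `poll` holds a temporary strong reference while it stores the waker, so `Inner::drop` cannot run until the waker is in place. If the last worker finishes during that window, the drop of the temporary reference itself triggers the wake-up, and the next poll observes a failed `upgrade` and returns `Ready`.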

Alternatives

The main open question is whether this belongs in the standard library or in some other place where most Rust developers can collaborate on it.

Links and related work

This is loosely derived from waitgroup-rs.
