Skip to content

Commit bab83ad

Browse files
committed
util: add JoinDeque structure
1 parent 9f59c69 commit bab83ad

File tree

2 files changed

+90
-0
lines changed

2 files changed

+90
-0
lines changed

tokio-util/src/task/join_deque.rs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
use std::{collections::VecDeque, future::Future};
2+
use tokio::task::{AbortHandle, JoinError, JoinHandle};
3+
4+
/// A collection of tasks spawned on a Tokio runtime.
5+
///
6+
/// A `JoinDeque` can be used to await the completion of the tasks in FIFO
7+
/// order. That is, if tasks are spawned in the order A, B, C, then
8+
/// awaiting the next completed task will always return A first, then B,
9+
/// then C, regardless of the order in which the tasks actually complete.
10+
///
11+
/// All of the tasks must have the same return type `T`.
12+
///
13+
/// When the `JoinDeque` is dropped, all tasks in the `JoinDeque` are
14+
/// immediately aborted.
15+
#[derive(Debug)]
16+
pub struct JoinDeque<T>(VecDeque<JoinHandle<T>>);
17+
18+
impl<T> Default for JoinDeque<T> {
19+
fn default() -> Self {
20+
Self::new()
21+
}
22+
}
23+
24+
impl<T> Drop for JoinDeque<T> {
25+
fn drop(&mut self) {
26+
for join_handle in self.0.drain(..) {
27+
join_handle.abort();
28+
}
29+
}
30+
}
31+
32+
impl<T> JoinDeque<T> {
33+
/// Create a new empty `JoinDeque`.
34+
pub const fn new() -> Self {
35+
Self(VecDeque::new())
36+
}
37+
/// Creates an empty `JoinDeque` with space for at least `capacity` tasks.
38+
pub fn with_capacity(capacity: usize) -> Self {
39+
Self(VecDeque::with_capacity(capacity))
40+
}
41+
/// Returns the number of tasks currently in the `JoinDeque`.
42+
///
43+
/// This includes both tasks that are currently running and tasks that have
44+
/// completed but not yet been removed from the queue because outputting of
45+
/// them waits for FIFO order.
46+
pub fn len(&self) -> usize {
47+
self.0.len()
48+
}
49+
/// Returns whether the `JoinDeque` is empty.
50+
pub fn is_empty(&self) -> bool {
51+
self.0.is_empty()
52+
}
53+
/// Spawn the provided task on the `JoinDeque`, returning an [`AbortHandle`]
54+
/// that can be used to remotely cancel the task.
55+
///
56+
/// The provided future will start running in the background immediately
57+
/// when this method is called, even if you don't await anything on this
58+
/// `JoinDeque`.
59+
///
60+
/// # Panics
61+
///
62+
/// This method panics if called outside of a Tokio runtime.
63+
pub fn spawn<F>(&mut self, task: F) -> AbortHandle
64+
where
65+
F: Future<Output = T> + Send + 'static,
66+
T: Send + 'static,
67+
{
68+
let join_handle = tokio::spawn(task);
69+
let abort_handle = join_handle.abort_handle();
70+
self.0.push_back(join_handle);
71+
abort_handle
72+
}
73+
/// Waits until the next task in FIFO order completes and returns its output.
74+
///
75+
/// Returns `None` if the queue is empty.
76+
///
77+
/// # Cancel Safety
78+
///
79+
/// This method is cancel safe. If `join_next` is used as the event in a `tokio::select!`
80+
/// statement and some other branch completes first, it is guaranteed that no tasks were
81+
/// removed from this `JoinDeque`.
82+
pub async fn join_next(&mut self) -> Option<Result<T, JoinError>> {
83+
let result = self.0.front_mut()?.await;
84+
self.0.pop_front();
85+
Some(result)
86+
}
87+
}

tokio-util/src/task/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ cfg_rt! {
1313

1414
mod abort_on_drop;
1515
pub use abort_on_drop::AbortOnDropHandle;
16+
17+
mod join_deque;
18+
pub use join_deque::JoinDeque;
1619
}
1720

1821
#[cfg(feature = "join-map")]

0 commit comments

Comments
 (0)