Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Allow task manager to have children #6771

Merged
merged 14 commits into from
Aug 6, 2020
Merged
Prev Previous commit
Next Next commit
WIP
Forked at: 19c1d90
Parent branch: origin/master
  • Loading branch information
cecton committed Jul 30, 2020
commit 18c9792f008a6c622f1c9d650c7d9667c1dfd850
25 changes: 24 additions & 1 deletion client/service/src/task_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use exit_future::Signal;
use log::{debug, error};
use futures::{
Future, FutureExt, StreamExt,
future::{select, Either, BoxFuture},
future::{select, Either, BoxFuture, select_all},
sink::SinkExt,
};
use prometheus_endpoint::{
Expand Down Expand Up @@ -214,8 +214,14 @@ pub struct TaskManager {
essential_failed_rx: TracingUnboundedReceiver<()>,
/// Things to keep alive until the task manager is dropped.
keep_alive: Box<dyn std::any::Any + Send + Sync>,
/// A sender to a stream of background tasks. This is used for the completion future.
task_notifier: TracingUnboundedSender<JoinFuture>,
/// This future will complete when all the tasks are joined and the stream is closed.
completion_future: JoinFuture,
/// A list of other `TaskManager`'s to terminate and gracefully shutdown when the parent
/// terminates and gracefully shutdown. Also ends the parent `future()` if a child's essential
/// task fails.
children: Vec<Box<TaskManager>>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why a box?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 fixed 😁

}

impl TaskManager {
Expand Down Expand Up @@ -251,6 +257,7 @@ impl TaskManager {
keep_alive: Box::new(()),
task_notifier,
completion_future,
children: Vec::new(),
})
}

Expand All @@ -271,6 +278,13 @@ impl TaskManager {

/// Send the signal for termination, prevent new tasks to be created, await for all the existing
/// tasks to be finished and drop the object. You can consider this as an async drop.
///
/// It's always better to call and await this function before exiting the process as background
/// tasks may be running in the background. If the process exit and the background tasks are not
/// cancelled, this will lead to objects not getting dropped properly.
///
/// This is an issue in some cases as some of our dependencies do require that we drop all the
/// objects properly otherwise it triggers a SIGABRT on exit.
pub fn clean_shutdown(mut self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
self.terminate();
let keep_alive = self.keep_alive;
Expand All @@ -293,10 +307,12 @@ impl TaskManager {
Box::pin(async move {
let mut t1 = self.essential_failed_rx.next().fuse();
let mut t2 = self.on_exit.clone().fuse();
let mut t3 = select_all(self.children.iter_mut().map(|x| x.future())).fuse();

futures::select! {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this whole function not async? Instead of returning a boxed Future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair point!! I think I'm used to do that because you can have async fn in traits.

But it still doesn't work:

error[E0733]: recursion in an `async fn` requires boxing
   --> client/service/src/task_manager/mod.rs:288:40
    |
288 |     pub async fn clean_shutdown(mut self) {
    |                                           ^ recursive `async fn`
    |
    = note: a recursive `async fn` must be 

I isolated the suspect. It seems those 2 lines make the function "recursive":

		join_all(children_shutdowns).await;
		completion_future.await;

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you remove the internal async move?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes obviously XD

_ = t1 => Err(Error::Other("Essential task failed.".into())),
_ = t2 => Ok(()),
(res, _, _) = t3 => res,
}
})
}
Expand All @@ -314,6 +330,13 @@ impl TaskManager {
pub(super) fn keep_alive<T: 'static + Send + Sync>(&mut self, to_keep_alive: T) {
self.keep_alive = Box::new(to_keep_alive);
}

/// Register another TaskManager to terminate and gracefully shutdown when the parent
/// terminates and gracefully shutdown. Also ends the parent `future()` if a child's essential
/// task fails.
pub fn add_children(&mut self, child: TaskManager) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did I call that "add_children" if it adds only one child? XD

self.children.push(Box::new(child));
}
}

#[derive(Clone)]
Expand Down