Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Allow task manager to have children #6771

Merged
merged 14 commits into from
Aug 6, 2020
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 43 additions & 13 deletions client/service/src/task_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use exit_future::Signal;
use log::{debug, error};
use futures::{
Future, FutureExt, StreamExt,
future::{select, Either, BoxFuture},
future::{select, Either, BoxFuture, join_all, try_join_all, select_all, self},
sink::SinkExt,
};
use prometheus_endpoint::{
Expand Down Expand Up @@ -214,8 +214,14 @@ pub struct TaskManager {
essential_failed_rx: TracingUnboundedReceiver<()>,
/// Things to keep alive until the task manager is dropped.
keep_alive: Box<dyn std::any::Any + Send + Sync>,
/// A sender to a stream of background tasks. This is used for the completion future.
task_notifier: TracingUnboundedSender<JoinFuture>,
/// This future will complete when all the tasks are joined and the stream is closed.
completion_future: JoinFuture,
/// A list of other `TaskManager`'s to terminate and gracefully shutdown when the parent
/// terminates and gracefully shutdown. Also ends the parent `future()` if a child's essential
/// task fails.
children: Vec<TaskManager>,
}

impl TaskManager {
Expand Down Expand Up @@ -251,6 +257,7 @@ impl TaskManager {
keep_alive: Box::new(()),
task_notifier,
completion_future,
children: Vec::new(),
})
}

Expand All @@ -271,12 +278,21 @@ impl TaskManager {

/// Send the signal for termination, prevent new tasks to be created, await for all the existing
/// tasks to be finished and drop the object. You can consider this as an async drop.
///
/// It's always better to call and await this function before exiting the process as background
/// tasks may be running in the background. If the process exit and the background tasks are not
/// cancelled, this will lead to objects not getting dropped properly.
///
/// This is an issue in some cases as some of our dependencies do require that we drop all the
/// objects properly otherwise it triggers a SIGABRT on exit.
pub fn clean_shutdown(mut self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
self.terminate();
let children_shutdowns = self.children.into_iter().map(|x| x.clean_shutdown());
let keep_alive = self.keep_alive;
let completion_future = self.completion_future;

Box::pin(async move {
join_all(children_shutdowns).await;
completion_future.await;
drop(keep_alive);
})
Expand All @@ -289,31 +305,45 @@ impl TaskManager {
///
/// This function will not wait until the end of the remaining task. You must call and await
/// `clean_shutdown()` after this.
pub fn future<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'a>> {
Box::pin(async move {
let mut t1 = self.essential_failed_rx.next().fuse();
let mut t2 = self.on_exit.clone().fuse();

futures::select! {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this whole function not async? Instead of returning a boxed Future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair point!! I think I'm used to do that because you can have async fn in traits.

But it still doesn't work:

error[E0733]: recursion in an `async fn` requires boxing
   --> client/service/src/task_manager/mod.rs:288:40
    |
288 |     pub async fn clean_shutdown(mut self) {
    |                                           ^ recursive `async fn`
    |
    = note: a recursive `async fn` must be 

I isolated the suspect. It seems those 2 lines make the function "recursive":

		join_all(children_shutdowns).await;
		completion_future.await;

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you remove the internal async move?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes obviously XD

_ = t1 => Err(Error::Other("Essential task failed.".into())),
_ = t2 => Ok(()),
}
})
pub fn future<'a>(
&'a mut self,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'a>> {
let t1 = self.essential_failed_rx
.next()
.then(|_| future::ready(Err(Error::Other("Essential task failed.".into()))))
.boxed();
let t2 = self.on_exit.clone().then(|_| future::ready(Ok(()))).boxed();
let res = select_all(vec![t1, t2]).map(|v| v.0).boxed();

// Join all the task managers to make sure everything has finished as intended.
try_join_all(
self.children.iter_mut().map(|x| x.future().boxed()).chain(std::iter::once(res)),
).map(|res| res.map(drop)).boxed()
}

/// Signal to terminate all the running tasks.
pub fn terminate(&mut self) {
if let Some(signal) = self.signal.take() {
let _ = signal.fire();
// NOTE: task will prevent new tasks to be spawned
// NOTE: this will prevent new tasks to be spawned
self.task_notifier.close_channel();
for child in self.children.iter_mut() {
child.terminate();
}
}
}

/// Set what the task manager should keep alivei
/// Set what the task manager should keep alive.
pub(super) fn keep_alive<T: 'static + Send + Sync>(&mut self, to_keep_alive: T) {
self.keep_alive = Box::new(to_keep_alive);
}

/// Register another TaskManager to terminate and gracefully shutdown when the parent
/// terminates and gracefully shutdown. Also ends the parent `future()` if a child's essential
/// task fails. (But don't end the parent if a child's normal task fails.)
pub fn add_children(&mut self, child: TaskManager) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did I call that "add_children" if it adds only one child? XD

self.children.push(child);
}
}

#[derive(Clone)]
Expand Down
106 changes: 103 additions & 3 deletions client/service/src/task_manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

use crate::config::TaskExecutor;
use crate::task_manager::TaskManager;
use futures::future::FutureExt;
use futures::{future::FutureExt, pin_mut, select};
use parking_lot::Mutex;
use std::any::Any;
use std::sync::Arc;
Expand Down Expand Up @@ -82,7 +82,7 @@ async fn run_background_task_blocking(duration: Duration, _keep_alive: impl Any)
}

#[test]
fn ensure_futures_are_awaited_on_shutdown() {
fn ensure_tasks_are_awaited_on_shutdown() {
let mut runtime = tokio::runtime::Runtime::new().unwrap();
let handle = runtime.handle().clone();
let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into();
Expand Down Expand Up @@ -187,7 +187,7 @@ fn ensure_task_manager_future_ends_when_task_manager_terminated() {
}

#[test]
fn ensure_task_manager_future_ends_with_error_when_essential_task_ends() {
fn ensure_task_manager_future_ends_with_error_when_essential_task_fails() {
let mut runtime = tokio::runtime::Runtime::new().unwrap();
let handle = runtime.handle().clone();
let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into();
Expand All @@ -208,3 +208,103 @@ fn ensure_task_manager_future_ends_with_error_when_essential_task_ends() {
runtime.block_on(task_manager.clean_shutdown());
assert_eq!(drop_tester, 0);
}

#[test]
fn ensure_children_tasks_ends_when_task_manager_terminated() {
let mut runtime = tokio::runtime::Runtime::new().unwrap();
let handle = runtime.handle().clone();
let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into();

let mut task_manager = TaskManager::new(task_executor.clone(), None).unwrap();
let child_1 = TaskManager::new(task_executor.clone(), None).unwrap();
let spawn_handle_child_1 = child_1.spawn_handle();
let child_2 = TaskManager::new(task_executor.clone(), None).unwrap();
let spawn_handle_child_2 = child_2.spawn_handle();
task_manager.add_children(child_1);
task_manager.add_children(child_2);
let spawn_handle = task_manager.spawn_handle();
let drop_tester = DropTester::new();
spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref()));
spawn_handle_child_1.spawn("task3", run_background_task(drop_tester.new_ref()));
spawn_handle_child_2.spawn("task4", run_background_task(drop_tester.new_ref()));
assert_eq!(drop_tester, 4);
// allow the tasks to even start
runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await });
assert_eq!(drop_tester, 4);
task_manager.terminate();
runtime.block_on(task_manager.future()).expect("future has ended without error");
runtime.block_on(task_manager.clean_shutdown());
assert_eq!(drop_tester, 0);
}

#[test]
fn ensure_task_manager_future_ends_with_error_when_childs_essential_task_fails() {
let mut runtime = tokio::runtime::Runtime::new().unwrap();
let handle = runtime.handle().clone();
let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into();

let mut task_manager = TaskManager::new(task_executor.clone(), None).unwrap();
let child_1 = TaskManager::new(task_executor.clone(), None).unwrap();
let spawn_handle_child_1 = child_1.spawn_handle();
let spawn_essential_handle_child_1 = child_1.spawn_essential_handle();
let child_2 = TaskManager::new(task_executor.clone(), None).unwrap();
let spawn_handle_child_2 = child_2.spawn_handle();
task_manager.add_children(child_1);
task_manager.add_children(child_2);
let spawn_handle = task_manager.spawn_handle();
let drop_tester = DropTester::new();
spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref()));
spawn_handle_child_1.spawn("task3", run_background_task(drop_tester.new_ref()));
spawn_handle_child_2.spawn("task4", run_background_task(drop_tester.new_ref()));
assert_eq!(drop_tester, 4);
// allow the tasks to even start
runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await });
assert_eq!(drop_tester, 4);
spawn_essential_handle_child_1.spawn("task5", async { panic!("task failed") });
runtime.block_on(task_manager.future()).expect_err("future()'s Result must be Err");
assert_eq!(drop_tester, 4);
runtime.block_on(task_manager.clean_shutdown());
assert_eq!(drop_tester, 0);
}

#[test]
fn ensure_task_manager_future_continues_when_childs_not_essential_task_fails() {
let mut runtime = tokio::runtime::Runtime::new().unwrap();
let handle = runtime.handle().clone();
let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into();

let mut task_manager = TaskManager::new(task_executor.clone(), None).unwrap();
let child_1 = TaskManager::new(task_executor.clone(), None).unwrap();
let spawn_handle_child_1 = child_1.spawn_handle();
let child_2 = TaskManager::new(task_executor.clone(), None).unwrap();
let spawn_handle_child_2 = child_2.spawn_handle();
task_manager.add_children(child_1);
task_manager.add_children(child_2);
let spawn_handle = task_manager.spawn_handle();
let drop_tester = DropTester::new();
spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref()));
spawn_handle_child_1.spawn("task3", run_background_task(drop_tester.new_ref()));
spawn_handle_child_2.spawn("task4", run_background_task(drop_tester.new_ref()));
assert_eq!(drop_tester, 4);
// allow the tasks to even start
runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await });
assert_eq!(drop_tester, 4);
spawn_handle_child_1.spawn("task5", async { panic!("task failed") });
runtime.block_on(async {
let t1 = task_manager.future().fuse();
let t2 = tokio::time::delay_for(Duration::from_secs(3)).fuse();

pin_mut!(t1, t2);

select! {
_ = t1 => panic!("task should not have stopped"),
_ = t2 => {},
}
});
assert_eq!(drop_tester, 4);
runtime.block_on(task_manager.clean_shutdown());
assert_eq!(drop_tester, 0);
}
5 changes: 5 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ The format is based on [Keep a Changelog].

## Unreleased

Client
------

* Child nodes can be handled by adding a child `TaskManager` to the parent's `TaskManager` (#6771)

## 2.0.0-rc4 -> 2.0.0-rc5 – River Dolphin

Runtime
Expand Down