Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Allow task manager to have children #6771

Merged
merged 14 commits into from
Aug 6, 2020
Merged
Prev Previous commit
Next Next commit
Revert "Make future nicer"
This reverts commit 49fb8fb.
  • Loading branch information
cecton committed Aug 6, 2020
commit 02c2ece0f15f90578d72f2be747bcfd74524e59b
44 changes: 29 additions & 15 deletions client/service/src/task_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use exit_future::Signal;
use log::{debug, error};
use futures::{
Future, FutureExt, StreamExt,
future::{select, Either, BoxFuture, join_all, try_join_all, select_all, self},
future::{select, Either, BoxFuture, select_all, join_all},
sink::SinkExt,
};
use prometheus_endpoint::{
Expand Down Expand Up @@ -305,20 +305,34 @@ impl TaskManager {
///
/// This function will not wait until the end of the remaining task. You must call and await
/// `clean_shutdown()` after this.
pub fn future<'a>(
&'a mut self,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'a>> {
let t1 = self.essential_failed_rx
.next()
.then(|_| future::ready(Err(Error::Other("Essential task failed.".into()))))
.boxed();
let t2 = self.on_exit.clone().then(|_| future::ready(Ok(()))).boxed();
let res = select_all(vec![t1, t2]).map(|v| v.0).boxed();

// Join all the task managers to make sure everything has finished as intended.
try_join_all(
self.children.iter_mut().map(|x| x.future().boxed()).chain(std::iter::once(res)),
).map(|res| res.map(drop)).boxed()
pub fn future<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'a>> {
Box::pin(async move {
let mut t1 = self.essential_failed_rx.next().fuse();
let mut t2 = self.on_exit.clone().fuse();

if self.children.is_empty() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This optimization is not required.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is not XD select_all panic if the list is empty https://docs.rs/futures/0.3.5/futures/future/fn.select_all.html

join_all doesn't though 🤔

loop {
futures::select! {
_ = t1 => break Err(Error::Other("Essential task failed.".into())),
_ = t2 => break Ok(()),
}
}
} else {
let mut t3 = select_all(self.children.iter_mut().map(|x| x.future())).fuse();

loop {
futures::select! {
_ = t1 => break Err(Error::Other("Essential task failed.".into())),
_ = t2 => break Ok(()),
(res, _, _) = t3 => if res.is_err() {
break res;
} else {
continue;
},
}
}
}
})
}

/// Signal to terminate all the running tasks.
Expand Down