Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Allow task manager to have children #6771

Merged
merged 14 commits into from
Aug 6, 2020
Merged
Prev Previous commit
Next Next commit
Simplify
  • Loading branch information
cecton committed Aug 6, 2020
commit 4fdf8035f4aa8f163f11020aa3fa1f5757e1d459
32 changes: 9 additions & 23 deletions client/service/src/task_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use exit_future::Signal;
use log::{debug, error};
use futures::{
Future, FutureExt, StreamExt,
future::{select, Either, BoxFuture, select_all, join_all},
future::{select, Either, BoxFuture, join_all, try_join_all, pending},
sink::SinkExt,
};
use prometheus_endpoint::{
Expand Down Expand Up @@ -309,28 +309,14 @@ impl TaskManager {
Box::pin(async move {
let mut t1 = self.essential_failed_rx.next().fuse();
let mut t2 = self.on_exit.clone().fuse();

if self.children.is_empty() {
loop {
futures::select! {
_ = t1 => break Err(Error::Other("Essential task failed.".into())),
_ = t2 => break Ok(()),
}
}
} else {
let mut t3 = select_all(self.children.iter_mut().map(|x| x.future())).fuse();

loop {
futures::select! {
_ = t1 => break Err(Error::Other("Essential task failed.".into())),
_ = t2 => break Ok(()),
(res, _, _) = t3 => if res.is_err() {
break res;
} else {
continue;
},
}
}
let mut t3 = try_join_all(self.children.iter_mut().map(|x| x.future()))
// Never end this future if there is no error
.then(|res| async { Ok(res.map(|_| pending::<()>())?.await) }).boxed().fuse();

futures::select! {
_ = t1 => Err(Error::Other("Essential task failed.".into())),
_ = t2 => Ok(()),
res = t3 => res.map(|_| ()),
}
})
}
Expand Down