Skip to content

Commit

Permalink
feat: make TerminatableTask terminate itself when dropped (#1151)
Browse files Browse the repository at this point in the history
  • Loading branch information
YuanYuYuan authored Jun 20, 2024
1 parent 93f93d2 commit 2500e5a
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 34 deletions.
26 changes: 17 additions & 9 deletions commons/zenoh-task/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,16 @@ impl TaskController {
}

pub struct TerminatableTask {
handle: JoinHandle<()>,
handle: Option<JoinHandle<()>>,
token: CancellationToken,
}

impl Drop for TerminatableTask {
fn drop(&mut self) {
self.terminate(std::time::Duration::from_secs(10));
}
}

impl TerminatableTask {
pub fn create_cancellation_token() -> CancellationToken {
CancellationToken::new()
Expand All @@ -147,7 +153,7 @@ impl TerminatableTask {
T: Send + 'static,
{
TerminatableTask {
handle: rt.spawn(future.map(|_f| ())),
handle: Some(rt.spawn(future.map(|_f| ()))),
token,
}
}
Expand All @@ -168,24 +174,26 @@ impl TerminatableTask {
};

TerminatableTask {
handle: rt.spawn(task),
handle: Some(rt.spawn(task)),
token,
}
}

/// Attempts to terminate the task.
/// Returns true if task completed / aborted within timeout duration, false otherwise.
pub fn terminate(self, timeout: Duration) -> bool {
pub fn terminate(&mut self, timeout: Duration) -> bool {
ResolveFuture::new(async move { self.terminate_async(timeout).await }).res_sync()
}

/// Async version of [`TerminatableTask::terminate()`].
pub async fn terminate_async(self, timeout: Duration) -> bool {
pub async fn terminate_async(&mut self, timeout: Duration) -> bool {
self.token.cancel();
if tokio::time::timeout(timeout, self.handle).await.is_err() {
tracing::error!("Failed to terminate the task");
return false;
};
if let Some(handle) = self.handle.take() {
if tokio::time::timeout(timeout, handle).await.is_err() {
tracing::error!("Failed to terminate the task");
return false;
};
}
true
}
}
2 changes: 1 addition & 1 deletion zenoh-ext/src/publication_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ impl<'a> PublicationCache<'a> {
let PublicationCache {
_queryable,
local_sub,
task,
mut task,
} = self;
_queryable.undeclare().res_async().await?;
local_sub.undeclare().res_async().await?;
Expand Down
10 changes: 0 additions & 10 deletions zenoh/src/net/routing/hat/linkstate_peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ use std::{
any::Any,
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};
use zenoh_config::{unwrap_or_default, ModeDependent, WhatAmI, WhatAmIMatcher, ZenohId};
use zenoh_protocol::{
Expand Down Expand Up @@ -116,15 +115,6 @@ struct HatTables {
peers_trees_task: Option<TerminatableTask>,
}

impl Drop for HatTables {
fn drop(&mut self) {
if self.peers_trees_task.is_some() {
let task = self.peers_trees_task.take().unwrap();
task.terminate(Duration::from_secs(10));
}
}
}

impl HatTables {
fn new() -> Self {
Self {
Expand Down
14 changes: 0 additions & 14 deletions zenoh/src/net/routing/hat/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ use std::{
collections::{hash_map::DefaultHasher, HashMap, HashSet},
hash::Hasher,
sync::Arc,
time::Duration,
};
use zenoh_config::{unwrap_or_default, ModeDependent, WhatAmI, WhatAmIMatcher, ZenohId};
use zenoh_protocol::{
Expand Down Expand Up @@ -127,19 +126,6 @@ struct HatTables {
router_peers_failover_brokering: bool,
}

impl Drop for HatTables {
fn drop(&mut self) {
if self.peers_trees_task.is_some() {
let task = self.peers_trees_task.take().unwrap();
task.terminate(Duration::from_secs(10));
}
if self.routers_trees_task.is_some() {
let task = self.routers_trees_task.take().unwrap();
task.terminate(Duration::from_secs(10));
}
}
}

impl HatTables {
fn new(router_peers_failover_brokering: bool) -> Self {
Self {
Expand Down

0 comments on commit 2500e5a

Please sign in to comment.