Skip to content

Commit

Permalink
support pre-delay signal trigger
Browse files Browse the repository at this point in the history
  • Loading branch information
GlenDC committed Sep 30, 2024
1 parent 9cb360f commit 3d5997c
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 13 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

# 0.2.1 (30. September, 2024)

Expose a signal that can be awaited on without awaiting the configured
delay first. If no delay is used this API is equivalent to the already
existing `cancelled` function.

This can be used for scenarios where you do not need a graceful buffer and would like to
cancel as soon as a signal is received.

# 0.2.0 (29. September, 2024)

This is usability wise not a breaking release,
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
categories = ["asynchronous", "network-programming"]
edition = "2021"
name = "tokio-graceful"
version = "0.2.0"
version = "0.2.1"
description = "util for graceful shutdown of tokio applications"
homepage = "https://github.com/plabayo/tokio-graceful"
readme = "README.md"
Expand Down
85 changes: 80 additions & 5 deletions src/guard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,33 @@ pub struct ShutdownGuard(ManuallyDrop<WeakShutdownGuard>);
#[derive(Debug, Clone)]
pub struct WeakShutdownGuard {
pub(crate) trigger_rx: Receiver,
pub(crate) shutdown_signal_trigger_rx: Option<Receiver>,
pub(crate) zero_tx: Sender,
pub(crate) ref_count: Arc<AtomicUsize>,
}

impl ShutdownGuard {
pub(crate) fn new(trigger_rx: Receiver, zero_tx: Sender, ref_count: Arc<AtomicUsize>) -> Self {
pub(crate) fn new(
trigger_rx: Receiver,
shutdown_signal_trigger_rx: Option<Receiver>,
zero_tx: Sender,
ref_count: Arc<AtomicUsize>,
) -> Self {
let value = ref_count.fetch_add(1, Ordering::SeqCst);
tracing::trace!("new shutdown guard: ref_count+1: {}", value + 1);
Self(ManuallyDrop::new(WeakShutdownGuard::new(
trigger_rx, zero_tx, ref_count,
trigger_rx,
shutdown_signal_trigger_rx,
zero_tx,
ref_count,
)))
}

/// Returns a Future that gets fulfilled when cancellation (shutdown) is requested.
/// Returns a Future that gets fulfilled when cancellation (shutdown) is requested
/// and the delay (if any) duration has been awaited.
///
/// Use [`Self::shutdown_signal_triggered`] for tasks that do not
/// require this opt-in delay buffer duration.
///
/// The future will complete immediately if the token is already cancelled when this method is called.
///
Expand All @@ -56,6 +69,29 @@ impl ShutdownGuard {
self.0.cancelled().await
}

/// Returns a Future that gets fulfilled when cancellation (shutdown) is requested.
///
/// Use [`Self::cancelled`] if you want to make sure the future
/// only completes when the buffer delay has been awaited.
///
/// In case no delay has been configured for the parent `Shutdown`,
/// this function will be equal in behaviour to [`Self::cancelled`].
///
/// The future will complete immediately if the token is already cancelled when this method is called.
///
/// # Cancel safety
///
/// This method is cancel safe.
///
/// # Panics
///
/// This method panics if the iternal mutex
/// is poisoned while being used.
#[inline]
pub async fn shutdown_signal_triggered(&self) {
self.0.shutdown_signal_triggered().await
}

/// Returns a [`crate::sync::JoinHandle`] that can be awaited on
/// to wait for the spawned task to complete. See
/// [`crate::sync::spawn`] for more information.
Expand Down Expand Up @@ -166,15 +202,26 @@ impl Drop for ShutdownGuard {
}

impl WeakShutdownGuard {
pub(crate) fn new(trigger_rx: Receiver, zero_tx: Sender, ref_count: Arc<AtomicUsize>) -> Self {
pub(crate) fn new(
trigger_rx: Receiver,
shutdown_signal_trigger_rx: Option<Receiver>,
zero_tx: Sender,
ref_count: Arc<AtomicUsize>,
) -> Self {
Self {
trigger_rx,
shutdown_signal_trigger_rx,
zero_tx,
ref_count,
}
}

/// Returns a Future that gets fulfilled when cancellation (shutdown) is requested.
/// Returns a Future that gets fulfilled when cancellation (shutdown) is requested
/// and the delay (buffer) duration has been awaited on.
///
/// Use [`Self::shutdown_signal_triggered`] in case you want to get
/// a future which is triggered immediately when the shutdown signal is received,
/// without waiting for the delay duration first.
///
/// The future will complete immediately if the token is already cancelled when this method is called.
///
Expand All @@ -191,6 +238,34 @@ impl WeakShutdownGuard {
self.trigger_rx.clone().await;
}

/// Returns a Future that gets fulfilled when cancellation (shutdown) is requested
/// without awaiting the delay duration first, if one is set.
///
/// In case no delay has been configured for the parent `Shutdown`,
/// this function will be equal in behaviour to [`Self::cancelled`].
///
/// Use [`Self::cancelled`] in case you want to get
/// a future which is triggered when the shutdown signal is received
/// and thethe delay duration is awaited.
///
/// The future will complete immediately if the token is already cancelled when this method is called.
///
/// # Cancel safety
///
/// This method is cancel safe.
///
/// # Panics
///
/// This method panics if the iternal mutex
/// is poisoned while being used.
#[inline]
pub async fn shutdown_signal_triggered(&self) {
self.shutdown_signal_trigger_rx
.clone()
.unwrap_or_else(|| self.trigger_rx.clone())
.await
}

/// Returns a Future that gets fulfilled when cancellation (shutdown) is requested.
///
/// In contrast to [`ShutdownGuard::cancelled`] this method consumes the guard,
Expand Down
24 changes: 24 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,30 @@ mod tests {
assert!(result.is_err(), "{result:?}");
}

#[tokio::test]
async fn test_shutdown_cancelled_vs_shutdown_signal_triggered() {
let (tx, rx) = oneshot::channel::<()>();
let shutdown = Shutdown::builder()
.with_delay(Duration::from_secs(5))
.with_signal(rx)
.build();
tx.send(()).unwrap();

let weak_guard = shutdown.guard_weak();

// will fail because delay is still being awaited
let result = tokio::time::timeout(Duration::from_micros(100), weak_guard.cancelled()).await;
assert!(result.is_err(), "{result:?}");

// this will succeed however, as it does not await the delay
let result = tokio::time::timeout(
Duration::from_millis(100),
weak_guard.shutdown_signal_triggered(),
)
.await;
assert!(result.is_ok(), "{result:?}");
}

#[tokio::test]
async fn test_shutdown_nested_guards() {
let (tx, rx) = oneshot::channel::<()>();
Expand Down
40 changes: 33 additions & 7 deletions src/shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl ShutdownBuilder<sealed::WithoutSignal> {
pub fn build(self) -> Shutdown {
let (zero_tx, zero_rx) = trigger();

let guard = ShutdownGuard::new(Receiver::closed(), zero_tx, Default::default());
let guard = ShutdownGuard::new(Receiver::closed(), None, zero_tx, Default::default());

Shutdown {
guard,
Expand All @@ -173,16 +173,29 @@ impl<I: sealed::IntoFuture> ShutdownBuilder<sealed::WithSignal<I>> {
/// all jobs are complete.
pub fn build(self) -> Shutdown {
let trigger_signal = self.data.signal.into_future();
let delay = self.data.delay;

let (delay_tuple, maybe_shutdown_signal_rx) = match self.data.delay {
Some(delay) => {
let (shutdown_signal_tx, shutdown_signal_rx) = trigger();
(Some((delay, shutdown_signal_tx)), Some(shutdown_signal_rx))
}
None => (None, None),
};

let (signal_tx, signal_rx) = trigger();
let (zero_tx, zero_rx) = trigger();

let guard = ShutdownGuard::new(signal_rx, zero_tx, Default::default());
let guard = ShutdownGuard::new(
signal_rx,
maybe_shutdown_signal_rx,
zero_tx,
Default::default(),
);

crate::sync::spawn(async move {
let _ = trigger_signal.await;
if let Some(delay) = delay {
if let Some((delay, shutdown_signal_tx)) = delay_tuple {
shutdown_signal_tx.trigger();
tracing::trace!(
"::trigger signal recieved: delay buffer activated: {:?}",
delay
Expand Down Expand Up @@ -213,13 +226,25 @@ where
pub fn build(self) -> Shutdown {
let trigger_signal = self.data.signal.into_future();
let overwrite_fn = self.data.overwrite_fn;
let delay = self.data.delay;

let (delay_tuple, maybe_shutdown_signal_rx) = match self.data.delay {
Some(delay) => {
let (shutdown_signal_tx, shutdown_signal_rx) = trigger();
(Some((delay, shutdown_signal_tx)), Some(shutdown_signal_rx))
}
None => (None, None),
};

let (signal_tx, signal_rx) = trigger();
let (zero_tx, zero_rx) = trigger();
let (zero_overwrite_tx, zero_overwrite_rx) = trigger();

let guard = ShutdownGuard::new(signal_rx, zero_tx, Default::default());
let guard = ShutdownGuard::new(
signal_rx,
maybe_shutdown_signal_rx,
zero_tx,
Default::default(),
);

crate::sync::spawn(async move {
let _ = trigger_signal.await;
Expand All @@ -228,7 +253,8 @@ where
let _ = overwrite_signal.await;
zero_overwrite_tx.trigger();
});
if let Some(delay) = delay {
if let Some((delay, shutdown_signal_tx)) = delay_tuple {
shutdown_signal_tx.trigger();
tracing::trace!(
"::trigger signal recieved: delay buffer activated: {:?}",
delay
Expand Down

0 comments on commit 3d5997c

Please sign in to comment.