Skip to content

Commit

Permalink
feat: send signals in parallel (#51)
Browse files Browse the repository at this point in the history
Co-authored-by: Bernhard Schuster <bernhard@ahoi.io>
  • Loading branch information
vstakhov and drahnr authored Jul 17, 2023
1 parent 096d3c5 commit 8b6e817
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 3 deletions.
51 changes: 48 additions & 3 deletions orchestra/proc-macro/src/impl_orchestra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ pub(crate) fn impl_orchestra_struct(info: &OrchestraInfo) -> proc_macro2::TokenS

/// Capacity of a signal channel between a subsystem and the orchestra.
const SIGNAL_CHANNEL_CAPACITY: usize = #signal_channel_capacity;
/// Timeout to wait for a signal to be processed by the target subsystem. If this timeout is exceeded, the
/// orchestra terminates with an error.
const SIGNAL_TIMEOUT: ::std::time::Duration = ::std::time::Duration::from_secs(10);

/// The log target tag.
const LOG_TARGET: &str = #log_target;
Expand Down Expand Up @@ -129,12 +132,45 @@ pub(crate) fn impl_orchestra_struct(info: &OrchestraInfo) -> proc_macro2::TokenS

/// Broadcast a signal to all subsystems.
pub async fn broadcast_signal(&mut self, signal: #signal_ty) -> ::std::result::Result<(), #error_ty > {
let mut delayed_signals : #support_crate ::futures::stream::FuturesUnordered<::std::pin::Pin<::std::boxed::Box<dyn #support_crate::futures::Future<Output = ::std::result::Result<(), #error_ty>>>>>
= #support_crate ::futures::stream::FuturesUnordered::new();
#(
// Use fast path if possible.
#feature_gates
self. #subsystem_name .send_signal(signal.clone()).await?;
if let Err(e) = self. #subsystem_name .try_send_signal(signal.clone()) {
match e {
#support_crate::TrySendError::Full(sig) => {
let instance = self. #subsystem_name .instance.as_mut().expect("checked in try_send_signal");
delayed_signals.push(::std::boxed::Box::pin(async move {
match instance.tx_signal.send(sig).timeout(SIGNAL_TIMEOUT).await {
None => {
Err(#error_ty :: from(
#support_crate ::OrchestraError::SubsystemStalled(instance.name, "signal", ::std::any::type_name::<#signal_ty>())
))
}
Some(res) => {
let res = res.map_err(|_| #error_ty :: from(
#support_crate ::OrchestraError::QueueError
));
if res.is_ok() {
instance.signals_received += 1;
}
res
}
}
}));
},
_ => return Err(#error_ty :: from(#support_crate ::OrchestraError::QueueError))
}
}
)*
let _ = signal;

// If fast path failed, wait for all delayed signals with no specific order
while let Some(res) = delayed_signals.next().await {
res?;
}

Ok(())
}

Expand Down Expand Up @@ -255,12 +291,21 @@ pub(crate) fn impl_orchestrated_subsystem(info: &OrchestraInfo) -> proc_macro2::
}
}

/// Tries to send a signal to the wrapped subsystem without waiting.
pub fn try_send_signal(&mut self, signal: #signal) -> ::std::result::Result<(), #support_crate :: TrySendError<#signal> > {
if let Some(ref mut instance) = self.instance {
instance.tx_signal.try_send(signal)?;
instance.signals_received += 1;
Ok(())
} else {
Ok(())
}
}

/// Send a signal to the wrapped subsystem.
///
/// If the inner `instance` is `None`, nothing is happening.
pub async fn send_signal(&mut self, signal: #signal) -> ::std::result::Result<(), #error_ty > {
const SIGNAL_TIMEOUT: ::std::time::Duration = ::std::time::Duration::from_secs(10);

if let Some(ref mut instance) = self.instance {
match instance.tx_signal.send(signal).timeout(SIGNAL_TIMEOUT).await {
None => {
Expand Down
3 changes: 3 additions & 0 deletions orchestra/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ use std::sync::{
#[doc(hidden)]
pub use std::time::Duration;

#[doc(hidden)]
pub use metered::TrySendError;

#[doc(hidden)]
pub use futures_timer::Delay;

Expand Down

0 comments on commit 8b6e817

Please sign in to comment.