Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send signals in parallel #51

Merged
merged 3 commits into from
Jul 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 48 additions & 3 deletions orchestra/proc-macro/src/impl_orchestra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ pub(crate) fn impl_orchestra_struct(info: &OrchestraInfo) -> proc_macro2::TokenS

/// Capacity of a signal channel between a subsystem and the orchestra.
const SIGNAL_CHANNEL_CAPACITY: usize = #signal_channel_capacity;
/// Timeout to wait for a signal to be processed by the target subsystem. If this timeout is exceeded, the
/// orchestra terminates with an error.
const SIGNAL_TIMEOUT: ::std::time::Duration = ::std::time::Duration::from_secs(10);
vstakhov marked this conversation as resolved.
Show resolved Hide resolved

/// The log target tag.
const LOG_TARGET: &str = #log_target;
Expand Down Expand Up @@ -129,12 +132,45 @@ pub(crate) fn impl_orchestra_struct(info: &OrchestraInfo) -> proc_macro2::TokenS

/// Broadcast a signal to all subsystems.
pub async fn broadcast_signal(&mut self, signal: #signal_ty) -> ::std::result::Result<(), #error_ty > {
let mut delayed_signals : #support_crate ::futures::stream::FuturesUnordered<::std::pin::Pin<::std::boxed::Box<dyn #support_crate::futures::Future<Output = ::std::result::Result<(), #error_ty>>>>>
= #support_crate ::futures::stream::FuturesUnordered::new();
#(
// Use fast path if possible.
#feature_gates
self. #subsystem_name .send_signal(signal.clone()).await?;
if let Err(e) = self. #subsystem_name .try_send_signal(signal.clone()) {
match e {
#support_crate::TrySendError::Full(sig) => {
let instance = self. #subsystem_name .instance.as_mut().expect("checked in try_send_signal");
delayed_signals.push(::std::boxed::Box::pin(async move {
match instance.tx_signal.send(sig).timeout(SIGNAL_TIMEOUT).await {
None => {
Err(#error_ty :: from(
#support_crate ::OrchestraError::SubsystemStalled(instance.name, "signal", ::std::any::type_name::<#signal_ty>())
))
}
Some(res) => {
let res = res.map_err(|_| #error_ty :: from(
#support_crate ::OrchestraError::QueueError
));
if res.is_ok() {
instance.signals_received += 1;
}
res
}
}
}));
},
_ => return Err(#error_ty :: from(#support_crate ::OrchestraError::QueueError))
}
}
)*
let _ = signal;

// If fast path failed, wait for all delayed signals with no specific order
while let Some(res) = delayed_signals.next().await {
res?;
}

Ok(())
}

Expand Down Expand Up @@ -255,12 +291,21 @@ pub(crate) fn impl_orchestrated_subsystem(info: &OrchestraInfo) -> proc_macro2::
}
}

/// Tries to send a signal to the wrapped subsystem without waiting.
pub fn try_send_signal(&mut self, signal: #signal) -> ::std::result::Result<(), #support_crate :: TrySendError<#signal> > {
if let Some(ref mut instance) = self.instance {
instance.tx_signal.try_send(signal)?;
instance.signals_received += 1;
Ok(())
} else {
Ok(())
}
}

/// Send a signal to the wrapped subsystem.
///
/// If the inner `instance` is `None`, nothing is happening.
pub async fn send_signal(&mut self, signal: #signal) -> ::std::result::Result<(), #error_ty > {
const SIGNAL_TIMEOUT: ::std::time::Duration = ::std::time::Duration::from_secs(10);

if let Some(ref mut instance) = self.instance {
match instance.tx_signal.send(signal).timeout(SIGNAL_TIMEOUT).await {
None => {
Expand Down
3 changes: 3 additions & 0 deletions orchestra/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ use std::sync::{
#[doc(hidden)]
pub use std::time::Duration;

#[doc(hidden)]
pub use metered::TrySendError;

#[doc(hidden)]
pub use futures_timer::Delay;

Expand Down