Skip to content

Commit

Permalink
Allow subsystems using priority channels
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreiEres committed May 30, 2024
1 parent 2b9a313 commit dd5b7e2
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 5 deletions.
3 changes: 2 additions & 1 deletion orchestra/proc-macro/src/impl_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,9 +635,10 @@ pub(crate) fn impl_feature_gated_items(
#(
let (#channel_name_tx, #channel_name_rx)
=
#support_crate ::metered::channel::<
#support_crate ::metered::channel_with_priority::<
MessagePacket< #maybe_boxed_consumes >
>(
self.channel_capacity.unwrap_or(#message_channel_capacity),
self.channel_capacity.unwrap_or(#message_channel_capacity)
);
)*
Expand Down
13 changes: 10 additions & 3 deletions orchestra/proc-macro/src/impl_channels_out.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,21 @@ pub(crate) fn impl_channels_out_struct(info: &OrchestraInfo) -> Result<proc_macr
&mut self,
signals_received: usize,
message: #message_wrapper,
use_priority_channel: bool,
) {
let res: ::std::result::Result<_, _> = match message {
#(
#feature_gates
#message_wrapper :: #consumes_variant ( inner ) => {
self. #channel_name .send(
#support_crate ::make_packet(signals_received, #maybe_boxed_send)
).await.map_err(|_| stringify!( #channel_name ))
if use_priority_channel {
self. #channel_name .priority_send(
#support_crate ::make_packet(signals_received, #maybe_boxed_send)
).await.map_err(|_| stringify!( #channel_name ))
} else {
self. #channel_name .send(
#support_crate ::make_packet(signals_received, #maybe_boxed_send)
).await.map_err(|_| stringify!( #channel_name ))
}
}
)*
// subsystems that are wip
Expand Down
14 changes: 13 additions & 1 deletion orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,19 @@ pub(crate) fn impl_subsystem_sender(
self.signals_received.load(),
<#all_messages_wrapper as ::std::convert::From<_>> ::from (
<#outgoing_wrapper as ::std::convert::From<_>> :: from ( msg )
)
),
false,
).await;
}

async fn priority_send_message(&mut self, msg: OutgoingMessage)
{
self.channels.send_and_log_error(
self.signals_received.load(),
<#all_messages_wrapper as ::std::convert::From<_>> ::from (
<#outgoing_wrapper as ::std::convert::From<_>> :: from ( msg )
),
true
).await;
}

Expand Down
3 changes: 3 additions & 0 deletions orchestra/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,9 @@ where
/// Send a direct message to some other `Subsystem`, routed based on message type.
async fn send_message(&mut self, msg: OutgoingMessage);

/// Send a direct priority message to some other `Subsystem`, routed based on message type.
async fn priority_send_message(&mut self, msg: OutgoingMessage);

/// Tries to send a direct message to some other `Subsystem`, routed based on message type.
/// This method is useful for cases where the message queue is bounded and the message is ok
/// to be dropped if the queue is full. If the queue is full, this method will return an error.
Expand Down
161 changes: 161 additions & 0 deletions orchestra/tests/subsystems_priority_channels_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright (C) 2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use futures::executor::ThreadPool;
use orchestra::*;
use std::sync::{
atomic::{AtomicU8, Ordering},
Arc,
};

struct SubA {
regular: Vec<u8>,
priority: Vec<u8>,
}

pub struct SubB {
messages: Vec<Arc<AtomicU8>>,
}

impl SubA {
fn new(regular: Vec<u8>, priority: Vec<u8>) -> Self {
Self { regular, priority }
}
}

impl SubB {
fn new(messages: Vec<Arc<AtomicU8>>) -> Self {
Self { messages }
}
}

impl crate::Subsystem<OrchestraSubsystemContext<MsgA>, OrchestraError> for SubA {
fn start(self, mut ctx: OrchestraSubsystemContext<MsgA>) -> SpawnedSubsystem<OrchestraError> {
let mut sender = ctx.sender().clone();
SpawnedSubsystem {
name: "sub A",
future: Box::pin(async move {
for i in self.regular {
sender.send_message(MsgB(i)).await;
}
for i in self.priority {
sender.priority_send_message(MsgB(i)).await;
}

Ok(())
}),
}
}
}

impl crate::Subsystem<OrchestraSubsystemContext<MsgB>, OrchestraError> for SubB {
fn start(self, mut ctx: OrchestraSubsystemContext<MsgB>) -> SpawnedSubsystem<OrchestraError> {
SpawnedSubsystem {
name: "sub B",
future: Box::pin(async move {
// Wait until sub_a sends all messages
futures_timer::Delay::new(Duration::from_millis(50)).await;
for i in self.messages {
match ctx.recv().await.unwrap() {
FromOrchestra::Communication { msg } => {
i.store(msg.0, Ordering::SeqCst);
},
_ => panic!("unexpected message"),
}
}

Ok(())
}),
}
}
}

#[derive(Clone, Debug)]
pub struct SigSigSig;

#[derive(Clone, Debug)]
pub struct Event;

#[derive(Clone, Debug)]
#[allow(dead_code)]
pub struct MsgA(u8);

#[derive(Clone, Debug)]
#[allow(dead_code)]
pub struct MsgB(u8);

#[derive(Debug, Clone)]
pub struct DummySpawner(pub ThreadPool);

impl Spawner for DummySpawner {
fn spawn_blocking(
&self,
_task_name: &'static str,
_subsystem_name: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
self.0.spawn_ok(future);
}

fn spawn(
&self,
_task_name: &'static str,
_subsystem_name: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
self.0.spawn_ok(future);
}
}

#[orchestra(signal=SigSigSig, event=Event, gen=AllMessages, error=OrchestraError, boxed_messages=true)]
pub struct Orchestra {
#[subsystem(consumes: MsgA, sends: [MsgB])]
sub_a: SubA,

#[subsystem(consumes: MsgB, sends: [MsgA])]
sub_b: SubB,
}

#[test]
fn test_priority_send_message() {
let regular = vec![1, 2, 3];
let priority = vec![42];
let messages = vec![
Arc::new(AtomicU8::new(0)),
Arc::new(AtomicU8::new(0)),
Arc::new(AtomicU8::new(0)),
Arc::new(AtomicU8::new(0)),
];
let sub_a = SubA::new(regular.clone(), priority.clone());
let sub_b = SubB::new(messages.clone());
let pool = ThreadPool::new().unwrap();
let (orchestra, _handle) = Orchestra::builder()
.sub_a(sub_a)
.sub_b(sub_b)
.spawner(DummySpawner(pool))
.build()
.unwrap();

futures::executor::block_on(async move {
for run_subsystem in orchestra.running_subsystems {
run_subsystem.await.unwrap();
}
});

assert_eq!(
priority.into_iter().chain(regular.into_iter()).collect::<Vec<u8>>(),
messages.iter().map(|i| i.load(Ordering::SeqCst)).collect::<Vec<u8>>()
);
}

0 comments on commit dd5b7e2

Please sign in to comment.