diff --git a/node/core/chain-selection/src/lib.rs b/node/core/chain-selection/src/lib.rs index 172de99d34ee..780cbe9db8c3 100644 --- a/node/core/chain-selection/src/lib.rs +++ b/node/core/chain-selection/src/lib.rs @@ -352,7 +352,7 @@ async fn run( match res { Err(e) => { e.trace(); - // All errors right now are considered fatal: + // All errors are considered fatal right now: break }, Ok(()) => { diff --git a/node/overseer/examples/minimal-example.rs b/node/overseer/examples/minimal-example.rs index 20141fad8555..c02bcccb0a06 100644 --- a/node/overseer/examples/minimal-example.rs +++ b/node/overseer/examples/minimal-example.rs @@ -31,7 +31,7 @@ use polkadot_overseer::{ self as overseer, dummy::dummy_overseer_builder, gen::{FromOverseer, SpawnedSubsystem}, - AllMessages, HeadSupportsParachains, OverseerSignal, SubsystemError, + HeadSupportsParachains, OverseerSignal, SubsystemError, }; use polkadot_primitives::v2::Hash; @@ -49,11 +49,7 @@ struct Subsystem1; impl Subsystem1 { async fn run(mut ctx: Ctx) -> () where - Ctx: overseer::SubsystemContext< - Message = CandidateBackingMessage, - AllMessages = AllMessages, - Signal = OverseerSignal, - >, + Ctx: overseer::SubsystemContext, { 'louy: loop { match ctx.try_recv().await { @@ -79,8 +75,7 @@ impl Subsystem1 { Default::default(), tx, ); - ctx.send_message(::AllMessages::from(msg)) - .await; + ctx.send_message(msg).await; } () } @@ -88,11 +83,7 @@ impl Subsystem1 { impl overseer::Subsystem for Subsystem1 where - Context: overseer::SubsystemContext< - Message = CandidateBackingMessage, - AllMessages = AllMessages, - Signal = OverseerSignal, - >, + Context: overseer::SubsystemContext, { fn start(self, ctx: Context) -> SpawnedSubsystem { let future = Box::pin(async move { @@ -113,7 +104,6 @@ impl Subsystem2 { where Ctx: overseer::SubsystemContext< Message = CandidateValidationMessage, - AllMessages = AllMessages, Signal = OverseerSignal, >, { @@ -148,11 +138,8 @@ impl Subsystem2 { impl overseer::Subsystem for Subsystem2 where - Context: overseer::SubsystemContext< - Message = CandidateValidationMessage, - AllMessages = AllMessages, - Signal = OverseerSignal, - >, + Context: + overseer::SubsystemContext, { fn start(self, ctx: Context) -> SpawnedSubsystem { let future = Box::pin(async move { diff --git a/node/overseer/overseer-gen/README.md b/node/overseer/overseer-gen/README.md index cd1098a7a16c..ce03897f804c 100644 --- a/node/overseer/overseer-gen/README.md +++ b/node/overseer/overseer-gen/README.md @@ -63,10 +63,10 @@ is not ready to be included in the Overseer: ```rust #[overlord(signal=SigSigSig, event=Event, gen=AllMessages, error=OverseerError)] pub struct Overseer { - #[subsystem(MsgA)] + #[subsystem(MsgA, sends: MsgB)] sub_a: AwesomeSubSysA, - #[subsystem(MsgB), wip] + #[subsystem(MsgB, sends: MsgA), wip] sub_b: AwesomeSubSysB, // This subsystem will not be required nor allowed to be set } ``` diff --git a/node/overseer/overseer-gen/examples/dummy.rs b/node/overseer/overseer-gen/examples/dummy.rs index 8532539bc98d..31b1c4caccf8 100644 --- a/node/overseer/overseer-gen/examples/dummy.rs +++ b/node/overseer/overseer-gen/examples/dummy.rs @@ -1,19 +1,23 @@ //! A dummy to be used with cargo expand use polkadot_node_network_protocol::WrongVariant; -use polkadot_overseer_gen::*; +use polkadot_overseer_gen::{SpawnNamed, *}; use std::collections::HashMap; /// Concrete subsystem implementation for `MsgStrukt` msg type. #[derive(Default)] pub struct AwesomeSubSys; -impl ::polkadot_overseer_gen::Subsystem for AwesomeSubSys { - fn start(self, mut ctx: AwesomeSubSysContext) -> SpawnedSubsystem { +impl ::polkadot_overseer_gen::Subsystem, Yikes> for AwesomeSubSys { + fn start(self, mut ctx: XxxSubsystemContext) -> SpawnedSubsystem { let mut sender = ctx.sender().clone(); - - ctx.spawn("awesome", Box::pin(async move { sender.send_message(Plinko).await })) - .unwrap(); + ctx.spawn( + "AwesomeSubsys", + Box::pin(async move { + sender.send_message(Plinko).await; + }), + ) + .unwrap(); unimplemented!("starting yay!") } } @@ -21,11 +25,16 @@ impl ::polkadot_overseer_gen::Subsystem for Awesome #[derive(Default)] pub struct GoblinTower; -impl ::polkadot_overseer_gen::Subsystem for GoblinTower { - fn start(self, mut ctx: GoblinTowerContext) -> SpawnedSubsystem { +impl ::polkadot_overseer_gen::Subsystem, Yikes> for GoblinTower { + fn start(self, mut ctx: XxxSubsystemContext) -> SpawnedSubsystem { let mut sender = ctx.sender().clone(); - ctx.spawn("awesome", Box::pin(async move { sender.send_message(MsgStrukt(0u8)).await })) - .unwrap(); + ctx.spawn( + "GoblinTower", + Box::pin(async move { + sender.send_message(MsgStrukt(8u8)).await; + }), + ) + .unwrap(); unimplemented!("welcum") } } @@ -96,11 +105,11 @@ impl NetworkMsg { } #[overlord(signal=SigSigSig, event=EvX, error=Yikes, network=NetworkMsg, gen=AllMessages)] -struct Yyy { - #[subsystem(consumes: MsgStrukt, sends: Plinko)] +struct Xxx { + #[subsystem(consumes: MsgStrukt, sends: [Plinko])] sub0: AwesomeSubSys, - #[subsystem(no_dispatch, blocking, consumes: Plinko, sends: MsgStrukt)] + #[subsystem(no_dispatch, blocking, consumes: Plinko, sends: [MsgStrukt])] plinkos: GoblinTower, i_like_pi: f64, @@ -135,7 +144,7 @@ impl SpawnNamed for DummySpawner { struct DummyCtx; fn main() { - let (overseer, _handle): (Yyy<_, f64>, _) = Yyy::builder() + let (overseer, _handle): (Xxx<_, f64>, _) = Xxx::builder() .sub0(AwesomeSubSys::default()) .plinkos(GoblinTower::default()) .i_like_pi(::std::f64::consts::PI) diff --git a/node/overseer/overseer-gen/proc-macro/Cargo.toml b/node/overseer/overseer-gen/proc-macro/Cargo.toml index 8829b9bd33b6..274a2a241d66 100644 --- a/node/overseer/overseer-gen/proc-macro/Cargo.toml +++ b/node/overseer/overseer-gen/proc-macro/Cargo.toml @@ -23,7 +23,7 @@ petgraph = "0.6.0" assert_matches = "1.5.0" [features] -default = ["expand"] +default = ["graph", "expand"] # write the expanded version to a `overlord-expansion.[a-f0-9]{10}.rs` # in the `OUT_DIR` as defined by `cargo` for the `expander` crate. expand = [] diff --git a/node/overseer/overseer-gen/proc-macro/src/impl_builder.rs b/node/overseer/overseer-gen/proc-macro/src/impl_builder.rs index dd6eab15336f..17a6ee7dcba8 100644 --- a/node/overseer/overseer-gen/proc-macro/src/impl_builder.rs +++ b/node/overseer/overseer-gen/proc-macro/src/impl_builder.rs @@ -35,11 +35,10 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream { let builder = format_ident!("{}Builder", overseer_name); let handle = format_ident!("{}Handle", overseer_name); let connector = format_ident!("{}Connector", overseer_name); + let subsystem_ctx_name = format_ident!("{}SubsystemContext", overseer_name); let subsystem_name = &info.subsystem_names_without_wip(); let subsystem_generics = &info.subsystem_generic_types(); - let subsystem_ctx_names = - &Vec::from_iter(subsystem_generics.iter().map(|name| format_ident!("{}Context", name))); let consumes = &info.consumes_without_wip(); let channel_name = &info.channel_names_without_wip(""); @@ -104,8 +103,7 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream { info.subsystems().iter().filter(|ssf| !ssf.wip).enumerate().map(|(idx, ssf)| { let field_name = &ssf.name; let field_type = &ssf.generic; - let subsystem_ctx_name = format_ident!("{}Context", &field_type); - + let subsystem_consumes = &ssf.message_to_consume; // Remove state generic for the item to be replaced. It sufficient to know `field_type` for // that since we always move from `Init<#field_type>` to `Init`. let impl_subsystem_state_generics = recollect_without_idx(&subsystem_passthrough_state_generics[..], idx); @@ -137,7 +135,7 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream { impl #builder where - #field_type : Subsystem<#subsystem_ctx_name, #error_ty>, + #field_type : Subsystem<#subsystem_ctx_name<#subsystem_consumes>, #error_ty>, { /// Specify the subsystem in the builder directly pub fn #field_name (self, var: #field_type ) -> @@ -154,7 +152,6 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream { spawner: self.spawner, } } - /// Specify the the initialization function for a subsystem pub fn #field_name_with<'a, F>(self, subsystem_init_fn: F ) -> #builder @@ -181,7 +178,7 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream { impl #builder where - #field_type : Subsystem<#subsystem_ctx_name, #error_ty>, + #field_type : Subsystem<#subsystem_ctx_name<#subsystem_consumes>, #error_ty>, { /// Replace a subsystem by another implementation for the /// consumable message type. @@ -190,7 +187,7 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream { where #field_type: 'static, F: 'static + FnOnce(#field_type) -> NEW, - NEW: #support_crate ::Subsystem<#subsystem_ctx_name, #error_ty>, + NEW: #support_crate ::Subsystem<#subsystem_ctx_name< #subsystem_consumes >, #error_ty>, { let replacement: Init = match self.#field_name { Init::Fn(fx) => @@ -325,7 +322,7 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream { #builder #(, Missing<#field_type> )* > where #( - #subsystem_generics : Subsystem<#subsystem_ctx_names, #error_ty>, + #subsystem_generics : Subsystem<#subsystem_ctx_name< #consumes >, #error_ty>, )* { #builder :: new() @@ -443,7 +440,7 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream { where #spawner_where_clause, #( - #subsystem_generics : Subsystem<#subsystem_ctx_names, #error_ty>, + #subsystem_generics : Subsystem<#subsystem_ctx_name< #consumes >, #error_ty>, )* { /// Complete the construction and create the overseer type. @@ -520,7 +517,7 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream { // Convert owned `snake case` string to a `kebab case` static str. let subsystem_static_str = Box::leak(subsystem_string.replace("_", "-").into_boxed_str()); - let ctx = #subsystem_ctx_names::new( + let ctx = #subsystem_ctx_name::< #consumes >::new( signal_rx, message_rx, channels_out.clone(), diff --git a/node/overseer/overseer-gen/proc-macro/src/impl_message_wrapper.rs b/node/overseer/overseer-gen/proc-macro/src/impl_message_wrapper.rs index 0be039b78671..7d29ddd458ab 100644 --- a/node/overseer/overseer-gen/proc-macro/src/impl_message_wrapper.rs +++ b/node/overseer/overseer-gen/proc-macro/src/impl_message_wrapper.rs @@ -52,7 +52,8 @@ pub(crate) fn impl_message_wrapper_enum(info: &OverseerInfo) -> Result Result { #( #outgoing_wrapper :: #outgoing_variant ( msg ) => #all_messages_wrapper :: #subsystem_generic ( msg ), )* + #outgoing_wrapper :: Empty => #all_messages_wrapper :: Empty, } } } @@ -125,11 +126,9 @@ pub(crate) fn impl_subsystem(info: &OverseerInfo) -> Result { } // Dump the graph to file. - if cfg!(feature = "graph") { - let path = std::path::PathBuf::from( - std::env::var("OUT_DIR").expect("build.rs exists, hence OUT_DIR is populated. qed"), - ) - .join(overseer_name.to_string().to_lowercase() + "-subsystem-messaging.dot"); + if cfg!(feature = "graph") || true { + let path = std::path::PathBuf::from(env!("OUT_DIR")) + .join(overseer_name.to_string().to_lowercase() + "-subsystem-messaging.dot"); if let Err(e) = std::fs::OpenOptions::new() .truncate(true) .create(true) @@ -138,36 +137,36 @@ pub(crate) fn impl_subsystem(info: &OverseerInfo) -> Result { .and_then(|mut f| graphviz(&graph, &mut f)) { eprintln!("Failed to write dot graph to {}: {:?}", path.display(), e); + } else { + println!("Wrote dot graph to {}", path.display()); } } + let subsystem_sender_name = Ident::new(&(overseer_name.to_string() + "Sender"), span); + let subsystem_ctx_name = Ident::new(&(overseer_name.to_string() + "SubsystemContext"), span); + ts.extend(impl_subsystem_context(info, &subsystem_sender_name, &subsystem_ctx_name)); + + ts.extend(impl_associate_outgoing_messages_trait(&all_messages_wrapper)); + + ts.extend(impl_subsystem_sender( + support_crate, + info.subsystems().iter().map(|ssf| { + let outgoing_wrapper = + Ident::new(&(ssf.generic.to_string() + "OutgoingMessages"), span); + outgoing_wrapper + }), + &all_messages_wrapper, + &subsystem_sender_name, + )); + // Create all subsystem specific types, one by one for ssf in info.subsystems() { let subsystem_name = ssf.generic.to_string(); - let subsystem_sender_name = Ident::new(&(subsystem_name.clone() + "Sender"), span); - let subsystem_ctx_name = Ident::new(&(subsystem_name.clone() + "Context"), span); - let outgoing_wrapper = Ident::new(&(subsystem_name.clone() + "OutgoingMessages"), span); - let consumes = ssf.message_to_consume.get_ident().cloned().expect( - "Consumable message must have an trailing Ident which was checked in `Parse`. qed", - ); - ts.extend(impl_wrapper_enum(&outgoing_wrapper, ssf.messages_to_send.as_slice())?); + ts.extend(impl_associate_outgoing_messages(&ssf.message_to_consume, &outgoing_wrapper)); - ts.extend(impl_subsystem_sender( - &all_messages_wrapper, - support_crate, - &outgoing_wrapper, - &all_messages_wrapper, - &subsystem_sender_name, - )); - ts.extend(impl_subsystem_context( - info, - &consumes, - &outgoing_wrapper, - &subsystem_sender_name, - &subsystem_ctx_name, - )); + ts.extend(impl_wrapper_enum(&outgoing_wrapper, ssf.messages_to_send.as_slice())?); } Ok(ts) } @@ -196,11 +195,12 @@ pub(crate) fn impl_wrapper_enum(wrapper: &Ident, message_types: &[Path]) -> Resu let ts = quote! { #[allow(missing_docs)] - #[derive(Clone)] + #[derive(Debug)] pub enum #wrapper { #( #variants ( #message_types ), )* + Empty, } #( @@ -210,14 +210,20 @@ pub(crate) fn impl_wrapper_enum(wrapper: &Ident, message_types: &[Path]) -> Resu } } )* + + // Useful for unit and integration tests: + impl ::std::convert::From< () > for #wrapper { + fn from(_message: ()) -> Self { + #wrapper :: Empty + } + } }; Ok(ts) } pub(crate) fn impl_subsystem_sender( - wrapper_message: &Ident, support_crate: &TokenStream, - outgoing_wrapper: &Ident, + outgoing_wrappers: impl IntoIterator, all_messages_wrapper: &Ident, subsystem_sender_name: &Ident, ) -> TokenStream { @@ -225,11 +231,13 @@ pub(crate) fn impl_subsystem_sender( /// Connector to send messages towards all subsystems, /// while tracking the which signals where already received. #[derive(Debug, Clone)] - pub struct #subsystem_sender_name { + pub struct #subsystem_sender_name < OutgoingWrapper > { /// Collection of channels to all subsystems. channels: ChannelsOut, /// Systemwide tick for which signals were received by all subsystems. signals_received: SignalsReceived, + /// Keep that marker around. + _phantom: ::core::marker::PhantomData< OutgoingWrapper >, } }; @@ -239,15 +247,15 @@ pub(crate) fn impl_subsystem_sender( // 2. overseer-global-`AllMessages`-type let wrapped = |outgoing_wrapper: &Ident| { quote! { - /// implementation for wrapping message type... + /// `M` references the type _consumed_ by the subsystem. #[#support_crate ::async_trait] - impl SubsystemSender< Message > for #subsystem_sender_name + impl SubsystemSender< OutgoingMessage > for #subsystem_sender_name < #outgoing_wrapper > where - Message: Send + 'static, - #outgoing_wrapper: ::std::convert::From + Send, - #all_messages_wrapper: ::std::convert::From<#outgoing_wrapper> + Send, + OutgoingMessage: ::std::fmt::Debug + Send + 'static, + #outgoing_wrapper: ::std::convert::From + Send, + #all_messages_wrapper: ::std::convert::From< #outgoing_wrapper > + Send, { - async fn send_message(&mut self, msg: Message) { + async fn send_message(&mut self, msg: OutgoingMessage) { self.channels.send_and_log_error( self.signals_received.load(), #all_messages_wrapper ::from ( @@ -258,15 +266,15 @@ pub(crate) fn impl_subsystem_sender( async fn send_messages(&mut self, msgs: I) where - I: IntoIterator + Send, - I::IntoIter: Iterator + Send, + I: IntoIterator + Send, + I::IntoIter: Iterator + Send, { for msg in msgs { self.send_message( msg ).await; } } - fn send_unbounded_message(&mut self, msg: Message) { + fn send_unbounded_message(&mut self, msg: OutgoingMessage) { self.channels.send_unbounded_and_log_error( self.signals_received.load(), #all_messages_wrapper ::from ( @@ -278,27 +286,55 @@ pub(crate) fn impl_subsystem_sender( } }; - // Allow the `#wrapper_message` to be sent as well. - ts.extend(wrapped(&outgoing_wrapper)); - - // TODO FIXME - let inconsequent = false; - if inconsequent { - ts.extend(wrapped(wrapper_message)); + for outgoing_wrapper in outgoing_wrappers { + ts.extend(wrapped(&outgoing_wrapper)); } ts } +pub(crate) fn impl_associate_outgoing_messages_trait(all_messages_wrapper: &Ident) -> TokenStream { + quote! { + /// Binds a generated type covering all declared outgoing messages, + /// which implements `#generated_outgoing: From` for all annotated types + /// of a particular subsystem. + /// + /// Note: This works because there is a 1?:1 relation between consumed messages and subsystems. + pub trait AssociateOutgoing: ::std::fmt::Debug + Send { + /// The associated _outgoing_ messages for a subsystem that _consumes_ the message `Self`. + type OutgoingMessages: Into< #all_messages_wrapper > + ::std::fmt::Debug + Send; + } + + impl AssociateOutgoing for () { + type OutgoingMessages = (); + } + } +} + +/// A workaround until default associated types actually work. +pub(crate) fn impl_associate_outgoing_messages( + message: &Path, + outgoing_wrapper: &Ident, +) -> TokenStream { + quote! { + impl AssociateOutgoing for #outgoing_wrapper { + type OutgoingMessages = #outgoing_wrapper; + } + + impl AssociateOutgoing for #message { + type OutgoingMessages = #outgoing_wrapper; + } + } +} + /// Implement a builder pattern for the `Overseer`-type, /// which acts as the gateway to constructing the overseer. pub(crate) fn impl_subsystem_context( info: &OverseerInfo, - consumes: &Ident, - sends: &Ident, subsystem_sender_name: &Ident, subsystem_ctx_name: &Ident, ) -> TokenStream { + let all_messages_wrapper = &info.message_wrapper; let signal = &info.extern_signal_ty; let error_ty = &info.extern_error_ty; let support_crate = info.support_crate_name(); @@ -313,34 +349,38 @@ pub(crate) fn impl_subsystem_context( /// [`SubsystemJob`]: trait.SubsystemJob.html #[derive(Debug)] #[allow(missing_docs)] - pub struct #subsystem_ctx_name { + pub struct #subsystem_ctx_name { signals: #support_crate ::metered::MeteredReceiver< #signal >, - messages: SubsystemIncomingMessages< #consumes >, - to_subsystems: #subsystem_sender_name, + messages: SubsystemIncomingMessages< M >, + to_subsystems: #subsystem_sender_name < ::OutgoingMessages >, to_overseer: #support_crate ::metered::UnboundedMeteredSender< #support_crate ::ToOverseer >, signals_received: SignalsReceived, - pending_incoming: Option<(usize, #consumes)>, + pending_incoming: Option<(usize, M)>, name: &'static str } - impl #subsystem_ctx_name { + impl #subsystem_ctx_name + where + M: AssociateOutgoing + Send + 'static, + { /// Create a new context. fn new( signals: #support_crate ::metered::MeteredReceiver< #signal >, - messages: SubsystemIncomingMessages< #consumes >, + messages: SubsystemIncomingMessages< M >, to_subsystems: ChannelsOut, to_overseer: #support_crate ::metered::UnboundedMeteredSender<#support_crate:: ToOverseer>, name: &'static str ) -> Self { let signals_received = SignalsReceived::default(); - #subsystem_ctx_name { + #subsystem_ctx_name :: { signals, messages, - to_subsystems: #subsystem_sender_name { + to_subsystems: #subsystem_sender_name :: < ::OutgoingMessages > { channels: to_subsystems, signals_received: signals_received.clone(), + _phantom: ::core::marker::PhantomData::default(), }, to_overseer, signals_received, @@ -355,24 +395,32 @@ pub(crate) fn impl_subsystem_context( } #[#support_crate ::async_trait] - impl #support_crate ::SubsystemContext for #subsystem_ctx_name + impl #support_crate ::SubsystemContext for #subsystem_ctx_name where - #subsystem_sender_name: #support_crate ::SubsystemSender< #sends >, + M: AssociateOutgoing, + #all_messages_wrapper: From, + #subsystem_sender_name< + ::OutgoingMessages + >: + #support_crate ::SubsystemSender< + ::OutgoingMessages + >, { - type Message = #consumes; + type Message = M; type Signal = #signal; - type Sender = #subsystem_sender_name; - type OutgoingMessages = #sends; + type OutgoingMessages = ::OutgoingMessages; + // type AllMessages = #all_messages_wrapper; + type Sender = #subsystem_sender_name < ::OutgoingMessages >; type Error = #error_ty; - async fn try_recv(&mut self) -> ::std::result::Result>, ()> { + async fn try_recv(&mut self) -> ::std::result::Result>, ()> { match #support_crate ::poll!(self.recv()) { #support_crate ::Poll::Ready(msg) => Ok(Some(msg.map_err(|_| ())?)), #support_crate ::Poll::Pending => Ok(None), } } - async fn recv(&mut self) -> ::std::result::Result, #error_ty> { + async fn recv(&mut self) -> ::std::result::Result, #error_ty> { loop { // If we have a message pending an overseer signal, we only poll for signals // in the meantime. diff --git a/node/overseer/overseer-gen/src/lib.rs b/node/overseer/overseer-gen/src/lib.rs index f9ef90577326..9af1b454a982 100644 --- a/node/overseer/overseer-gen/src/lib.rs +++ b/node/overseer/overseer-gen/src/lib.rs @@ -351,14 +351,6 @@ impl From for FromOverseer { } } -/// Binds a generated type which implements `#generated_outgoing: From` -/// for all annotated types. -/// -/// ``` -/// -/// ``` -pub trait AssociateOutgoing {} - /// A context type that is given to the [`Subsystem`] upon spawning. /// It can be used by [`Subsystem`] to communicate with other [`Subsystem`]s /// or spawn jobs. @@ -374,7 +366,11 @@ pub trait SubsystemContext: Send + 'static { /// And the same for signals. type Signal: std::fmt::Debug + Send + 'static; /// The overarching messages `enum` for this particular subsystem. - type OutgoingMessages: Send + 'static; + type OutgoingMessages: std::fmt::Debug + Send + 'static; + + // The overarching messages `enum` for this particular subsystem. + // type AllMessages: From + From + std::fmt::Debug + Send + 'static; + /// The sender type as provided by `sender()` and underlying. type Sender: SubsystemSender + Send + 'static; /// The error type. @@ -404,10 +400,10 @@ pub trait SubsystemContext: Send + 'static { ) -> Result<(), Self::Error>; /// Send a direct message to some other `Subsystem`, routed based on message type. - async fn send_message(&mut self, msg: X) + async fn send_message(&mut self, msg: T) where - Self::OutgoingMessages: From, - X: Send, + Self::OutgoingMessages: From, + T: Send, { self.sender().send_message(::from(msg)).await } @@ -457,14 +453,17 @@ where /// Sender end of a channel to interface with a subsystem. #[async_trait::async_trait] -pub trait SubsystemSender: Send + Clone + 'static { +pub trait SubsystemSender: Send + 'static +where + OutgoingMessage: Send, +{ /// Send a direct message to some other `Subsystem`, routed based on message type. - async fn send_message(&mut self, msg: Message); + async fn send_message(&mut self, msg: OutgoingMessage); /// Send multiple direct messages to other `Subsystem`s, routed based on message type. async fn send_messages(&mut self, msgs: T) where - T: IntoIterator + Send, + T: IntoIterator + Send, T::IntoIter: Send; /// Send a message onto the unbounded queue of some other `Subsystem`, routed based on message @@ -472,7 +471,7 @@ pub trait SubsystemSender: Send + Clone + 'static { /// /// This function should be used only when there is some other bounding factor on the messages /// sent with it. Otherwise, it risks a memory leak. - fn send_unbounded_message(&mut self, msg: Message); + fn send_unbounded_message(&mut self, msg: OutgoingMessage); } /// A future that wraps another future with a `Delay` allowing for time-limited futures. diff --git a/node/overseer/src/dummy.rs b/node/overseer/src/dummy.rs index 1b4dfc82a5d7..b4a97c3e6321 100644 --- a/node/overseer/src/dummy.rs +++ b/node/overseer/src/dummy.rs @@ -15,8 +15,8 @@ // along with Polkadot. If not, see . use crate::{ - prometheus::Registry, AllMessages, HeadSupportsParachains, InitializedOverseerBuilder, - MetricsTrait, Overseer, OverseerMetrics, OverseerSignal, OverseerSubsystemContext, SpawnNamed, + prometheus::Registry, HeadSupportsParachains, InitializedOverseerBuilder, MetricsTrait, + Overseer, OverseerMetrics, OverseerSignal, OverseerSubsystemContext, SpawnNamed, KNOWN_LEAVES_CACHE_SIZE, }; use lru::LruCache; @@ -30,11 +30,7 @@ pub struct DummySubsystem; impl Subsystem for DummySubsystem where - Context: SubsystemContext< - Signal = OverseerSignal, - Error = SubsystemError, - OutgoingMessages = DummySubsystemOutgoingMessages, - >, + Context: SubsystemContext, { fn start(self, mut ctx: Context) -> SpawnedSubsystem { let future = Box::pin(async move { diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 89067dbcef22..b622ed1b419a 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -414,67 +414,73 @@ pub async fn forward_events>(client: Arc

, mut hand network=NetworkBridgeEvent, )] pub struct Overseer { - #[subsystem(no_dispatch, CandidateValidationMessage)] + #[subsystem(no_dispatch, CandidateValidationMessage, sends: [])] candidate_validation: CandidateValidation, - #[subsystem(no_dispatch, PvfCheckerMessage)] + #[subsystem(no_dispatch, PvfCheckerMessage, sends: [])] pvf_checker: PvfChecker, - #[subsystem(no_dispatch, CandidateBackingMessage)] + #[subsystem(no_dispatch, CandidateBackingMessage, sends: [ + CandidateValidationMessage, + CollatorProtocolMessage, + AvailabilityDistributionMessage, + StatementDistributionMessage, + ProvisionerMessage, + ])] candidate_backing: CandidateBacking, - #[subsystem(StatementDistributionMessage)] + #[subsystem(StatementDistributionMessage, sends: [])] statement_distribution: StatementDistribution, - #[subsystem(no_dispatch, AvailabilityDistributionMessage)] + #[subsystem(no_dispatch, AvailabilityDistributionMessage, sends: [])] availability_distribution: AvailabilityDistribution, - #[subsystem(no_dispatch, AvailabilityRecoveryMessage)] + #[subsystem(no_dispatch, AvailabilityRecoveryMessage, sends: [])] availability_recovery: AvailabilityRecovery, - #[subsystem(blocking, no_dispatch, BitfieldSigningMessage)] + #[subsystem(blocking, no_dispatch, BitfieldSigningMessage, sends: [])] bitfield_signing: BitfieldSigning, - #[subsystem(BitfieldDistributionMessage)] + #[subsystem(BitfieldDistributionMessage, sends: [])] bitfield_distribution: BitfieldDistribution, - #[subsystem(no_dispatch, ProvisionerMessage)] + #[subsystem(no_dispatch, ProvisionerMessage, sends: [])] provisioner: Provisioner, - #[subsystem(no_dispatch, blocking, RuntimeApiMessage)] + #[subsystem(no_dispatch, blocking, RuntimeApiMessage, sends: [])] runtime_api: RuntimeApi, - #[subsystem(no_dispatch, blocking, AvailabilityStoreMessage)] + #[subsystem(no_dispatch, blocking, AvailabilityStoreMessage, sends: [])] availability_store: AvailabilityStore, - #[subsystem(no_dispatch, NetworkBridgeMessage)] + #[subsystem(no_dispatch, NetworkBridgeMessage, sends: [])] network_bridge: NetworkBridge, - #[subsystem(no_dispatch, blocking, ChainApiMessage)] + #[subsystem(no_dispatch, blocking, ChainApiMessage, sends: [])] chain_api: ChainApi, - #[subsystem(no_dispatch, CollationGenerationMessage)] + #[subsystem(no_dispatch, CollationGenerationMessage, sends: [])] collation_generation: CollationGeneration, - #[subsystem(no_dispatch, CollatorProtocolMessage)] + #[subsystem(no_dispatch, CollatorProtocolMessage, sends: [])] collator_protocol: CollatorProtocol, - #[subsystem(ApprovalDistributionMessage)] + #[subsystem(ApprovalDistributionMessage, sends: [])] approval_distribution: ApprovalDistribution, - #[subsystem(no_dispatch, blocking, ApprovalVotingMessage)] + #[subsystem(no_dispatch, blocking, ApprovalVotingMessage, sends: [])] approval_voting: ApprovalVoting, - #[subsystem(GossipSupportMessage)] + #[subsystem(GossipSupportMessage, sends: [])] gossip_support: GossipSupport, - #[subsystem(no_dispatch, blocking, DisputeCoordinatorMessage)] + #[subsystem(no_dispatch, blocking, DisputeCoordinatorMessage, sends: [])] dispute_coordinator: DisputeCoordinator, - #[subsystem(no_dispatch, DisputeDistributionMessage)] + #[subsystem(no_dispatch, DisputeDistributionMessage, sends: [])] dispute_distribution: DisputeDistribution, - #[subsystem(no_dispatch, blocking, ChainSelectionMessage)] + #[subsystem(no_dispatch, blocking, ChainSelectionMessage, sends: [])] chain_selection: ChainSelection, /// External listeners waiting for a hash to be in the active-leave set. diff --git a/node/overseer/src/tests.rs b/node/overseer/src/tests.rs index cc4bd38f05b0..3caf09917d74 100644 --- a/node/overseer/src/tests.rs +++ b/node/overseer/src/tests.rs @@ -60,11 +60,7 @@ struct TestSubsystem1(metered::MeteredSender); impl overseer::Subsystem for TestSubsystem1 where - C: overseer::SubsystemContext< - Message = CandidateValidationMessage, - Signal = OverseerSignal, - AllMessages = AllMessages, - >, + C: overseer::SubsystemContext, { fn start(self, mut ctx: C) -> SpawnedSubsystem { let mut sender = self.0; @@ -93,11 +89,7 @@ struct TestSubsystem2(metered::MeteredSender); impl overseer::Subsystem for TestSubsystem2 where - C: overseer::SubsystemContext< - Message = CandidateBackingMessage, - Signal = OverseerSignal, - AllMessages = AllMessages, - >, + C: overseer::SubsystemContext, { fn start(self, mut ctx: C) -> SpawnedSubsystem { let sender = self.0.clone(); @@ -138,11 +130,7 @@ struct ReturnOnStart; impl overseer::Subsystem for ReturnOnStart where - C: overseer::SubsystemContext< - Message = CandidateBackingMessage, - Signal = OverseerSignal, - AllMessages = AllMessages, - >, + C: overseer::SubsystemContext, { fn start(self, mut _ctx: C) -> SpawnedSubsystem { SpawnedSubsystem { @@ -311,11 +299,7 @@ struct TestSubsystem5(metered::MeteredSender); impl overseer::Subsystem for TestSubsystem5 where - C: overseer::SubsystemContext< - Message = CandidateValidationMessage, - Signal = OverseerSignal, - AllMessages = AllMessages, - >, + C: overseer::SubsystemContext, { fn start(self, mut ctx: C) -> SpawnedSubsystem { let mut sender = self.0.clone(); @@ -347,11 +331,7 @@ struct TestSubsystem6(metered::MeteredSender); impl Subsystem for TestSubsystem6 where - C: overseer::SubsystemContext< - Message = CandidateBackingMessage, - Signal = OverseerSignal, - AllMessages = AllMessages, - >, + C: overseer::SubsystemContext, { fn start(self, mut ctx: C) -> SpawnedSubsystem { let mut sender = self.0.clone(); @@ -756,7 +736,7 @@ impl CounterSubsystem { impl Subsystem for CounterSubsystem where - C: overseer::SubsystemContext, + C: overseer::SubsystemContext, M: Send, { fn start(self, mut ctx: C) -> SpawnedSubsystem {