Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

dyn overseer channel capacity #5454

Merged
merged 8 commits into from
May 5, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cli/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ pub struct RunCmd {
/// telemetry, if telemetry is enabled.
#[clap(long)]
pub no_hardware_benchmarks: bool,

/// Overseer message capacity override.
///
/// **Dangerous!** Do not touch unless explicitly adviced to.
#[clap(long)]
pub overseer_channel_capacity_override: Option<usize>,
}

#[allow(missing_docs)]
Expand Down
1 change: 1 addition & 0 deletions cli/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ where
None,
false,
overseer_gen,
cli.run.overseer_channel_capacity_override,
hwbench,
)
.map(|full| full.task_manager)
Expand Down
81 changes: 73 additions & 8 deletions node/overseer/overseer-gen/proc-macro/src/impl_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
#baggage_name: self. #baggage_name,
)*
spawner: self.spawner,

channel_capacity: self.channel_capacity,
signal_capacity: self.signal_capacity,
}
}
/// Specify the the initialization function for a subsystem
Expand All @@ -171,6 +174,10 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
#baggage_name: self. #baggage_name,
)*
spawner: self.spawner,


channel_capacity: self.channel_capacity,
signal_capacity: self.signal_capacity,
}
}
}
Expand Down Expand Up @@ -207,6 +214,9 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
#baggage_name: self. #baggage_name,
)*
spawner: self.spawner,

channel_capacity: self.channel_capacity,
signal_capacity: self.signal_capacity,
}
}
}
Expand Down Expand Up @@ -254,6 +264,9 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
#to_keep_baggage_name: self. #to_keep_baggage_name,
)*
spawner: self.spawner,

channel_capacity: self.channel_capacity,
signal_capacity: self.signal_capacity,
}
}
}
Expand All @@ -272,6 +285,9 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
#to_keep_baggage_name: self. #to_keep_baggage_name,
)*
spawner: self.spawner,

channel_capacity: self.channel_capacity,
signal_capacity: self.signal_capacity,
}
}
}
Expand Down Expand Up @@ -359,20 +375,25 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
pub fn handle(&self) -> #handle {
self.handle.clone()
}
}

impl ::std::default::Default for #connector {
fn default() -> Self {
/// Create a new connector with non-default event channel capacity.
pub fn with_event_capacity(event_channel_capacity: usize) -> Self {
let (events_tx, events_rx) = #support_crate ::metered::channel::<
#event
>(SIGNAL_CHANNEL_CAPACITY);
>(event_channel_capacity);

Self {
handle: events_tx,
consumer: events_rx,
}
}
}

impl ::std::default::Default for #connector {
fn default() -> Self {
Self::with_event_capacity(SIGNAL_CHANNEL_CAPACITY)
}
}
});

ts.extend(quote!{
Expand All @@ -385,6 +406,11 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
#baggage_name: #baggage_passthrough_state_generics,
)*
spawner: InitStateSpawner,
// user provided runtime overrides,
// if `None`, the `overlord(message_capacity=123,..)` is used
// or the default value.
channel_capacity: Option<usize>,
signal_capacity: Option<usize>,
ordian marked this conversation as resolved.
Show resolved Hide resolved
}
});

Expand All @@ -406,6 +432,9 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
#field_name: Missing::<#field_type>::default(),
)*
spawner: Missing::<S>::default(),

channel_capacity: None,
signal_capacity: None,
}
}
}
Expand All @@ -419,18 +448,48 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
#spawner_where_clause
{
/// The `spawner` to use for spawning tasks.
pub fn spawner(self, spawner: S) -> #builder<Init<S>, #( #subsystem_passthrough_state_generics, )* #( #baggage_passthrough_state_generics, )*>
pub fn spawner(self, spawner: S) -> #builder<
Init<S>,
#( #subsystem_passthrough_state_generics, )*
#( #baggage_passthrough_state_generics, )*
>
{
#builder {
#(
#field_name: self. #field_name,
)*
spawner: Init::<S>::Value(spawner),

channel_capacity: self.channel_capacity,
signal_capacity: self.signal_capacity,
}
}
}
});

// message and signal channel capacity
drahnr marked this conversation as resolved.
Show resolved Hide resolved
ts.extend(quote! {
impl<S, #( #subsystem_passthrough_state_generics, )* #( #baggage_passthrough_state_generics, )*>
#builder<Init<S>, #( #subsystem_passthrough_state_generics, )* #( #baggage_passthrough_state_generics, )*>
where
#spawner_where_clause,
{
/// Set the interconnecting signal channel capacity.
pub fn signal_channel_capacity(mut self, capacity: usize) -> Self
{
self.signal_capacity = Some(capacity);
self
}

/// Set the interconnecting message channel capacities.
drahnr marked this conversation as resolved.
Show resolved Hide resolved
pub fn message_channel_capacity(mut self, capacity: usize) -> Self
{
self.channel_capacity = Some(capacity);
self
}
}
});

ts.extend(quote! {
/// Type used to represent a builder where all fields are initialized and the overseer could be constructed.
pub type #initialized_builder<#initialized_builder_generics> = #builder<Init<S>, #( Init<#field_type>, )*>;
Expand All @@ -446,7 +505,9 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
/// Complete the construction and create the overseer type.
pub fn build(self)
-> ::std::result::Result<(#overseer_name<S, #( #baggage_generic_ty, )*>, #handle), #error_ty> {
let connector = #connector ::default();
let connector = #connector ::with_event_capacity(
self.signal_capacity.unwrap_or(SIGNAL_CHANNEL_CAPACITY)
);
self.build_with_connector(connector)
}

Expand All @@ -470,7 +531,9 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
=
#support_crate ::metered::channel::<
MessagePacket< #consumes >
>(CHANNEL_CAPACITY);
>(
self.channel_capacity.unwrap_or(CHANNEL_CAPACITY)
);
)*

#(
Expand Down Expand Up @@ -510,7 +573,9 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
let message_rx: SubsystemIncomingMessages< #consumes > = #support_crate ::select(
#channel_name_rx, #channel_name_unbounded_rx
);
let (signal_tx, signal_rx) = #support_crate ::metered::channel(SIGNAL_CHANNEL_CAPACITY);
let (signal_tx, signal_rx) = #support_crate ::metered::channel(
self.signal_capacity.unwrap_or(SIGNAL_CHANNEL_CAPACITY)
);

// Generate subsystem name based on overseer field name.
let subsystem_string = String::from(stringify!(#subsystem_name));
Expand Down
7 changes: 7 additions & 0 deletions node/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,7 @@ pub fn new_full<RuntimeApi, ExecutorDispatch, OverseerGenerator>(
program_path: Option<std::path::PathBuf>,
overseer_enable_anyways: bool,
overseer_gen: OverseerGenerator,
overseer_message_channel_capacity_override: Option<usize>,
hwbench: Option<sc_sysinfo::HwBench>,
) -> Result<NewFull<Arc<FullClient<RuntimeApi, ExecutorDispatch>>>, Error>
where
Expand Down Expand Up @@ -1038,6 +1039,7 @@ where
chain_selection_config,
dispute_coordinator_config,
pvf_checker_enabled,
overseer_message_channel_capacity_override,
},
)
.map_err(|e| {
Expand Down Expand Up @@ -1326,6 +1328,7 @@ pub fn build_full(
telemetry_worker_handle: Option<TelemetryWorkerHandle>,
overseer_enable_anyways: bool,
overseer_gen: impl OverseerGen,
overseer_message_channel_override: Option<usize>,
hwbench: Option<sc_sysinfo::HwBench>,
) -> Result<NewFull<Client>, Error> {
#[cfg(feature = "rococo-native")]
Expand All @@ -1343,6 +1346,7 @@ pub fn build_full(
None,
overseer_enable_anyways,
overseer_gen,
overseer_message_channel_override,
hwbench,
)
.map(|full| full.with_client(Client::Rococo))
Expand All @@ -1360,6 +1364,7 @@ pub fn build_full(
None,
overseer_enable_anyways,
overseer_gen,
overseer_message_channel_override,
hwbench,
)
.map(|full| full.with_client(Client::Kusama))
Expand All @@ -1377,6 +1382,7 @@ pub fn build_full(
None,
overseer_enable_anyways,
overseer_gen,
overseer_message_channel_override,
hwbench,
)
.map(|full| full.with_client(Client::Westend))
Expand All @@ -1394,6 +1400,7 @@ pub fn build_full(
None,
overseer_enable_anyways,
overseer_gen,
None,
hwbench,
)
.map(|full| full.with_client(Client::Polkadot))
Expand Down
10 changes: 9 additions & 1 deletion node/service/src/overseer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ where
pub dispute_coordinator_config: DisputeCoordinatorConfig,
/// Enable PVF pre-checking
pub pvf_checker_enabled: bool,
/// Overseer channel capacity override.
pub overseer_message_channel_capacity_override: Option<usize>,
}

/// Obtain a prepared `OverseerBuilder`, that is initialized
Expand Down Expand Up @@ -138,6 +140,7 @@ pub fn prepared_overseer_builder<'a, Spawner, RuntimeClient>(
chain_selection_config,
dispute_coordinator_config,
pvf_checker_enabled,
overseer_message_channel_capacity_override,
}: OverseerGenArgs<'a, Spawner, RuntimeClient>,
) -> Result<
InitializedOverseerBuilder<
Expand Down Expand Up @@ -292,7 +295,12 @@ where
.known_leaves(LruCache::new(KNOWN_LEAVES_CACHE_SIZE))
.metrics(metrics)
.spawner(spawner);
Ok(builder)

if let Some(capacity) = overseer_message_channel_capacity_override {
Ok(builder.message_channel_capacity(capacity))
} else {
Ok(builder)
}
}

/// Trait for the `fn` generating the overseer.
Expand Down
1 change: 1 addition & 0 deletions parachain/test-parachains/adder/collator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ fn main() -> Result<()> {
None,
false,
polkadot_service::RealOverseerGen,
cli.run.overseer_channel_capacity_override,
None,
)
.map_err(|e| e.to_string())?;
Expand Down
1 change: 1 addition & 0 deletions parachain/test-parachains/undying/collator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ fn main() -> Result<()> {
None,
false,
polkadot_service::RealOverseerGen,
cli.run.overseer_channel_capacity_override,
None,
)
.map_err(|e| e.to_string())?;
Expand Down