Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
malus - mockable overseer mvp (#3224)
Browse files Browse the repository at this point in the history
  • Loading branch information
drahnr authored Jun 16, 2021
1 parent 5161ae5 commit 395324a
Show file tree
Hide file tree
Showing 16 changed files with 753 additions and 219 deletions.
19 changes: 17 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ members = [
"node/network/collator-protocol",
"node/network/gossip-support",
"node/overseer",
"node/malus",
"node/primitives",
"node/service",
"node/subsystem",
Expand Down
2 changes: 2 additions & 0 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,5 @@ try-runtime = [ "service/try-runtime" ]
kusama-native = [ "service/kusama-native" ]
westend-native = [ "service/westend-native" ]
rococo-native = [ "service/rococo-native" ]

malus = [ "full-node", "service/malus" ]
95 changes: 53 additions & 42 deletions cli/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,53 +181,64 @@ fn ensure_dev(spec: &Box<dyn service::ChainSpec>) -> std::result::Result<(), Str
}
}

/// Parses polkadot specific CLI arguments and run the service.
pub fn run() -> Result<()> {
let cli = Cli::from_args();
/// Launch a node, accepting arguments just like a regular node,
/// accepts an alternative overseer generator, to adjust behavior
/// for integration tests as needed.
#[cfg(feature = "malus")]
pub fn run_node(cli: Cli, overseer_gen: impl service::OverseerGen) -> Result<()> {
run_node_inner(cli, overseer_gen)
}

match &cli.subcommand {
None => {
let runner = cli.create_runner(&cli.run.base)
.map_err(Error::from)?;
let chain_spec = &runner.config().chain_spec;
fn run_node_inner(cli: Cli, overseer_gen: impl service::OverseerGen) -> Result<()> {
let runner = cli.create_runner(&cli.run.base)
.map_err(Error::from)?;
let chain_spec = &runner.config().chain_spec;

set_default_ss58_version(chain_spec);
set_default_ss58_version(chain_spec);

let grandpa_pause = if cli.run.grandpa_pause.is_empty() {
None
} else {
Some((cli.run.grandpa_pause[0], cli.run.grandpa_pause[1]))
};
let grandpa_pause = if cli.run.grandpa_pause.is_empty() {
None
} else {
Some((cli.run.grandpa_pause[0], cli.run.grandpa_pause[1]))
};

if chain_spec.is_kusama() {
info!("----------------------------");
info!("This chain is not in any way");
info!(" endorsed by the ");
info!(" KUSAMA FOUNDATION ");
info!("----------------------------");
}
if chain_spec.is_kusama() {
info!("----------------------------");
info!("This chain is not in any way");
info!(" endorsed by the ");
info!(" KUSAMA FOUNDATION ");
info!("----------------------------");
}

let jaeger_agent = cli.run.jaeger_agent;

runner.run_node_until_exit(move |config| async move {
let role = config.role.clone();

match role {
#[cfg(feature = "browser")]
Role::Light => service::build_light(config).map(|(task_manager, _)| task_manager).map_err(Into::into),
#[cfg(not(feature = "browser"))]
Role::Light => Err(Error::Other("Light client not enabled".into())),
_ => service::build_full(
config,
service::IsCollator::No,
grandpa_pause,
cli.run.no_beefy,
jaeger_agent,
None,
).map(|full| full.task_manager).map_err(Into::into)
}
})
},
let jaeger_agent = cli.run.jaeger_agent;

runner.run_node_until_exit(move |config| async move {
let role = config.role.clone();

match role {
#[cfg(feature = "browser")]
Role::Light => service::build_light(config).map(|(task_manager, _)| task_manager).map_err(Into::into),
#[cfg(not(feature = "browser"))]
Role::Light => Err(Error::Other("Light client not enabled".into())),
_ => service::build_full(
config,
service::IsCollator::No,
grandpa_pause,
cli.run.no_beefy,
jaeger_agent,
None,
overseer_gen,
).map(|full| full.task_manager).map_err(Into::into)
}
})
}

/// Parses polkadot specific CLI arguments and run the service.
pub fn run() -> Result<()> {
let cli = Cli::from_args();

match &cli.subcommand {
None => run_node_inner(cli, service::RealOverseerGen),
Some(Subcommand::BuildSpec(cmd)) => {
let runner = cli.create_runner(cmd)?;
Ok(runner.sync_run(|config| {
Expand Down
5 changes: 4 additions & 1 deletion cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ mod command;
pub use service::{
self,
ProvideRuntimeApi, CoreApi, IdentifyVariant,
Block, RuntimeApiCollection, TFullClient
Block, RuntimeApiCollection, TFullClient,
};

#[cfg(feature = "malus")]
pub use service::create_default_subsystems;

#[cfg(feature = "cli")]
pub use cli::*;

Expand Down
1 change: 1 addition & 0 deletions node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ use async_trait::async_trait;
const LOG_TARGET: &'static str = "parachain::candidate-validation";

/// Configuration for the candidate validation subsystem
#[derive(Clone)]
pub struct Config {
/// The path where candidate validation can store compiled artifacts for PVFs.
pub artifacts_cache_path: PathBuf,
Expand Down
28 changes: 28 additions & 0 deletions node/malus/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
[lib]
name = "malus"
path = "src/lib.rs"

[[bin]]
name = "malus-variant-a"
path = "src/variant-a.rs"

[package]
name = "polkadot-test-malus"
description = "Misbehaving nodes for local testnets, system and simnet tests."
license = "GPL-3.0-only"
version = "0.9.4"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
readme = "README.md"
publish = false

[dependencies]
polkadot-cli = { path = "../../cli", default-features = false, features = [ "cli", "malus" ] }
polkadot-node-subsystem = { path = "../subsystem" }
polkadot-node-subsystem-util = { path = "../subsystem-util" }
polkadot-node-core-candidate-validation = { path = "../core/candidate-validation" }
parity-util-mem = { version = "*", default-features = false, features = ["jemalloc-global"] }
color-eyre = { version = "0.5.11", default-features = false }
assert_matches = "1.5"
structopt = "0.3.21"
async-trait = "0.1.50"
184 changes: 184 additions & 0 deletions node/malus/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Copyright 2017-2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! A small set of wrapping types to cover most of our adversary test cases.
//!
//! This allows types with internal mutability to synchronize across
//! multiple subsystems and intercept or replace incoming and outgoing
//! messages on the overseer level.

use polkadot_node_subsystem::*;
pub use polkadot_node_subsystem::{messages::AllMessages, FromOverseer};
use std::future::Future;
use std::pin::Pin;

/// Filter incoming and outgoing messages.
pub trait MsgFilter: Send + Sync + Clone + 'static {
/// The message type the original subsystm handles incoming.
type Message: Send + 'static;

/// Filter messages that are to be received by
/// the subsystem.
fn filter_in(&self, msg: FromOverseer<Self::Message>) -> Option<FromOverseer<Self::Message>> {
Some(msg)
}

/// Modify outgoing messages.
fn filter_out(&self, msg: AllMessages) -> Option<AllMessages> {
Some(msg)
}
}

/// A sender with the outgoing messages filtered.
#[derive(Clone)]
pub struct FilteredSender<Sender, Fil> {
inner: Sender,
message_filter: Fil,
}

#[async_trait::async_trait]
impl<Sender, Fil> SubsystemSender for FilteredSender<Sender, Fil>
where
Sender: SubsystemSender,
Fil: MsgFilter,
{
async fn send_message(&mut self, msg: AllMessages) {
if let Some(msg) = self.message_filter.filter_out(msg) {
self.inner.send_message(msg).await;
}
}

async fn send_messages<T>(&mut self, msgs: T)
where
T: IntoIterator<Item = AllMessages> + Send,
T::IntoIter: Send,
{
for msg in msgs {
self.send_message(msg).await;
}
}

fn send_unbounded_message(&mut self, msg: AllMessages) {
if let Some(msg) = self.message_filter.filter_out(msg) {
self.inner.send_unbounded_message(msg);
}
}
}

/// A subsystem context, that filters the outgoing messages.
pub struct FilteredContext<Context: SubsystemContext, Fil: MsgFilter> {
inner: Context,
message_filter: Fil,
sender: FilteredSender<<Context as SubsystemContext>::Sender, Fil>,
}

impl<Context, Fil> FilteredContext<Context, Fil>
where
Context: SubsystemContext,
Fil: MsgFilter<Message = <Context as SubsystemContext>::Message>,
{
pub fn new(mut inner: Context, message_filter: Fil) -> Self {
let sender = FilteredSender::<<Context as SubsystemContext>::Sender, Fil> {
inner: inner.sender().clone(),
message_filter: message_filter.clone(),
};
Self {
inner,
message_filter,
sender,
}
}
}

#[async_trait::async_trait]
impl<Context, Fil> SubsystemContext for FilteredContext<Context, Fil>
where
Context: SubsystemContext,
Fil: MsgFilter<Message = <Context as SubsystemContext>::Message>,
{
type Message = <Context as SubsystemContext>::Message;
type Sender = FilteredSender<<Context as SubsystemContext>::Sender, Fil>;

async fn try_recv(&mut self) -> Result<Option<FromOverseer<Self::Message>>, ()> {
loop {
match self.inner.try_recv().await? {
None => return Ok(None),
Some(msg) => {
if let Some(msg) = self.message_filter.filter_in(msg) {
return Ok(Some(msg));
}
}
}
}
}

async fn recv(&mut self) -> SubsystemResult<FromOverseer<Self::Message>> {
loop {
let msg = self.inner.recv().await?;
if let Some(msg) = self.message_filter.filter_in(msg) {
return Ok(msg);
}
}
}

async fn spawn(
&mut self,
name: &'static str,
s: Pin<Box<dyn Future<Output = ()> + Send>>,
) -> SubsystemResult<()> {
self.inner.spawn(name, s).await
}

async fn spawn_blocking(
&mut self,
name: &'static str,
s: Pin<Box<dyn Future<Output = ()> + Send>>,
) -> SubsystemResult<()> {
self.inner.spawn_blocking(name, s).await
}

fn sender(&mut self) -> &mut Self::Sender {
&mut self.sender
}
}

/// A subsystem to which incoming and outgoing filters are applied.
pub struct FilteredSubsystem<Sub, Fil> {
subsystem: Sub,
message_filter: Fil,
}

impl<Sub, Fil> FilteredSubsystem<Sub, Fil> {
pub fn new(subsystem: Sub, message_filter: Fil) -> Self {
Self {
subsystem,
message_filter,
}
}
}

impl<Context, Sub, Fil> Subsystem<Context> for FilteredSubsystem<Sub, Fil>
where
Context: SubsystemContext + Sync + Send,
Sub: Subsystem<FilteredContext<Context, Fil>>,
FilteredContext<Context, Fil>: SubsystemContext,
Fil: MsgFilter<Message = <Context as SubsystemContext>::Message>,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let ctx = FilteredContext::new(ctx, self.message_filter);
Subsystem::<FilteredContext<Context, Fil>>::start(self.subsystem, ctx)
}
}
Loading

0 comments on commit 395324a

Please sign in to comment.