diff --git a/Cargo.lock b/Cargo.lock index cd350acf4a6e..5f039f2d12f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3303,12 +3303,12 @@ checksum = "77af24da69f9d9341038eba93a073b1fdaaa1b788221b00a69bce9e762cb32de" name = "overseer" version = "0.1.0" dependencies = [ - "exit-future", "femme", "futures 0.3.5", "futures-timer 3.0.2", "kv-log-macro", "log 0.4.8", + "streamunordered", ] [[package]] @@ -7225,6 +7225,18 @@ dependencies = [ "generic-array", ] +[[package]] +name = "streamunordered" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9394ee1338fee8370bee649f8a7170b3a56917903a0956467ad192dcf8699ca" +dependencies = [ + "futures-core", + "futures-sink", + "futures-util", + "slab", +] + [[package]] name = "string" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index b93bbbeacbdc..9192ff666d08 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ members = [ "erasure-coding", "network", "network/test", - "overseer", + "overseer", "primitives", "runtime/common", "runtime/polkadot", diff --git a/overseer/Cargo.toml b/overseer/Cargo.toml index 12d18475af46..bcd0a8e9e529 100644 --- a/overseer/Cargo.toml +++ b/overseer/Cargo.toml @@ -1,17 +1,17 @@ [package] name = "overseer" version = "0.1.0" -authors = ["Fedor Sakharov "] +authors = ["Parity Technologies "] edition = "2018" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] futures = "0.3.5" log = "0.4.8" -exit-future = "0.2.0" +futures-timer = "3.0.2" +streamunordered = "0.5.1" [dev-dependencies] +futures = { version = "0.3.5", features = ["thread-pool"] } futures-timer = "3.0.2" femme = "2.0.1" log = "0.4.8" diff --git a/overseer/examples/minimal-example.rs b/overseer/examples/minimal-example.rs index 1df86a48c339..4dd37dbafea1 100644 --- a/overseer/examples/minimal-example.rs +++ b/overseer/examples/minimal-example.rs @@ -1,111 +1,134 @@ -use std::collections::HashSet; -use std::time::Duration; +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Shows a basic usage of the `Overseer`: +//! * Spawning subsystems and subsystem child jobs +//! * Establishing message passing -use futures::pending; +use std::time::Duration; +use futures::{ + pending, pin_mut, executor, select, stream, + FutureExt, StreamExt, +}; use futures_timer::Delay; use kv_log_macro as log; -use overseer::{Overseer, Subsystem, SubsystemContext, SubsystemJob}; +use overseer::{ + AllMessages, CandidateBackingSubsystemMessage, FromOverseer, + Overseer, Subsystem, SubsystemContext, SpawnedSubsystem, ValidationSubsystemMessage, +}; struct Subsystem1; impl Subsystem1 { - async fn run(mut ctx: SubsystemContext) { + async fn run(mut ctx: SubsystemContext) { loop { match ctx.try_recv().await { Ok(Some(msg)) => { - log::info!("Subsystem1 received message {}", msg); + if let FromOverseer::Communication { msg } = msg { + log::info!("msg {:?}", msg); + } + continue; } Ok(None) => (), - Err(_) => {} + Err(_) => { + log::info!("exiting"); + return; + } } Delay::new(Duration::from_secs(1)).await; - ctx.send_msg(10).await; - pending!(); + ctx.send_msg(AllMessages::Validation( + ValidationSubsystemMessage::ValidityAttestation + )).await.unwrap(); } } - - fn new() -> Self { - Self - } } -impl Subsystem for Subsystem1 { - fn start(&mut self, ctx: SubsystemContext) -> SubsystemJob { - SubsystemJob(Box::pin(async move { +impl Subsystem for Subsystem1 { + fn start(&mut self, ctx: SubsystemContext) -> SpawnedSubsystem { + SpawnedSubsystem(Box::pin(async move { Self::run(ctx).await; - Ok(()) })) } } struct Subsystem2; - impl Subsystem2 { - async fn run(mut ctx: SubsystemContext) { - let ss3 = Box::new(Subsystem3); + async fn run(mut ctx: SubsystemContext) { + ctx.spawn(Box::pin(async { + loop { + log::info!("Job tick"); + Delay::new(Duration::from_secs(1)).await; + } + })).await.unwrap(); - let ss3_id = ctx.spawn(ss3).await; - log::info!("Received subsystem id {:?}", ss3_id); loop { match ctx.try_recv().await { Ok(Some(msg)) => { - log::info!("Subsystem2 received message {}", msg); + log::info!("Subsystem2 received message {:?}", msg); + continue; } - Ok(None) => (), - Err(_) => {} + Ok(None) => { pending!(); } + Err(_) => { + log::info!("exiting"); + return; + }, } - pending!(); } } - - fn new() -> Self { - Self - } } -impl Subsystem for Subsystem2 { - fn start(&mut self, ctx: SubsystemContext) -> SubsystemJob { - SubsystemJob(Box::pin(async move { +impl Subsystem for Subsystem2 { + fn start(&mut self, ctx: SubsystemContext) -> SpawnedSubsystem { + SpawnedSubsystem(Box::pin(async move { Self::run(ctx).await; - Ok(()) - })) - } -} - -struct Subsystem3; - -impl Subsystem for Subsystem3 { - fn start(&mut self, mut ctx: SubsystemContext) -> SubsystemJob { - SubsystemJob(Box::pin(async move { - // TODO: ctx actually has to be used otherwise the channels are dropped - loop { - // ignore all incoming msgs - while let Ok(Some(_)) = ctx.try_recv().await { - } - log::info!("Subsystem3 tick"); - Delay::new(Duration::from_secs(1)).await; - - pending!(); - } })) } - - fn can_recv_msg(&self, _msg: &usize) -> bool { false } } fn main() { femme::with_level(femme::LevelFilter::Trace); + let spawner = executor::ThreadPool::new().unwrap(); futures::executor::block_on(async { - let subsystems: Vec>> = vec![ - Box::new(Subsystem1::new()), - Box::new(Subsystem2::new()), - ]; + let timer_stream = stream::repeat(()).then(|_| async { + Delay::new(Duration::from_secs(1)).await; + }); + + let (overseer, _handler) = Overseer::new( + Box::new(Subsystem2), + Box::new(Subsystem1), + spawner, + ).unwrap(); + let overseer_fut = overseer.run().fuse(); + let timer_stream = timer_stream; + + pin_mut!(timer_stream); + pin_mut!(overseer_fut); - let overseer = Overseer::new(subsystems); - overseer.run().await; + loop { + select! { + _ = overseer_fut => break, + _ = timer_stream.next() => { + log::info!("tick"); + } + complete => break, + } + } }); } diff --git a/overseer/src/lib.rs b/overseer/src/lib.rs index 5914baa231b9..f7ac6cac5079 100644 --- a/overseer/src/lib.rs +++ b/overseer/src/lib.rs @@ -1,3 +1,19 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + //! # Overseer //! //! `overseer` implements the Overseer architecture described in the @@ -12,13 +28,8 @@ //! that this protocol is the only way tasks communicate with each other, however //! at this moment there are no foolproof guards against other ways of communication. //! -//! To spawn something the `Overseer` needs to know what actually needs to be spawn. -//! This is solved by splitting the actual type of the subsystem from the type that -//! is being asyncronously run on the `Overseer`. What we need from the subsystem -//! is the ability to return some `Future` object that the `Overseer` can run and -//! dispatch messages to/from it. Let's take a look at the simplest case with two -//! `Subsystems`: -//! +//! The `Overseer` is instantiated with a pre-defined set of `Subsystems` that +//! share the same behavior from `Overseer`'s point of view. //! //! ```text //! +-----------------------------+ @@ -37,30 +48,27 @@ //! start() start() //! V V //! ..................| Overseer "runs" these |....................... -//! . +-------------------+ +---------------------+ . -//! . | SubsystemInstance1| | SubsystemInstance2 | . -//! . +-------------------+ +---------------------+ . +//! . +--------------------+ +---------------------+ . +//! . | SubsystemInstance1 | | SubsystemInstance2 | . +//! . +--------------------+ +---------------------+ . //! .................................................................. //! ``` use std::fmt::Debug; use std::pin::Pin; -use std::collections::{HashSet, HashMap}; use std::task::Poll; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::time::Duration; use futures::channel::{mpsc, oneshot}; use futures::{ - pending, poll, - Future, SinkExt, StreamExt, + pending, poll, select, + future::{BoxFuture, RemoteHandle}, + stream::FuturesUnordered, + task::{Spawn, SpawnError, SpawnExt}, + Future, FutureExt, SinkExt, StreamExt, }; - - -// TODO: a better way to get unique IDs for jobs -fn get_id() -> usize { - static COUNTER:AtomicUsize = AtomicUsize::new(1); - COUNTER.fetch_add(1, Ordering::Relaxed) -} +use futures_timer::Delay; +use streamunordered::{StreamYield, StreamUnordered}; /// An error type that describes faults that may happen /// @@ -69,75 +77,204 @@ fn get_id() -> usize { /// * Subsystems dying when they are not expected to /// * Subsystems not dying when they are told to die /// * etc. -// TODO: populate with actual error cases. #[derive(Debug)] pub struct SubsystemError; -/// A `Result` type that wraps `SubsystemError` and an empty type on success. -// TODO: Proper success type. -pub type SubsystemResult = Result<(), SubsystemError>; +impl From for SubsystemError { + fn from(_: mpsc::SendError) -> Self { + Self + } +} + +impl From for SubsystemError { + fn from(_: oneshot::Canceled) -> Self { + Self + } +} -/// An asynchronous job that runs inside and being overseen by the `Overseer`. +impl From for SubsystemError { + fn from(_: SpawnError) -> Self { + Self + } +} + +/// A `Result` type that wraps [`SubsystemError`]. /// -/// In essence it's just a newtype wrapping a pinned `Future` dyn trait object. -pub struct SubsystemJob(pub Pin>>); +/// [`SubsystemError`]: struct.SubsystemError.html +pub type SubsystemResult = Result; -/// A type of messages that are used inside the `Overseer`. +/// An asynchronous subsystem task that runs inside and being overseen by the [`Overseer`]. /// -/// It is generic over some `T` that is intended to be a message type -/// being used by the subsystems running on the `Overseer`. Most likely -/// this type will be one large `enum` covering all possible messages in -/// the system. -pub enum OverseerMessage { - /// This is a message generated by a `Subsystem`. - /// Wraps the messge itself and has an optional `to` of - /// someone who can receive this message. - /// - /// If that `to` is present the message will be targetedly sent to the intended - /// receiver. The most obvious use case of this is communicating with children. - SubsystemMessage{ - to: Option, - msg: T, - }, +/// In essence it's just a newtype wrapping a `BoxFuture`. +/// +/// [`Overseer`]: struct.Overseer.html +pub struct SpawnedSubsystem(pub BoxFuture<'static, ()>); + +// A capacity of bounded channels inside the overseer. +const CHANNEL_CAPACITY: usize = 1024; +// A graceful `Overseer` teardown time delay. +const STOP_DELAY: u64 = 1; + +/// A type of messages that are sent from [`Subsystem`] to [`Overseer`]. +/// +/// It wraps a system-wide [`AllMessages`] type that represents all possible +/// messages in the system. +/// +/// [`AllMessages`]: enum.AllMessages.html +/// [`Subsystem`]: trait.Subsystem.html +/// [`Overseer`]: struct.Overseer.html +enum ToOverseer { + /// This is a message sent by a `Subsystem`. + SubsystemMessage(AllMessages), + /// A message that wraps something the `Subsystem` is desiring to /// spawn on the overseer and a `oneshot::Sender` to signal the result /// of the spawn. - Spawn{ - s: Box>, - res: oneshot::Sender>, + SpawnJob { + s: BoxFuture<'static, ()>, + res: oneshot::Sender>, }, } -impl Debug for OverseerMessage { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +/// Some event from outer world. +enum Event { + BlockImport, + BlockFinalized, + MsgToSubsystem(AllMessages), + Stop, +} + +/// Some message that is sent from one of the `Subsystem`s to the outside world. +pub enum OutboundMessage { + SubsystemMessage { + msg: AllMessages, + } +} + +/// A handler used to communicate with the [`Overseer`]. +/// +/// [`Overseer`]: struct.Overseer.html +pub struct OverseerHandler { + events_tx: mpsc::Sender, +} + +impl OverseerHandler { + /// Inform the `Overseer` that that some block was imported. + pub async fn block_imported(&mut self) -> SubsystemResult<()> { + self.events_tx.send(Event::BlockImport).await?; + + Ok(()) + } + + /// Send some message to one of the `Subsystem`s. + pub async fn send_msg(&mut self, msg: AllMessages) -> SubsystemResult<()> { + self.events_tx.send(Event::MsgToSubsystem(msg)).await?; + + Ok(()) + } + + /// Inform the `Overseer` that that some block was finalized. + pub async fn block_finalized(&mut self) -> SubsystemResult<()> { + self.events_tx.send(Event::BlockFinalized).await?; + + Ok(()) + } + + /// Tell `Overseer` to shutdown. + pub async fn stop(&mut self) -> SubsystemResult<()> { + self.events_tx.send(Event::Stop).await?; + + Ok(()) + } +} + +impl Debug for ToOverseer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - OverseerMessage::SubsystemMessage { to, msg } => { - write!(f, "OverseerMessage::SubsystemMessage{{ to: {:?}, msg: {:?} }}", to, msg) + ToOverseer::SubsystemMessage(msg) => { + write!(f, "OverseerMessage::SubsystemMessage({:?})", msg) } - OverseerMessage::Spawn{ .. } => write!(f, "OverseerMessage::Spawn(..)") + ToOverseer::SpawnJob { .. } => write!(f, "OverseerMessage::Spawn(..)") } - } + } } -/// A running instance of some `Subsystem`. +/// A running instance of some [`Subsystem`]. +/// +/// [`Subsystem`]: trait.Subsystem.html struct SubsystemInstance { - /// We talk to the `Overseer` over this channel. - rx: mpsc::Receiver>, - /// The `Overseer` talks to use over this channel. - tx: mpsc::Sender, - /// The actual async task running. - f: SubsystemJob, + tx: mpsc::Sender>, } -/// An `id` that is given to any `SubsystemInstance` for identification. -#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] -pub struct SubsystemId(usize); - -/// A context type that is given to the `Subsystem` upon spawning -/// that can be used by `Subsystem` to communicate with the outside world. +/// A context type that is given to the [`Subsystem`] upon spawning. +/// It can be used by [`Subsystem`] to communicate with other [`Subsystem`]s +/// or to spawn it's [`SubsystemJob`]s. +/// +/// [`Overseer`]: struct.Overseer.html +/// [`Subsystem`]: trait.Subsystem.html +/// [`SubsystemJob`]: trait.SubsystemJob.html pub struct SubsystemContext{ - rx: mpsc::Receiver, - tx: mpsc::Sender>, + rx: mpsc::Receiver>, + tx: mpsc::Sender, +} + +/// A signal used by [`Overseer`] to communicate with the [`Subsystem`]s. +/// +/// [`Overseer`]: struct.Overseer.html +/// [`Subsystem`]: trait.Subsystem.html +#[derive(Debug)] +pub enum OverseerSignal { + /// `Subsystem` should start working. + StartWork, + /// `Subsystem` should stop working. + StopWork, + /// Conclude the work of the `Overseer` and all `Subsystem`s. + Conclude, +} + +#[derive(Debug)] +/// A message type used by the Validation [`Subsystem`]. +/// +/// [`Subsystem`]: trait.Subsystem.html +pub enum ValidationSubsystemMessage { + ValidityAttestation, +} + +#[derive(Debug)] +/// A message type used by the CandidateBacking [`Subsystem`]. +/// +/// [`Subsystem`]: trait.Subsystem.html +pub enum CandidateBackingSubsystemMessage { + RegisterBackingWatcher, + Second, +} + +/// A message type tying together all message types that are used across [`Subsystem`]s. +/// +/// [`Subsystem`]: trait.Subsystem.html +#[derive(Debug)] +pub enum AllMessages { + Validation(ValidationSubsystemMessage), + CandidateBacking(CandidateBackingSubsystemMessage), +} + +/// A message type that a [`Subsystem`] receives from the [`Overseer`]. +/// It wraps siglans from the [`Overseer`] and messages that are circulating +/// between subsystems. +/// +/// It is generic over over the message type `M` that a particular `Subsystem` may use. +/// +/// [`Overseer`]: struct.Overseer.html +/// [`Subsystem`]: trait.Subsystem.html +#[derive(Debug)] +pub enum FromOverseer { + /// Signal from the `Overseer`. + Signal(OverseerSignal), + + /// Some other `Subsystem`'s message. + Communication { + msg: M, + }, } impl SubsystemContext { @@ -145,50 +282,38 @@ impl SubsystemContext { /// /// This has to be used with caution, if you loop over this without /// using `pending!()` macro you will end up with a busy loop! - pub async fn try_recv(&mut self) -> Result, ()> { + pub async fn try_recv(&mut self) -> Result>, ()> { match poll!(self.rx.next()) { - Poll::Ready(Some(msg)) => Ok(Some(msg)), + Poll::Ready(Some(msg)) => Ok(Some(msg)), Poll::Ready(None) => Err(()), - Poll::Pending => Ok(None), + Poll::Pending => Ok(None), } } /// Receive a message. - pub async fn recv(&mut self) -> Result { + pub async fn recv(&mut self) -> SubsystemResult> { self.rx.next().await.ok_or(SubsystemError) } - /// Send a message to whom it may concern. - /// - /// The message will be broadcasted to all other `Subsystem`s that can - /// receive it. - pub async fn send_msg(&mut self, msg: M) { - let _ = self.tx.send(OverseerMessage::SubsystemMessage{ - to: None, - msg, - }).await; - } - - /// Spawn a child `Subsystem` on the executor and get it's `SubsystemId` upon success. - pub async fn spawn(&mut self, s: Box>) -> Result { + /// Spawn a child task on the executor. + pub async fn spawn(&mut self, s: Pin + Send>>) -> SubsystemResult<()> { let (tx, rx) = oneshot::channel(); - let _ = self.tx.send(OverseerMessage::Spawn{ + self.tx.send(ToOverseer::SpawnJob { s, res: tx, - }).await; + }).await?; - rx.await.unwrap_or_else(|_| Err(SubsystemError)) + rx.await? } - /// Send a direct message to some other `Subsystem` you know `SubsystemId` of. - pub async fn send_msg_to(&mut self, to: SubsystemId, msg: M) { - let _ = self.tx.send(OverseerMessage::SubsystemMessage{ - to: Some(to), - msg, - }).await; + /// Send a direct message to some other `Subsystem`, routed based on message type. + pub async fn send_msg(&mut self, msg: AllMessages) -> SubsystemResult<()> { + self.tx.send(ToOverseer::SubsystemMessage(msg)).await?; + + Ok(()) } - fn new(rx: mpsc::Receiver, tx: mpsc::Sender>) -> Self { + fn new(rx: mpsc::Receiver>, tx: mpsc::Sender) -> Self { Self { rx, tx, @@ -196,60 +321,82 @@ impl SubsystemContext { } } -/// A trait that describes the `Subsystems` that can run on the `Overseer`. +/// A trait that describes the [`Subsystem`]s that can run on the [`Overseer`]. /// /// It is generic over the message type circulating in the system. /// The idea that we want some type contaning persistent state that -/// can start actually running jobs when asked to. +/// can spawn actually running subsystems when asked to. +/// +/// [`Overseer`]: struct.Overseer.html +/// [`Subsystem`]: trait.Subsystem.html pub trait Subsystem { - /// Start this `Subsystem` and return `SubsystemJob`. - fn start(&mut self, rx: mpsc::Receiver, tx: mpsc::Sender>) -> SubsystemJob; - /// If this `Subsystem` want to receive this message. - /// - /// By default receive all messages. - fn can_recv_msg(&self, _msg: &M) -> bool { true } + /// Start this `Subsystem` and return `SpawnedSubsystem`. + fn start(&mut self, ctx: SubsystemContext) -> SpawnedSubsystem; } /// A subsystem that we oversee. /// -/// Ties together the `Subsystem` itself and it's running instance -/// (which may be missing if the `Subsystem` is not running at the moment +/// Ties together the [`Subsystem`] itself and it's running instance +/// (which may be missing if the [`Subsystem`] is not running at the moment /// for whatever reason). -struct OverseedSubsystem { - subsystem: Box>, +/// +/// [`Subsystem`]: trait.Subsystem.html +#[allow(dead_code)] +struct OverseenSubsystem { + subsystem: Box + Send>, instance: Option>, } /// The `Overseer` itself. -pub struct Overseer { - /// All `Subsystem`s by their respective `SubsystemId`s. - subsystems: HashMap>, - /// The actual poor man's process tree. - /// - /// Needed (among other things) to stop a running `Job` along - /// with all it's children. - id_to_children: HashMap>, +pub struct Overseer { + /// A validation subsystem + validation_subsystem: OverseenSubsystem, + + /// A candidate backing subsystem + candidate_backing_subsystem: OverseenSubsystem, + + /// Spawner to spawn tasks to. + s: S, + + /// Here we keep handles to spawned subsystems to be notified when they terminate. + running_subsystems: FuturesUnordered>, + + /// Gather running subsystms' outbound streams into one. + running_subsystems_rx: StreamUnordered>, + + /// Events that are sent to the overseer from the outside world + events_rx: mpsc::Receiver, } -impl Overseer { - /// Create a new intance of the `Overseer` with some initial set of `Subsystems. +impl Overseer +where + S: Spawn, +{ + /// Create a new intance of the `Overseer` with a fixed set of [`Subsystem`]s. /// - /// The `Subsystems` submitted to this call will act as a level 1 in the "process tree": + /// Each [`Subsystem`] is passed to this function as an explicit parameter + /// and is supposed to implement some interface that is generic over message type + /// that is specific to this [`Subsystem`]. At the moment there are only two + /// subsystems: + /// * Validation + /// * CandidateBacking /// + /// As any entity that satisfies the interface may act as a [`Subsystem`] this allows + /// mocking in the test code: /// /// ```text /// +------------------------------------+ /// | Overseer | /// +------------------------------------+ /// / | | \ - /// ................. subsystems[..] .............................. + /// ................. subsystems................................... /// . +-----------+ +-----------+ +----------+ +---------+ . /// . | | | | | | | | . /// . +-----------+ +-----------+ +----------+ +---------+ . /// ............................................................... /// | /// probably `spawn` - /// something else + /// a `job` /// | /// V /// +-----------+ @@ -257,151 +404,247 @@ impl Overseer { /// +-----------+ /// /// ``` - pub fn new(subsystems: Vec>>) -> Self { - let mut this = Self { - subsystems: HashMap::new(), - id_to_children: HashMap::new(), + /// + /// [`Subsystem`]: trait.Subsystem.html + /// + /// # Example + /// + /// The [`Subsystems`] may be any type as long as they implement an expected interface. + /// Here, we create two mock subsystems and start the `Overseer` with them. For the sake + /// of simplicity the termination of the example is done with a timeout. + /// ``` + /// # use std::time::Duration; + /// # use futures::{executor, pin_mut, select, FutureExt}; + /// # use futures_timer::Delay; + /// # use overseer::{ + /// # Overseer, Subsystem, SpawnedSubsystem, SubsystemContext, + /// # ValidationSubsystemMessage, CandidateBackingSubsystemMessage, + /// # }; + /// + /// struct ValidationSubsystem; + /// impl Subsystem for ValidationSubsystem { + /// fn start( + /// &mut self, + /// mut ctx: SubsystemContext, + /// ) -> SpawnedSubsystem { + /// SpawnedSubsystem(Box::pin(async move { + /// loop { + /// Delay::new(Duration::from_secs(1)).await; + /// } + /// })) + /// } + /// } + /// + /// struct CandidateBackingSubsystem; + /// impl Subsystem for CandidateBackingSubsystem { + /// fn start( + /// &mut self, + /// mut ctx: SubsystemContext, + /// ) -> SpawnedSubsystem { + /// SpawnedSubsystem(Box::pin(async move { + /// loop { + /// Delay::new(Duration::from_secs(1)).await; + /// } + /// })) + /// } + /// } + /// + /// # fn main() { executor::block_on(async move { + /// let spawner = executor::ThreadPool::new().unwrap(); + /// let (overseer, _handler) = Overseer::new( + /// Box::new(ValidationSubsystem), + /// Box::new(CandidateBackingSubsystem), + /// spawner, + /// ).unwrap(); + /// + /// let timer = Delay::new(Duration::from_millis(50)).fuse(); + /// + /// let overseer_fut = overseer.run().fuse(); + /// pin_mut!(timer); + /// pin_mut!(overseer_fut); + /// + /// select! { + /// _ = overseer_fut => (), + /// _ = timer => (), + /// } + /// # + /// # }); } + /// ``` + pub fn new( + validation: Box + Send>, + candidate_backing: Box + Send>, + mut s: S, + ) -> SubsystemResult<(Self, OverseerHandler)> { + let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY); + + let handler = OverseerHandler { + events_tx: events_tx.clone(), + }; + + let mut running_subsystems_rx = StreamUnordered::new(); + let mut running_subsystems = FuturesUnordered::new(); + + let validation_subsystem = spawn( + &mut s, + &mut running_subsystems, + &mut running_subsystems_rx, + validation, + )?; + + let candidate_backing_subsystem = spawn( + &mut s, + &mut running_subsystems, + &mut running_subsystems_rx, + candidate_backing, + )?; + + let this = Self { + validation_subsystem, + candidate_backing_subsystem, + s, + running_subsystems, + running_subsystems_rx, + events_rx, }; - for s in subsystems.into_iter() { - let _ = this.spawn(s); + Ok((this, handler)) + } + + // Stop the overseer. + async fn stop(mut self) { + if let Some(ref mut s) = self.validation_subsystem.instance { + let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + } + + if let Some(ref mut s) = self.candidate_backing_subsystem.instance { + let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; } - this + let mut stop_delay = Delay::new(Duration::from_secs(STOP_DELAY)).fuse(); + + loop { + select! { + _ = self.running_subsystems.next() => { + if self.running_subsystems.is_empty() { + break; + } + }, + _ = stop_delay => break, + complete => break, + } + } } /// Run the `Overseer`. - // TODO: we have to - // * Give out to the user some handler to communicate with the `Overseer` - // to tell it to do things such as `Start` `Stop` or `Spawn` - // * Actually implement stopping of the `Overseer`, atm it's unstoppable. - pub async fn run(mut self) { + pub async fn run(mut self) -> SubsystemResult<()> { loop { - // Upon iteration of the loop we will be collecting all the messages - // that need dispatching (if any). - let mut msgs = Vec::default(); - - for (id, s) in self.subsystems.iter_mut() { - if let Some(s) = &mut s.instance { - // TODO: set to `None` or restart. - // It will panic when the `Future` becomes - // ready and then it will be polled again here. - if let Poll::Ready(res) = poll!(&mut s.f.0) { - log::info!("subsystem stopped {:?}", res); + while let Poll::Ready(Some(msg)) = poll!(&mut self.events_rx.next()) { + match msg { + Event::MsgToSubsystem(msg) => { + self.route_message(msg).await; } - - match poll!(&mut s.rx.next()) { - Poll::Ready(Some(msg)) => { - log::info!("Received message from subsystem {:?}", msg); - msgs.push((*id, msg)); - } - _ => {} + Event::Stop => { + self.stop().await; + return Ok(()); } + _ => () } } - // Do the message dispatching be it broadcasting or direct messages. - // - // TODO: this desperately need refactoring. - for msg in msgs.into_iter() { - match msg.1 { - OverseerMessage::SubsystemMessage{ to, msg: m } => { - match to { - Some(to) => { - if let Some(subsystem) = self.subsystems.get_mut(&to) { - if let Some(ref mut i) = subsystem.instance { - let _ = i.tx.send(m).await; - } - } - } - None => { - for (id, s) in self.subsystems.iter_mut() { - // Don't send messages back to the sender. - if msg.0 == *id { - continue; - } - - if s.subsystem.can_recv_msg(&m) { - if let Some(ref mut i) = s.instance { - let _ = i.tx.send(m.clone()).await; - } - } - } - } - } - } - OverseerMessage::Spawn { s, res } => { - log::info!("Spawn message"); - - let s = self.spawn(s); + while let Poll::Ready(Some((StreamYield::Item(msg), _))) = poll!( + &mut self.running_subsystems_rx.next() + ) { + match msg { + ToOverseer::SubsystemMessage(msg) => self.route_message(msg).await, + ToOverseer::SpawnJob { s, res } => { + let s = self.spawn_job(s); - if let Ok(id) = s { - match self.id_to_children.get_mut(&msg.0) { - Some(ref mut v) => { - v.insert(msg.0); - } - None => { - let mut hs = HashSet::new(); - hs.insert(id); - self.id_to_children.insert(msg.0, hs); - } - } - } let _ = res.send(s); } } } - // Log our "poor man's" process tree as it is at the moment. - log::trace!("Pids {:?}", self.id_to_children); + // Some subsystem exited? It's time to panic. + if let Poll::Ready(Some(finished)) = poll!(self.running_subsystems.next()) { + log::error!("Subsystem finished unexpectedly {:?}", finished); + self.stop().await; + return Err(SubsystemError); + } // Looks like nothing is left to be polled, let's take a break. pending!(); } } - fn spawn(&mut self, mut s: Box>) -> Result { - let (to_tx, to_rx) = mpsc::channel(1024); - let (from_tx, from_rx) = mpsc::channel(1024); - let f = s.start(to_rx, from_tx); - - let instance = Some(SubsystemInstance { - rx: from_rx, - tx: to_tx, - f, - }); + async fn route_message(&mut self, msg: AllMessages) { + match msg { + AllMessages::Validation(msg) => { + if let Some(ref mut s) = self.validation_subsystem.instance { + let _= s.tx.send(FromOverseer::Communication { msg }).await; + } + } + AllMessages::CandidateBacking(msg) => { + if let Some(ref mut s) = self.candidate_backing_subsystem.instance { + let _ = s.tx.send(FromOverseer::Communication { msg }).await; + } + } + } + } - let id = SubsystemId(get_id()); - self.subsystems.insert(id, OverseedSubsystem { - subsystem: s, - instance, - }); - Ok(id) + fn spawn_job(&mut self, j: BoxFuture<'static, ()>) -> SubsystemResult<()> { + self.s.spawn(j).map_err(|_| SubsystemError) } } +fn spawn( + spawner: &mut S, + futures: &mut FuturesUnordered>, + streams: &mut StreamUnordered>, + mut s: Box + Send>, +) -> SubsystemResult> { + let (to_tx, to_rx) = mpsc::channel(CHANNEL_CAPACITY); + let (from_tx, from_rx) = mpsc::channel(CHANNEL_CAPACITY); + let ctx = SubsystemContext::new(to_rx, from_tx); + let f = s.start(ctx); + + let handle = spawner.spawn_with_handle(f.0)?; + + streams.push(from_rx); + futures.push(handle); + + let instance = Some(SubsystemInstance { + tx: to_tx, + }); + + Ok(OverseenSubsystem { + subsystem: s, + instance, + }) +} #[cfg(test)] mod tests { use futures::{executor, pin_mut, select, channel::mpsc, FutureExt}; use super::*; - // TODO: Can the test types and the tests themselves be simplified - // to avoid all this message collection to compare results to the desired ones? struct TestSubsystem1(mpsc::Sender); - impl Subsystem for TestSubsystem1 { - fn start(&mut self, mut ctx: SubsystemContext) -> SubsystemJob { + impl Subsystem for TestSubsystem1 { + fn start(&mut self, mut ctx: SubsystemContext) -> SpawnedSubsystem { let mut sender = self.0.clone(); - SubsystemJob(Box::pin(async move { + SpawnedSubsystem(Box::pin(async move { + let mut i = 0; loop { match ctx.recv().await { - Ok(msg) => { - let _ = sender.send(msg).await; + Ok(FromOverseer::Communication { .. }) => { + let _ = sender.send(i).await; + i += 1; continue; } - Err(_) => return Ok(()), + Ok(FromOverseer::Signal(OverseerSignal::StopWork)) => return, + Err(_) => return, + _ => (), } } })) @@ -410,20 +653,28 @@ mod tests { struct TestSubsystem2(mpsc::Sender); - impl Subsystem for TestSubsystem2 { - fn start(&mut self, mut ctx: SubsystemContext) -> SubsystemJob { - SubsystemJob(Box::pin(async move { - let mut c = 0; + impl Subsystem for TestSubsystem2 { + fn start(&mut self, mut ctx: SubsystemContext) -> SpawnedSubsystem { + SpawnedSubsystem(Box::pin(async move { + let mut c: usize = 0; loop { if c < 10 { - ctx.send_msg(c).await; + ctx.send_msg( + AllMessages::Validation( + ValidationSubsystemMessage::ValidityAttestation + ) + ).await.unwrap(); c += 1; + continue; } match ctx.try_recv().await { + Ok(Some(FromOverseer::Signal(OverseerSignal::StopWork))) => { + break; + } Ok(Some(_)) => { continue; } - Err(_) => return Ok(()), + Err(_) => return, _ => (), } pending!(); @@ -432,62 +683,30 @@ mod tests { } } - struct TestSubsystem3(Option>); - - impl Subsystem for TestSubsystem3 { - fn start(&mut self, mut ctx: SubsystemContext) -> SubsystemJob { - let oneshot = self.0.take().unwrap(); - - SubsystemJob(Box::pin(async move { - let (tx, mut rx) = mpsc::channel(1024); - - let s1 = Box::new(TestSubsystem1(tx)); - - let s1_id = ctx.spawn(s1).await.unwrap(); - - let mut c = 0; - loop { - if c < 10 { - ctx.send_msg_to(s1_id, c).await; - assert_eq!(rx.next().await, Some(c)); - c += 1; - continue; - } - break; - } - - let _ = oneshot.send(c); + struct TestSubsystem4; - // just stay around for longer - loop { - match ctx.try_recv().await { - Ok(Some(_)) => { - continue; - } - Err(_) => return Ok(()), - _ => (), - } - pending!(); - } + impl Subsystem for TestSubsystem4 { + fn start(&mut self, mut _ctx: SubsystemContext) -> SpawnedSubsystem { + SpawnedSubsystem(Box::pin(async move { + // Do nothing and exit. })) } } // Checks that a minimal configuration of two jobs can run and exchange messages. - // The first job a number of messages that are re-broadcasted to the second job that - // in it's turn send them to the test code to collect the results and compare them to - // the expected ones. - #[test] + #[test] fn overseer_works() { - executor::block_on(async { + let spawner = executor::ThreadPool::new().unwrap(); + + executor::block_on(async move { let (s1_tx, mut s1_rx) = mpsc::channel(64); let (s2_tx, mut s2_rx) = mpsc::channel(64); - let subsystems: Vec>> = vec![ + let (overseer, mut handler) = Overseer::new( Box::new(TestSubsystem1(s1_tx)), Box::new(TestSubsystem2(s2_tx)), - ]; - let overseer = Overseer::new(subsystems); + spawner, + ).unwrap(); let overseer_fut = overseer.run().fuse(); pin_mut!(overseer_fut); @@ -503,7 +722,7 @@ mod tests { Some(msg) => { s1_results.push(msg); if s1_results.len() == 10 { - break; + handler.stop().await.unwrap(); } } None => break, @@ -523,29 +742,27 @@ mod tests { }); } - // Test that spawning a subsystem and sending it a direct message works + // Spawn a subsystem that immediately exits. + // + // Should immediately conclude the overseer itself with an error. #[test] - fn overseer_spawn_works() { - executor::block_on(async { - let (tx, rx) = oneshot::channel(); - let subsystems: Vec>> = vec![ - Box::new(TestSubsystem3(Some(tx))), - ]; - let overseer = Overseer::new(subsystems); - let overseer_fut = overseer.run().fuse(); + fn overseer_panics_on_sybsystem_exit() { + let spawner = executor::ThreadPool::new().unwrap(); - let mut rx = rx.fuse(); + executor::block_on(async move { + let (s1_tx, _) = mpsc::channel(64); + let (overseer, _handle) = Overseer::new( + Box::new(TestSubsystem1(s1_tx)), + Box::new(TestSubsystem4), + spawner, + ).unwrap(); + let overseer_fut = overseer.run().fuse(); pin_mut!(overseer_fut); - loop { - select! { - a = overseer_fut => break, - result = rx => { - assert_eq!(result.unwrap(), 10); - break; - } - } + select! { + res = overseer_fut => assert!(res.is_err()), + complete => (), } - }); + }) } }