Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

implement bitfield signing subsystem #1364

Merged
merged 17 commits into from
Jul 23, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
start rewriting bitfield signing in terms of the util module
  • Loading branch information
coriolinus committed Jul 22, 2020
commit a8d312cf6fe7663e6afe2d1af0e9fb9c7a97985d
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions node/bitfield-signing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ authors = ["Peter Goodspeed-Niklaus <peter.r.goodspeedniklaus@gmail.com>"]
edition = "2018"

[dependencies]
bitvec = "0.17.4"
derive_more = "0.99.9"
futures = "0.3.5"
log = "0.4.8"
polkadot-primitives = { path = "../../primitives" }
polkadot-node-subsystem = { path = "../subsystem" }
keystore = { package = "sc-keystore", git = "https://github.com/paritytech/substrate", branch = "master" }
wasm-timer = "0.2.4"
220 changes: 131 additions & 89 deletions node/bitfield-signing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,121 +16,163 @@

//! The bitfield signing subsystem produces `SignedAvailabilityBitfield`s once per block.

use bitvec::bitvec;
use futures::{
channel::{mpsc, oneshot},
future::{abortable, AbortHandle},
prelude::*,
Future,
};
use keystore::KeyStorePtr;
use polkadot_node_subsystem::{
messages::{AllMessages, BitfieldSigningMessage},
OverseerSignal, SubsystemResult,
messages::{self, AllMessages, AvailabilityStoreMessage, BitfieldDistributionMessage, BitfieldSigningMessage, CandidateBackingMessage},
util::{self, JobManager, JobTrait, ToJobTrait, Validator},
};
use polkadot_primitives::v1::{
BackedCandidate, Hash,
};
use polkadot_node_subsystem::{FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext};
use polkadot_primitives::Hash;
use std::{
collections::HashMap,
convert::TryFrom,
pin::Pin,
time::{Duration, Instant},
time::Duration,
};
use wasm_timer::{Delay, Instant};

/// Delay between starting a bitfield signing job and its attempting to create a bitfield.
const JOB_DELAY: Duration = Duration::from_millis(1500);

/// JobCanceler aborts all abort handles on drop.
#[derive(Debug, Default)]
struct JobCanceler(HashMap<Hash, AbortHandle>);
/// Each `BitfieldSigningJob` prepares a signed bitfield for a single relay parent.
pub struct BitfieldSigningJob;

/// Messages which a `BitfieldSigningJob` is prepared to receive.
pub enum ToJob {
BitfieldSigning(BitfieldSigningMessage),
Stop,
}

impl ToJobTrait for ToJob {
const STOP: Self = ToJob::Stop;

// AbortHandle doesn't impl Drop on its own, so we wrap it
// in this struct to get free cancellation on drop.
impl Drop for JobCanceler {
fn drop(&mut self) {
for abort_handle in self.0.values() {
abort_handle.abort();
fn relay_parent(&self) -> Option<Hash> {
match self {
Self::BitfieldSigning(bsm) => bsm.relay_parent(),
Self::Stop => None,
}
}
}

/// Bitfield signing subsystem.
struct BitfieldSigning;

impl BitfieldSigning {
async fn run<Context>(mut ctx: Context) -> SubsystemResult<()>
where
Context: SubsystemContext<Message = BitfieldSigningMessage> + Clone,
{
let mut active_jobs = JobCanceler::default();

loop {
use FromOverseer::*;
use OverseerSignal::*;
match ctx.recv().await {
Ok(Communication { msg: _ }) => {
unreachable!("BitfieldSigningMessage is uninstantiable; qed")
}
Ok(Signal(StartWork(hash))) => {
let (future, abort_handle) =
abortable(bitfield_signing_job(hash.clone(), ctx.clone()));
// future currently returns a Result based on whether or not it was aborted;
// let's ignore all that and return () unconditionally, to fit the interface.
let future = async move {
let _ = future.await;
};
active_jobs.0.insert(hash.clone(), abort_handle);
ctx.spawn(Box::pin(future)).await?;
}
Ok(Signal(StopWork(hash))) => {
if let Some(abort_handle) = active_jobs.0.remove(&hash) {
abort_handle.abort();
}
}
Ok(Signal(Conclude)) => break,
Err(err) => {
return Err(err);
}
}
impl TryFrom<AllMessages> for ToJob {
type Error = ();

fn try_from(msg: AllMessages) -> Result<Self, Self::Error> {
match msg {
AllMessages::BitfieldSigning(bsm) => Ok(ToJob::BitfieldSigning(bsm)),
_ => Err(())
}
}
}

Ok(())
impl From<BitfieldSigningMessage> for ToJob {
fn from(bsm: BitfieldSigningMessage) -> ToJob {
ToJob::BitfieldSigning(bsm)
}
}

impl<Context> Subsystem<Context> for BitfieldSigning
where
Context: SubsystemContext<Message = BitfieldSigningMessage> + Clone,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
if let Err(err) = Self::run(ctx).await {
log::error!("{:?}", err);
};
}))
/// Messages which may be sent from a `BitfieldSigningJob`.
pub enum FromJob {
AvailabilityStore(AvailabilityStoreMessage),
BitfieldDistribution(BitfieldDistributionMessage),
CandidateBacking(CandidateBackingMessage),
}

impl From<FromJob> for AllMessages {
fn from(from_job: FromJob) -> AllMessages {
match from_job {
FromJob::AvailabilityStore(asm) => AllMessages::AvailabilityStore(asm),
FromJob::BitfieldDistribution(bdm) => AllMessages::BitfieldDistribution(bdm),
FromJob::CandidateBacking(cbm) => AllMessages::CandidateBacking(cbm),
}
}
}

async fn bitfield_signing_job<Context>(hash: Hash, ctx: Context)
where
Context: SubsystemContext<Message = BitfieldSigningMessage>,
{
// first up, figure out when we need to wait until
let delay = wasm_timer::Delay::new_at(Instant::now() + JOB_DELAY);
// next, do some prerequisite work
todo!();
// now, wait for the delay to be complete
if let Err(_) = delay.await {
return;
impl TryFrom<AllMessages> for FromJob {
type Error = ();

fn try_from(msg: AllMessages) -> Result<Self, Self::Error> {
match msg {
AllMessages::AvailabilityStore(asm) => Ok(Self::AvailabilityStore(asm)),
AllMessages::BitfieldDistribution(bdm) => Ok(Self::BitfieldDistribution(bdm)),
AllMessages::CandidateBacking(cbm) => Ok(Self::CandidateBacking(cbm)),
_ => Err(()),
}
}
// let (tx, _) = oneshot::channel();

// ctx.send_message(AllMessages::CandidateValidation(
// CandidateValidationMessage::Validate(
// Default::default(),
// Default::default(),
// PoVBlock {
// block_data: BlockData(Vec::new()),
// },
// tx,
// )
// )).await.unwrap();
unimplemented!()
}

/// Errors we may encounter in the course of executing the `BitfieldSigningSubsystem`.
#[derive(Debug, derive_more::From)]
pub enum Error {
/// error propagated from the utility subsystem
#[from]
Util(util::Error),
/// io error
#[from]
Io(std::io::Error),
/// a one shot channel was canceled
#[from]
Oneshot(oneshot::Canceled),
/// a mspc channel failed to send
#[from]
MpscSend(mpsc::SendError),
}

async fn get_backed_candidates_pending_availability(relay_parent: Hash, sender: &mut mpsc::Sender<FromJob>) -> Result<Vec<BackedCandidate>, Error> {
use FromJob::CandidateBacking;
use CandidateBackingMessage::GetBackedCandidates;
use messages::NewBackedCandidate;

let (tx, rx) = oneshot::channel();
// REVIEW: is this where we should be getting the set of backed candidates from?
sender.send(CandidateBacking(GetBackedCandidates(relay_parent, tx))).await?;
Ok(rx.await?.into_iter().map(|NewBackedCandidate(backed_candidate)| backed_candidate).collect())
}

impl JobTrait for BitfieldSigningJob {
type ToJob = ToJob;
type FromJob = FromJob;
type Error = Error;
type RunArgs = KeyStorePtr;

const NAME: &'static str = "BitfieldSigningJob";

/// Run a job for the parent block indicated
fn run(
parent: Hash,
keystore: Self::RunArgs,
_receiver: mpsc::Receiver<ToJob>,
mut sender: mpsc::Sender<FromJob>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
async move {
// figure out when to wait to
let wait_until = Instant::now() + JOB_DELAY;

// now do all the work we can before we need to wait for the availability store
// if we're not a validator, we can just succeed effortlessly
let validator = match Validator::new(parent, keystore, sender.clone()).await {
Ok(validator) => validator,
Err(util::Error::NotAValidator) => return Ok(()),
Err(err) => return Err(Error::Util(err)),
};

// wait a bit before doing anything else
Delay::new_at(wait_until).await?;

let backed_candidates = get_backed_candidates_pending_availability(parent, &mut sender).await?;
let bitvec_size: usize = todo!();
let mut out = bitvec![0; bitvec_size];

unimplemented!()
}.boxed()
}
}

/// BitfieldSigningSubsystem manages a number of bitfield signing jobs.
pub type BitfieldSigningSubsystem<Spawner, Context> = JobManager<Spawner, Context, BitfieldSigningJob>;