Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Make validation work on wasm!
Browse files Browse the repository at this point in the history
  • Loading branch information
expenses committed Nov 29, 2019
1 parent b45a95c commit cceb6b7
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 76 deletions.
11 changes: 5 additions & 6 deletions validation/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@ authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"

[dependencies]
futures = "0.1.17"
futures03 = { package = "futures", version = "0.3.1", features = ["compat"] }
futures01 = { package = "futures", version = "0.1.17" }
futures = { version = "0.3.1", features = ["compat"] }
futures-timer = "2.0"
async-std = { version = "1.0.1", features = ["unstable"] }

parking_lot = "0.9.0"
tokio = "0.1.22"
tokio = { version = "0.2.1", features = ["rt-core", "blocking"] }
derive_more = "0.14.1"
log = "0.4.8"
exit-future = "0.1.4"
tokio-executor = { version = "0.2.0-alpha.6", features = ["blocking"] }
exit-future = { git = "https://github.com/expenses/exit-future", branch = "modernize" }
codec = { package = "parity-scale-codec", version = "1.1.0", default-features = false, features = ["derive"] }
availability_store = { package = "polkadot-availability-store", path = "../availability-store" }
parachain = { package = "polkadot-parachain", path = "../parachain" }
Expand Down
74 changes: 30 additions & 44 deletions validation/src/attestation_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,28 @@
/// such as candidate verification while performing event-driven work
/// on a local event loop.
use std::{thread, time::{Duration, Instant}, sync::Arc};
use std::{thread, time::Duration, sync::Arc};

use client::{BlockchainEvents, BlockBody};
use sp_blockchain::{HeaderBackend, Result as ClientResult};
use block_builder::BlockBuilderApi;
use consensus::SelectChain;
use availability_store::Store as AvailabilityStore;
use futures::prelude::*;
use futures03::{TryStreamExt as _, StreamExt as _};
use log::error;
use futures01::prelude::*;
use futures::{StreamExt, FutureExt, Future, future::{ready, select}};
use polkadot_primitives::{Block, BlockId};
use polkadot_primitives::parachain::{CandidateReceipt, ParachainHost};
use runtime_primitives::traits::{ProvideRuntimeApi};
use babe_primitives::BabeApi;
use keystore::KeyStorePtr;
use sr_api::ApiExt;

use tokio::{timer::Interval, runtime::current_thread::Runtime as LocalRuntime};
use log::{warn, debug};
use tokio::{runtime::Runtime as LocalRuntime};
use log::warn;

use super::{Network, Collators};

type TaskExecutor = Arc<dyn futures::future::Executor<Box<dyn Future<Item = (), Error = ()> + Send>> + Send + Sync>;
type TaskExecutor = futures::executor::ThreadPool;

/// Gets a list of the candidates in a block.
pub(crate) fn fetch_candidates<P: BlockBody<Block>>(client: &P, block: &BlockId)
Expand Down Expand Up @@ -75,31 +74,30 @@ pub(crate) fn fetch_candidates<P: BlockBody<Block>>(client: &P, block: &BlockId)
// NOTE: this will need to be changed to finality notification rather than
// block import notifications when the consensus switches to non-instant finality.
fn prune_unneeded_availability<P>(client: Arc<P>, availability_store: AvailabilityStore)
-> impl Future<Item=(),Error=()> + Send
-> impl Future<Output=()> + Send + Unpin
where P: Send + Sync + BlockchainEvents<Block> + BlockBody<Block> + 'static
{
client.finality_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
.for_each(move |notification| {
let hash = notification.hash;
let parent_hash = notification.header.parent_hash;
let candidate_hashes = match fetch_candidates(&*client, &BlockId::hash(hash)) {
Ok(Some(candidates)) => candidates.map(|c| c.hash()).collect(),
Ok(None) => {
warn!("Could not extract candidates from block body of imported block {:?}", hash);
return Ok(())
return ready(())
}
Err(e) => {
warn!("Failed to fetch block body for imported block {:?}: {:?}", hash, e);
return Ok(())
return ready(())
}
};

if let Err(e) = availability_store.candidates_finalized(parent_hash, candidate_hashes) {
warn!(target: "validation", "Failed to prune unneeded available data: {:?}", e);
}

Ok(())
ready(())
})
}

Expand Down Expand Up @@ -133,7 +131,6 @@ pub(crate) fn start<C, N, P, SC>(
<N::BuildTableRouter as IntoFuture>::Future: Send + 'static,
SC: SelectChain<Block> + 'static,
{
const TIMER_DELAY: Duration = Duration::from_secs(5);
const TIMER_INTERVAL: Duration = Duration::from_secs(30);

let (signal, exit) = ::exit_future::signal();
Expand All @@ -145,8 +142,7 @@ pub(crate) fn start<C, N, P, SC>(

let keystore = keystore.clone();

client.import_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
let notifications = client.import_notification_stream()
.for_each(move |notification| {
let parent_hash = notification.hash;
if notification.is_new_best {
Expand All @@ -163,52 +159,42 @@ pub(crate) fn start<C, N, P, SC>(
);
}
}
Ok(())
})
.select(exit.clone())
.then(|_| Ok(()))
ready(())
});

select(notifications, exit.clone())
};

let prune_old_sessions = {
let select_chain = select_chain.clone();
let interval = Interval::new(
Instant::now() + TIMER_DELAY,
TIMER_INTERVAL,
);

interval
let interval = crate::interval(TIMER_INTERVAL)
.for_each(move |_| match select_chain.leaves() {
Ok(leaves) => {
parachain_validation.retain(|h| leaves.contains(h));
Ok(())
ready(())
}
Err(e) => {
warn!("Error fetching leaves from client: {:?}", e);
Ok(())
ready(())
}
})
.map_err(|e| warn!("Timer error {:?}", e))
.select(exit.clone())
.then(|_| Ok(()))
});

select(interval, exit.clone()).map(|_| ())
};

runtime.spawn(notifications);
if let Err(_) = thread_pool.execute(Box::new(prune_old_sessions)) {
error!("Failed to spawn old sessions pruning task");
}
thread_pool.spawn_ok(prune_old_sessions);

let prune_available = prune_unneeded_availability(client, availability_store)
.select(exit.clone())
.then(|_| Ok(()));
let prune_available = futures::future::select(
prune_unneeded_availability(client, availability_store),
exit.clone()
)
.map(|_| ());

// spawn this on the tokio executor since it's fine on a thread pool.
if let Err(_) = thread_pool.execute(Box::new(prune_available)) {
error!("Failed to spawn available pruning task");
}
thread_pool.spawn_ok(prune_available);

if let Err(e) = runtime.block_on(exit) {
debug!("BFT event loop error {:?}", e);
}
runtime.block_on(exit);
});

ServiceHandle {
Expand All @@ -220,7 +206,7 @@ pub(crate) fn start<C, N, P, SC>(
impl Drop for ServiceHandle {
fn drop(&mut self) {
if let Some(signal) = self.exit_signal.take() {
signal.fire();
let _ = signal.fire();
}

if let Some(thread) = self.thread.take() {
Expand Down
4 changes: 2 additions & 2 deletions validation/src/collation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use polkadot_primitives::{Block, Hash, BlockId, Balance, parachain::{
use runtime_primitives::traits::ProvideRuntimeApi;
use parachain::{wasm_executor::{self, ExternalitiesError, ExecutionMode}, MessageRef, UpwardMessageRef};
use trie::TrieConfiguration;
use futures::prelude::*;
use futures01::prelude::*;
use log::debug;

/// Encapsulates connections to collators and allows collation on any parachain.
Expand Down Expand Up @@ -112,7 +112,7 @@ impl<C: Collators, P: ProvideRuntimeApi> Future for CollationFetch<C, P>
.get_or_insert_with(move || c.collate(parachain, r).into_future())
.poll();

futures::try_ready!(poll)
futures01::try_ready!(poll)
};

let res = validate_collation(
Expand Down
3 changes: 2 additions & 1 deletion validation/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ pub enum Error {
DeadlineComputeFailure(std::time::Duration),
/// Unable to dispatch agreement future
#[display(fmt = "Unable to dispatch agreement future: {:?}", _0)]
Executor(futures::future::ExecuteErrorKind),
Executor(futures01::future::ExecuteErrorKind),
Join(tokio::task::JoinError)
}

impl std::error::Error for Error {
Expand Down
55 changes: 37 additions & 18 deletions validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,29 @@ use polkadot_primitives::parachain::{
use primitives::Pair;
use runtime_primitives::traits::{ProvideRuntimeApi, DigestFor};
use futures_timer::Delay;
use async_std::stream::{interval, Interval};
use txpool_api::{TransactionPool, InPoolTransaction};

use attestation_service::ServiceHandle;
use futures::prelude::*;
use futures03::{future::{self, Either}, FutureExt, StreamExt};
use futures01::prelude::*;
use futures::{
future::{self, Either, select}, FutureExt, StreamExt, compat::Future01CompatExt, Stream,
stream::unfold
};
use collation::CollationFetch;
use dynamic_inclusion::DynamicInclusion;
use inherents::InherentData;
use sp_timestamp::TimestampInherentData;
use log::{info, debug, warn, trace, error};
use log::{info, debug, warn, trace};
use keystore::KeyStorePtr;
use sr_api::ApiExt;

type TaskExecutor = Arc<dyn futures::future::Executor<Box<dyn Future<Item = (), Error = ()> + Send>> + Send + Sync>;
fn interval(duration: Duration) -> impl Stream<Item=()> + Send + Unpin {
unfold((), move |_| {
futures_timer::Delay::new(duration).map(|_| Some(((), ())))
}).map(drop)
}

type TaskExecutor = futures::executor::ThreadPool;

pub use self::collation::{
validate_collation, validate_incoming, message_queue_root, egress_roots, Collators,
Expand All @@ -79,6 +87,8 @@ pub use self::shared_table::{
SharedTable, ParachainWork, PrimedParachainWork, Validated, Statement, SignedStatement,
GenericStatement,
};

#[cfg(not(target_os = "unknown"))]
pub use parachain::wasm_executor::{run_worker as run_validation_worker};

mod attestation_service;
Expand Down Expand Up @@ -402,19 +412,18 @@ impl<C, N, P> ParachainValidation<C, N, P> where
})
};

let cancellable_work = build_router
let router = build_router
.into_future()
.map_err(|e| {
warn!(target: "validation" , "Failed to build table router: {:?}", e);
})
.and_then(with_router)
.select(exit)
.then(|_| Ok(()));
.compat();

let cancellable_work = select(exit, router).map(|_| ());

// spawn onto thread pool.
if self.handle.execute(Box::new(cancellable_work)).is_err() {
error!("Failed to spawn cancellable work task");
}
self.handle.spawn_ok(cancellable_work)
}
}

Expand Down Expand Up @@ -601,7 +610,7 @@ impl<C, TxPool> consensus::Proposer<Block> for Proposer<C, TxPool> where

let timing = ProposalTiming {
minimum: delay_future,
attempt_propose: interval(ATTEMPT_PROPOSE_EVERY),
attempt_propose: Box::new(interval(ATTEMPT_PROPOSE_EVERY)),
enough_candidates: Delay::new(enough_candidates),
dynamic_inclusion,
last_included: initial_included,
Expand Down Expand Up @@ -642,7 +651,7 @@ fn current_timestamp() -> u64 {

struct ProposalTiming {
minimum: Option<Delay>,
attempt_propose: Interval,
attempt_propose: Box<dyn futures::Stream<Item=()> + Send + Unpin>,
dynamic_inclusion: DynamicInclusion,
enough_candidates: Delay,
last_included: usize,
Expand Down Expand Up @@ -698,7 +707,7 @@ enum CreateProposalState<C: Send + Sync, TxPool> {
/// Represents the state when we switch from pending to fired.
Switching,
/// Block proposing has fired.
Fired(tokio_executor::blocking::Blocking<Result<Block, Error>>),
Fired(tokio::task::JoinHandle<Result<Block, Error>>),
}

/// Inner data of the create proposal.
Expand Down Expand Up @@ -810,7 +819,7 @@ impl<C, TxPool> CreateProposalData<C, TxPool> where
}
}

impl<C, TxPool> futures03::Future for CreateProposal<C, TxPool> where
impl<C, TxPool> futures::Future for CreateProposal<C, TxPool> where
TxPool: TransactionPool<Block=Block> + 'static,
C: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync + 'static,
C::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
Expand Down Expand Up @@ -844,18 +853,28 @@ impl<C, TxPool> futures03::Future for CreateProposal<C, TxPool> where
thus Switching will never be reachable here; qed"
),
CreateProposalState::Fired(mut future) => {
let ret = Pin::new(&mut future).poll(cx);
let ret = match Pin::new(&mut future).poll(cx) {
Poll::Ready(res) => {
Poll::Ready(res.map_err(Error::Join).and_then(|res| res))
},
Poll::Pending => Poll::Pending
};
self.state = CreateProposalState::Fired(future);
return ret
},
};

// 2. propose
let mut future = tokio_executor::blocking::run(move || {
let mut future = tokio::task::spawn_blocking(move || {
let proposed_candidates = data.table.proposed_set();
data.propose_with(proposed_candidates)
});
let polled = Pin::new(&mut future).poll(cx);
let polled = match Pin::new(&mut future).poll(cx) {
Poll::Ready(res) => {
Poll::Ready(res.map_err(Error::Join).and_then(|res| res))
},
Poll::Pending => Poll::Pending
};
self.state = CreateProposalState::Fired(future);

polled
Expand Down
4 changes: 2 additions & 2 deletions validation/src/shared_table/includable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
use std::collections::HashMap;

use futures::prelude::*;
use futures::sync::oneshot;
use futures01::prelude::*;
use futures01::sync::oneshot;

use polkadot_primitives::Hash;

Expand Down
6 changes: 3 additions & 3 deletions validation/src/shared_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use polkadot_primitives::parachain::{
};

use parking_lot::Mutex;
use futures::prelude::*;
use futures01::prelude::*;
use log::{warn, debug};
use bitvec::bitvec;

Expand Down Expand Up @@ -331,7 +331,7 @@ impl<Fetch, F, Err> Future for PrimedParachainWork<Fetch, F>
let work = &mut self.inner.work;
let candidate = &work.candidate_receipt;

let pov_block = futures::try_ready!(work.fetch.poll());
let pov_block = futures01::try_ready!(work.fetch.poll());
let validation_res = (self.validate)(
&BlockId::hash(self.inner.relay_parent),
&Collation { pov: pov_block.clone(), receipt: candidate.clone() },
Expand Down Expand Up @@ -574,7 +574,7 @@ mod tests {
use substrate_keyring::Sr25519Keyring;
use primitives::crypto::UncheckedInto;
use polkadot_primitives::parachain::{BlockData, ConsolidatedIngress};
use futures::future;
use futures01::future;

fn pov_block_with_data(data: Vec<u8>) -> PoVBlock {
PoVBlock {
Expand Down

0 comments on commit cceb6b7

Please sign in to comment.