Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Forked at: bf7ccb8
Parent branch: origin/master
  • Loading branch information
cecton committed Aug 21, 2020
1 parent b7a97df commit 8a5aa08
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 38 deletions.
8 changes: 5 additions & 3 deletions node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ use polkadot_primitives::v1::{
ValidationCode, PoV, CandidateDescriptor, ValidationData, PersistedValidationData,
TransientValidationData, OccupiedCoreAssumption, Hash,
};
use polkadot_parachain::wasm_executor::{self, ValidationPool, ExecutionMode, ValidationError,
InvalidCandidate as WasmInvalidCandidate};
use polkadot_parachain::wasm_executor::{
self, ValidationPool, ExecutionMode, ValidationError,
InvalidCandidate as WasmInvalidCandidate, ValidationExecutionMode,
};
use polkadot_parachain::primitives::{ValidationResult as WasmValidationResult, ValidationParams};

use parity_scale_codec::Encode;
Expand Down Expand Up @@ -128,7 +130,7 @@ async fn run(
)
-> SubsystemResult<()>
{
let pool = ValidationPool::new(false);
let pool = ValidationPool::new(ValidationExecutionMode::Local);

loop {
match ctx.recv().await? {
Expand Down
2 changes: 1 addition & 1 deletion parachain/src/wasm_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use sp_externalities::Extensions;
use sp_wasm_interface::HostFunctions as _;

#[cfg(not(any(target_os = "android", target_os = "unknown")))]
pub use validation_host::{run_worker, ValidationPool, EXECUTION_TIMEOUT_SEC};
pub use validation_host::{run_worker, ValidationPool, EXECUTION_TIMEOUT_SEC, ValidationExecutionMode};

mod validation_host;

Expand Down
51 changes: 36 additions & 15 deletions parachain/src/wasm_executor/validation_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

#![cfg(not(any(target_os = "android", target_os = "unknown")))]

use std::{process, env, sync::Arc, sync::atomic};
use std::{process, env, sync::Arc, sync::atomic, path::PathBuf};
use codec::{Decode, Encode};
use crate::primitives::{ValidationParams, ValidationResult};
use super::{
Expand All @@ -29,7 +29,6 @@ use log::{debug, trace};
use futures::executor::ThreadPool;
use sp_core::traits::SpawnNamed;

const WORKER_ARGS_TEST: &[&'static str] = &["--nocapture", "validation_worker"];
/// CLI Argument to start in validation worker mode.
const WORKER_ARG: &'static str = "validation-worker";
const WORKER_ARGS: &[&'static str] = &[WORKER_ARG];
Expand Down Expand Up @@ -66,21 +65,38 @@ impl SpawnNamed for TaskExecutor {
}
}

/// The execution mode for the `ValidationPool`.
#[derive(Debug, Clone)]
pub enum ValidationExecutionMode {
/// The validation worker is ran using the process' executable and the subcommand `validation-worker` is passed
/// following by the address of the shared memory.
Local,
/// The validation worker is ran using the command provided and the argument provided. The address of the shared
/// memory is added at the end of the arguments.
Remote {
/// Path to the validation worker. The file must exists and be executable.
binary: PathBuf,
/// List of arguments passed to the validation worker. The address of the shared memory will be automatically
/// added after the arguments.
args: Vec<String>,
},
}

/// A pool of hosts.
#[derive(Clone)]
pub struct ValidationPool {
hosts: Arc<Vec<Mutex<ValidationHost>>>,
test_mode: bool,
execution_mode: ValidationExecutionMode,
}

const DEFAULT_NUM_HOSTS: usize = 8;

impl ValidationPool {
/// Creates a validation pool with the default configuration.
pub fn new(test_mode: bool) -> ValidationPool {
pub fn new(execution_mode: ValidationExecutionMode) -> ValidationPool {
ValidationPool {
hosts: Arc::new((0..DEFAULT_NUM_HOSTS).map(|_| Default::default()).collect()),
test_mode,
execution_mode,
}
}

Expand All @@ -95,12 +111,12 @@ impl ValidationPool {
) -> Result<ValidationResult, ValidationError> {
for host in self.hosts.iter() {
if let Some(mut host) = host.try_lock() {
return host.validate_candidate(validation_code, params, self.test_mode);
return host.validate_candidate(validation_code, params, self.execution_mode.clone());
}
}

// all workers are busy, just wait for the first one
self.hosts[0].lock().validate_candidate(validation_code, params, self.test_mode)
self.hosts[0].lock().validate_candidate(validation_code, params, self.execution_mode.clone())
}
}

Expand Down Expand Up @@ -234,7 +250,7 @@ impl ValidationHost {
Ok(mem_config.create()?)
}

fn start_worker(&mut self, test_mode: bool) -> Result<(), InternalError> {
fn start_worker(&mut self, execution_mode: ValidationExecutionMode) -> Result<(), InternalError> {
if let Some(ref mut worker) = self.worker {
// Check if still alive
if let Ok(None) = worker.try_wait() {
Expand All @@ -243,12 +259,17 @@ impl ValidationHost {
}
}
let memory = Self::create_memory()?;
let self_path = env::current_exe()?;
debug!("Starting worker at {:?}", self_path);
let mut args = if test_mode { WORKER_ARGS_TEST.to_vec() } else { WORKER_ARGS.to_vec() };
args.push(memory.get_os_path());
let worker = process::Command::new(self_path)
let (cmd, args) = match execution_mode {
ValidationExecutionMode::Local => (
env::current_exe()?,
WORKER_ARGS.iter().map(|x| x.to_string()).collect()
),
ValidationExecutionMode::Remote { binary, args } => (binary, args),
};
debug!("Starting worker at {:?} with arguments: {:?} and {:?}", cmd, args, memory.get_os_path());
let worker = process::Command::new(cmd)
.args(args)
.arg(memory.get_os_path())
.stdin(process::Stdio::piped())
.spawn()?;
self.id = worker.id();
Expand All @@ -269,13 +290,13 @@ impl ValidationHost {
&mut self,
validation_code: &[u8],
params: ValidationParams,
test_mode: bool,
execution_mode: ValidationExecutionMode,
) -> Result<ValidationResult, ValidationError> {
if validation_code.len() > MAX_CODE_MEM {
return Err(ValidationError::InvalidCandidate(InvalidCandidate::CodeTooLarge(validation_code.len())));
}
// First, check if need to spawn the child process
self.start_worker(test_mode)?;
self.start_worker(execution_mode)?;
let memory = self.memory.as_mut()
.expect("memory is always `Some` after `start_worker` completes successfully");
{
Expand Down
30 changes: 22 additions & 8 deletions parachain/test-parachains/tests/adder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@

//! Basic parachain that adds a number as part of its state.

use parachain::primitives::{
RelayChainBlockNumber,
BlockData as GenericBlockData,
HeadData as GenericHeadData,
ValidationParams,
const WORKER_ARGS_TEST: &[&'static str] = &["--nocapture", "validation_worker"];

use parachain::{
primitives::{
RelayChainBlockNumber,
BlockData as GenericBlockData,
HeadData as GenericHeadData,
ValidationParams,
},
wasm_executor::{ValidationPool, ValidationExecutionMode}
};
use codec::{Decode, Encode};

Expand Down Expand Up @@ -52,6 +57,15 @@ fn hash_head(head: &HeadData) -> [u8; 32] {
tiny_keccak::keccak256(head.encode().as_slice())
}

fn validation_pool() -> ValidationPool {
let execution_mode = ValidationExecutionMode::Remote {
binary: std::env::current_exe().unwrap(),
args: WORKER_ARGS_TEST.iter().map(|x| x.to_string()).collect(),
};

ValidationPool::new(execution_mode)
}

#[test]
pub fn execute_good_on_parent() {
let parent_head = HeadData {
Expand All @@ -65,7 +79,7 @@ pub fn execute_good_on_parent() {
add: 512,
};

let pool = parachain::wasm_executor::ValidationPool::new(true);
let pool = validation_pool();

let ret = parachain::wasm_executor::validate_candidate(
adder::wasm_binary_unwrap(),
Expand All @@ -91,7 +105,7 @@ fn execute_good_chain_on_parent() {
let mut number = 0;
let mut parent_hash = [0; 32];
let mut last_state = 0;
let pool = parachain::wasm_executor::ValidationPool::new(true);
let pool = validation_pool();

for add in 0..10 {
let parent_head = HeadData {
Expand Down Expand Up @@ -131,7 +145,7 @@ fn execute_good_chain_on_parent() {

#[test]
fn execute_bad_on_parent() {
let pool = parachain::wasm_executor::ValidationPool::new(true);
let pool = validation_pool();

let parent_head = HeadData {
number: 0,
Expand Down
17 changes: 14 additions & 3 deletions parachain/test-parachains/tests/wasm_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,26 @@

//! Basic parachain that adds a number as part of its state.

const WORKER_ARGS_TEST: &[&'static str] = &["--nocapture", "validation_worker"];

use crate::adder;
use parachain::{
primitives::{BlockData, ValidationParams},
wasm_executor::{ValidationError, InvalidCandidate, EXECUTION_TIMEOUT_SEC},
wasm_executor::{ValidationError, InvalidCandidate, EXECUTION_TIMEOUT_SEC, ValidationExecutionMode, ValidationPool},
};

fn validation_pool() -> ValidationPool {
let execution_mode = ValidationExecutionMode::Remote {
binary: std::env::current_exe().unwrap(),
args: WORKER_ARGS_TEST.iter().map(|x| x.to_string()).collect(),
};

ValidationPool::new(execution_mode)
}

#[test]
fn terminates_on_timeout() {
let pool = parachain::wasm_executor::ValidationPool::new(true);
let pool = validation_pool();

let result = parachain::wasm_executor::validate_candidate(
halt::wasm_binary_unwrap(),
Expand All @@ -48,7 +59,7 @@ fn terminates_on_timeout() {

#[test]
fn parallel_execution() {
let pool = parachain::wasm_executor::ValidationPool::new(true);
let pool = validation_pool();

let start = std::time::Instant::now();

Expand Down
15 changes: 13 additions & 2 deletions service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,15 @@ pub use polkadot_primitives::v0::{Block, CollatorId, ParachainHost};
pub use sp_runtime::traits::{Block as BlockT, self as runtime_traits, BlakeTwo256};
pub use chain_spec::{PolkadotChainSpec, KusamaChainSpec, WestendChainSpec};
#[cfg(feature = "full-node")]
pub use consensus::run_validation_worker;
pub use consensus::{run_validation_worker, pipeline::{ValidationPool, ValidationExecutionMode}};
pub use codec::Codec;
pub use polkadot_runtime;
pub use kusama_runtime;
pub use westend_runtime;
pub use self::client::*;

const VALIDATION_WORKER_ARGS_TEST: &[&'static str] = &["--nocapture", "validation_worker"];

native_executor_instance!(
pub PolkadotExecutor,
polkadot_runtime::api::dispatch,
Expand Down Expand Up @@ -391,6 +393,15 @@ pub fn new_full<RuntimeApi, Executor>(

polkadot_network_service.register_availability_store(availability_store.clone());

let execution_mode = if test_mode {
ValidationExecutionMode::Remote {
binary: std::env::current_exe().unwrap(),
args: VALIDATION_WORKER_ARGS_TEST.iter().map(|x| x.to_string()).collect(),
}
} else {
ValidationExecutionMode::Local
};

let (validation_service_handle, validation_service) = consensus::ServiceBuilder {
client: client.clone(),
network: polkadot_network_service.clone(),
Expand All @@ -400,7 +411,7 @@ pub fn new_full<RuntimeApi, Executor>(
select_chain: select_chain.clone(),
keystore: keystore.clone(),
max_block_data_size,
test_mode,
execution_mode,
}.build();

task_manager.spawn_essential_handle().spawn("validation-service", Box::pin(validation_service));
Expand Down
2 changes: 1 addition & 1 deletion validation/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use sp_api::ProvideRuntimeApi;
use crate::Error;
use primitives::traits::SpawnNamed;

pub use parachain::wasm_executor::ValidationPool;
pub use parachain::wasm_executor::{ValidationPool, ValidationExecutionMode};

/// Does basic checks of a collation. Provide the encoded PoV-block.
pub fn basic_checks(
Expand Down
9 changes: 4 additions & 5 deletions validation/src/validation_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use log::{warn, info, debug, trace};

use super::{Network, Collators, SharedTable, TableRouter};
use crate::Error;
use crate::pipeline::ValidationPool;
use crate::pipeline::{ValidationPool, ValidationExecutionMode};

// Remote processes may request for a validation instance to be cloned or instantiated.
// They send a oneshot channel.
Expand Down Expand Up @@ -132,9 +132,8 @@ pub struct ServiceBuilder<C, N, P, SC, SP> {
pub keystore: KeyStorePtr,
/// The maximum block-data size in bytes.
pub max_block_data_size: Option<u64>,
/// The validation worker is called using the subcommand `--nocapture validation_worker` instead
/// of `validation-worker`, suitable for test environment.
pub test_mode: bool,
/// The validation execution mode.
pub execution_mode: ValidationExecutionMode,
}

impl<C, N, P, SC, SP> ServiceBuilder<C, N, P, SC, SP> where
Expand Down Expand Up @@ -166,7 +165,7 @@ impl<C, N, P, SC, SP> ServiceBuilder<C, N, P, SC, SP> where
NotifyImport(sc_client_api::BlockImportNotification<Block>),
}

let validation_pool = Some(ValidationPool::new(self.test_mode));
let validation_pool = Some(ValidationPool::new(self.execution_mode));
let mut parachain_validation = ParachainValidationInstances {
client: self.client.clone(),
network: self.network,
Expand Down

0 comments on commit 8a5aa08

Please sign in to comment.