Skip to content

Commit

Permalink
impl Debug for sc_service::Configuration (paritytech#6400)
Browse files Browse the repository at this point in the history
* Initial commit

Forked at: d735e4d
No parent branch.

* Make sc_service::Configuration derive Debug

* Replace task_executor fn's input by proper TaskExecutor type (cleaner)

* impl From<Fn> for TaskExecutor

* Update client/cli/src/runner.rs

* Add some doc, examples and tests

* Replace Deref by fn spawn as suggested

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
  • Loading branch information
cecton and bkchr authored Jun 23, 2020
1 parent cf7432a commit 6221146
Show file tree
Hide file tree
Showing 11 changed files with 139 additions and 59 deletions.
6 changes: 6 additions & 0 deletions client/chain-spec/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,9 @@ pub trait ChainSpec: BuildStorage + Send {
/// This will be used as storage at genesis.
fn set_storage(&mut self, storage: Storage);
}

impl std::fmt::Debug for dyn ChainSpec {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "ChainSpec(name = {:?}, id = {:?})", self.name(), self.id())
}
}
9 changes: 3 additions & 6 deletions client/cli/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,12 @@ use names::{Generator, Name};
use sc_client_api::execution_extensions::ExecutionStrategies;
use sc_service::config::{
BasePath, Configuration, DatabaseConfig, ExtTransport, KeystoreConfig, NetworkConfiguration,
NodeKeyConfig, OffchainWorkerConfig, PrometheusConfig, PruningMode, Role, RpcMethods, TaskType,
TelemetryEndpoints, TransactionPoolOptions, WasmExecutionMethod,
NodeKeyConfig, OffchainWorkerConfig, PrometheusConfig, PruningMode, Role, RpcMethods,
TaskExecutor, TelemetryEndpoints, TransactionPoolOptions, WasmExecutionMethod,
};
use sc_service::{ChainSpec, TracingReceiver};
use std::future::Future;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;

/// The maximum number of characters for a node name.
pub(crate) const NODE_NAME_MAX_LENGTH: usize = 32;
Expand Down Expand Up @@ -409,7 +406,7 @@ pub trait CliConfiguration: Sized {
fn create_configuration<C: SubstrateCli>(
&self,
cli: &C,
task_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>, TaskType) + Send + Sync>,
task_executor: TaskExecutor,
) -> Result<Configuration> {
let is_dev = self.is_dev()?;
let chain_id = self.chain_id(is_dev)?;
Expand Down
7 changes: 2 additions & 5 deletions client/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,8 @@ use log::info;
pub use params::*;
use regex::Regex;
pub use runner::*;
use sc_service::{ChainSpec, Configuration, TaskType};
use std::future::Future;
use sc_service::{ChainSpec, Configuration, TaskExecutor};
use std::io::Write;
use std::pin::Pin;
use std::sync::Arc;
pub use structopt;
use structopt::{
clap::{self, AppSettings},
Expand Down Expand Up @@ -199,7 +196,7 @@ pub trait SubstrateCli: Sized {
fn create_configuration<T: CliConfiguration>(
&self,
command: &T,
task_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>, TaskType) + Send + Sync>,
task_executor: TaskExecutor,
) -> error::Result<Configuration> {
command.create_configuration(self, task_executor)
}
Expand Down
26 changes: 12 additions & 14 deletions client/cli/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use sc_service::{AbstractService, Configuration, Role, ServiceBuilderCommand, Ta
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
use sp_utils::metrics::{TOKIO_THREADS_ALIVE, TOKIO_THREADS_TOTAL};
use sp_version::RuntimeVersion;
use std::{fmt::Debug, marker::PhantomData, str::FromStr, sync::Arc};
use std::{fmt::Debug, marker::PhantomData, str::FromStr};

#[cfg(target_family = "unix")]
async fn main<F, E>(func: F) -> std::result::Result<(), Box<dyn std::error::Error>>
Expand Down Expand Up @@ -119,23 +119,21 @@ impl<C: SubstrateCli> Runner<C> {
let tokio_runtime = build_runtime()?;
let runtime_handle = tokio_runtime.handle().clone();

let task_executor = Arc::new(
move |fut, task_type| {
match task_type {
TaskType::Async => { runtime_handle.spawn(fut); }
TaskType::Blocking => {
runtime_handle.spawn( async move {
// `spawn_blocking` is looking for the current runtime, and as such has to be called
// from within `spawn`.
tokio::task::spawn_blocking(move || futures::executor::block_on(fut))
});
}
let task_executor = move |fut, task_type| {
match task_type {
TaskType::Async => { runtime_handle.spawn(fut); }
TaskType::Blocking => {
runtime_handle.spawn(async move {
// `spawn_blocking` is looking for the current runtime, and as such has to
// be called from within `spawn`.
tokio::task::spawn_blocking(move || futures::executor::block_on(fut))
});
}
}
);
};

Ok(Runner {
config: command.create_configuration(cli, task_executor)?,
config: command.create_configuration(cli, task_executor.into())?,
tokio_runtime,
phantom: PhantomData,
})
Expand Down
2 changes: 1 addition & 1 deletion client/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ pub struct DatabaseSettings {
}

/// Where to find the database..
#[derive(Clone)]
#[derive(Debug, Clone)]
pub enum DatabaseSettingsSrc {
/// Load a RocksDB database from a given path. Recommended for most uses.
RocksDb {
Expand Down
76 changes: 72 additions & 4 deletions client/service/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use prometheus_endpoint::Registry;
use tempfile::TempDir;

/// Service configuration.
#[derive(Debug)]
pub struct Configuration {
/// Implementation name
pub impl_name: &'static str,
Expand All @@ -42,7 +43,7 @@ pub struct Configuration {
/// Node role.
pub role: Role,
/// How to spawn background tasks. Mandatory, otherwise creating a `Service` will error.
pub task_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>, TaskType) + Send + Sync>,
pub task_executor: TaskExecutor,
/// Extrinsic pool configuration.
pub transaction_pool: TransactionPoolOptions,
/// Network configuration.
Expand Down Expand Up @@ -120,7 +121,7 @@ pub enum TaskType {
}

/// Configuration of the client keystore.
#[derive(Clone)]
#[derive(Debug, Clone)]
pub enum KeystoreConfig {
/// Keystore at a path on-disk. Recommended for native nodes.
Path {
Expand All @@ -143,7 +144,7 @@ impl KeystoreConfig {
}
}
/// Configuration of the database of the client.
#[derive(Clone, Default)]
#[derive(Debug, Clone, Default)]
pub struct OffchainWorkerConfig {
/// If this is allowed.
pub enabled: bool,
Expand All @@ -152,7 +153,7 @@ pub struct OffchainWorkerConfig {
}

/// Configuration of the Prometheus endpoint.
#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct PrometheusConfig {
/// Port to use.
pub port: SocketAddr,
Expand Down Expand Up @@ -199,6 +200,7 @@ impl Default for RpcMethods {
}

/// The base path that is used for everything that needs to be write on disk to run a node.
#[derive(Debug)]
pub enum BasePath {
/// A temporary directory is used as base path and will be deleted when dropped.
#[cfg(not(target_os = "unknown"))]
Expand Down Expand Up @@ -253,3 +255,69 @@ impl std::convert::From<PathBuf> for BasePath {
BasePath::new(path)
}
}

type TaskExecutorInner = Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>, TaskType) + Send + Sync>;

/// Callable object that execute tasks.
///
/// This struct can be created easily using `Into`.
///
/// # Examples
///
/// ## Using tokio
///
/// ```
/// # use sc_service::TaskExecutor;
/// # mod tokio { pub mod runtime {
/// # #[derive(Clone)]
/// # pub struct Runtime;
/// # impl Runtime {
/// # pub fn new() -> Result<Self, ()> { Ok(Runtime) }
/// # pub fn handle(&self) -> &Self { &self }
/// # pub fn spawn(&self, _: std::pin::Pin<Box<dyn futures::future::Future<Output = ()> + Send>>) {}
/// # }
/// # } }
/// use tokio::runtime::Runtime;
///
/// let runtime = Runtime::new().unwrap();
/// let handle = runtime.handle().clone();
/// let task_executor: TaskExecutor = (move |future, _task_type| {
/// handle.spawn(future);
/// }).into();
/// ```
///
/// ## Using async-std
///
/// ```
/// # use sc_service::TaskExecutor;
/// # mod async_std { pub mod task {
/// # pub fn spawn(_: std::pin::Pin<Box<dyn futures::future::Future<Output = ()> + Send>>) {}
/// # } }
/// let task_executor: TaskExecutor = (|future, _task_type| {
/// async_std::task::spawn(future);
/// }).into();
/// ```
#[derive(Clone)]
pub struct TaskExecutor(TaskExecutorInner);

impl std::fmt::Debug for TaskExecutor {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "TaskExecutor")
}
}

impl<F> std::convert::From<F> for TaskExecutor
where
F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>, TaskType) + Send + Sync + 'static,
{
fn from(x: F) -> Self {
Self(Arc::new(x))
}
}

impl TaskExecutor {
/// Spawns a new asynchronous task.
pub fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>, task_type: TaskType) {
self.0(future, task_type)
}
}
4 changes: 3 additions & 1 deletion client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ pub use self::builder::{
ServiceBuilder, ServiceBuilderCommand, TFullClient, TLightClient, TFullBackend, TLightBackend,
TFullCallExecutor, TLightCallExecutor, RpcExtensionBuilder,
};
pub use config::{BasePath, Configuration, DatabaseConfig, PruningMode, Role, RpcMethods, TaskType};
pub use config::{
BasePath, Configuration, DatabaseConfig, PruningMode, Role, RpcMethods, TaskExecutor, TaskType,
};
pub use sc_chain_spec::{
ChainSpec, GenericChainSpec, Properties, RuntimeGenesis, Extension as ChainSpecExtension,
NoExtension, ChainType,
Expand Down
15 changes: 6 additions & 9 deletions client/service/src/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

//! Substrate service tasks management module.
use std::{panic, pin::Pin, result::Result, sync::Arc};
use std::{panic, result::Result};
use exit_future::Signal;
use log::debug;
use futures::{
Expand All @@ -29,18 +29,15 @@ use prometheus_endpoint::{
};
use sc_client_api::CloneableSpawn;
use sp_utils::mpsc::TracingUnboundedSender;
use crate::config::TaskType;
use crate::config::{TaskExecutor, TaskType};

mod prometheus_future;

/// Type alias for service task executor (usually runtime).
pub type ServiceTaskExecutor = Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>, TaskType) + Send + Sync>;

/// An handle for spawning tasks in the service.
#[derive(Clone)]
pub struct SpawnTaskHandle {
on_exit: exit_future::Exit,
executor: ServiceTaskExecutor,
executor: TaskExecutor,
metrics: Option<Metrics>,
}

Expand Down Expand Up @@ -113,7 +110,7 @@ impl SpawnTaskHandle {
}
};

(self.executor)(Box::pin(future), task_type);
self.executor.spawn(Box::pin(future), task_type);
}
}

Expand Down Expand Up @@ -216,7 +213,7 @@ pub struct TaskManager {
/// A signal that makes the exit future above resolve, fired on service drop.
signal: Option<Signal>,
/// How to spawn background tasks.
executor: ServiceTaskExecutor,
executor: TaskExecutor,
/// Prometheus metric where to report the polling times.
metrics: Option<Metrics>,
}
Expand All @@ -225,7 +222,7 @@ impl TaskManager {
/// If a Prometheus registry is passed, it will be used to report statistics about the
/// service tasks.
pub(super) fn new(
executor: ServiceTaskExecutor,
executor: TaskExecutor,
prometheus_registry: Option<&Registry>
) -> Result<Self, PrometheusError> {
let (signal, on_exit) = exit_future::signal();
Expand Down
44 changes: 27 additions & 17 deletions client/service/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use sc_service::{
RuntimeGenesis,
Role,
Error,
TaskType,
TaskExecutor,
};
use sp_blockchain::HeaderBackend;
use sc_network::{multiaddr, Multiaddr};
Expand Down Expand Up @@ -142,7 +142,7 @@ fn node_config<G: RuntimeGenesis + 'static, E: ChainSpecExtension + Clone + 'sta
index: usize,
spec: &GenericChainSpec<G, E>,
role: Role,
task_executor: Arc<dyn Fn(Pin<Box<dyn futures::Future<Output = ()> + Send>>, TaskType) + Send + Sync>,
task_executor: TaskExecutor,
key_seed: Option<String>,
base_port: u16,
root: &TempDir,
Expand Down Expand Up @@ -256,17 +256,19 @@ impl<G, E, F, L, U> TestNet<G, E, F, L, U> where
authorities: impl Iterator<Item = (String, impl FnOnce(Configuration) -> Result<(F, U), Error>)>
) {
let executor = self.runtime.executor();
let task_executor: TaskExecutor = {
let executor = executor.clone();
(move |fut: Pin<Box<dyn futures::Future<Output = ()> + Send>>, _| {
executor.spawn(fut.unit_error().compat());
}).into()
};

for (key, authority) in authorities {
let task_executor = {
let executor = executor.clone();
Arc::new(move |fut: Pin<Box<dyn futures::Future<Output = ()> + Send>>, _| executor.spawn(fut.unit_error().compat()))
};
let node_config = node_config(
self.nodes,
&self.chain_spec,
Role::Authority { sentry_nodes: Vec::new() },
task_executor,
task_executor.clone(),
Some(key),
self.base_port,
&temp,
Expand All @@ -282,11 +284,15 @@ impl<G, E, F, L, U> TestNet<G, E, F, L, U> where
}

for full in full {
let task_executor = {
let executor = executor.clone();
Arc::new(move |fut: Pin<Box<dyn futures::Future<Output = ()> + Send>>, _| executor.spawn(fut.unit_error().compat()))
};
let node_config = node_config(self.nodes, &self.chain_spec, Role::Full, task_executor, None, self.base_port, &temp);
let node_config = node_config(
self.nodes,
&self.chain_spec,
Role::Full,
task_executor.clone(),
None,
self.base_port,
&temp,
);
let addr = node_config.network.listen_addresses.iter().next().unwrap().clone();
let (service, user_data) = full(node_config).expect("Error creating test node service");
let service = SyncService::from(service);
Expand All @@ -298,11 +304,15 @@ impl<G, E, F, L, U> TestNet<G, E, F, L, U> where
}

for light in light {
let task_executor = {
let executor = executor.clone();
Arc::new(move |fut: Pin<Box<dyn futures::Future<Output = ()> + Send>>, _| executor.spawn(fut.unit_error().compat()))
};
let node_config = node_config(self.nodes, &self.chain_spec, Role::Light, task_executor, None, self.base_port, &temp);
let node_config = node_config(
self.nodes,
&self.chain_spec,
Role::Light,
task_executor.clone(),
None,
self.base_port,
&temp,
);
let addr = node_config.network.listen_addresses.iter().next().unwrap().clone();
let service = SyncService::from(light(node_config).expect("Error creating test node service"));

Expand Down
6 changes: 6 additions & 0 deletions primitives/database/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,12 @@ pub trait Database<H: Clone>: Send + Sync {
}
}

impl<H> std::fmt::Debug for dyn Database<H> {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "Database")
}
}

/// Call `f` with the value previously stored against `key` and return the result, or `None` if
/// `key` is not currently in the database.
///
Expand Down
Loading

0 comments on commit 6221146

Please sign in to comment.