diff --git a/bin/reth/src/args/rpc_server_args.rs b/bin/reth/src/args/rpc_server_args.rs index 180fedaccb98..5ca3932e7bda 100644 --- a/bin/reth/src/args/rpc_server_args.rs +++ b/bin/reth/src/args/rpc_server_args.rs @@ -25,6 +25,7 @@ use reth_rpc::{ JwtError, JwtSecret, }; +use crate::cli::components::RethNodeComponents; use reth_rpc_builder::{ auth::{AuthServerConfig, AuthServerHandle}, constants, @@ -175,32 +176,15 @@ impl RpcServerArgs { /// for the auth server that handles the `engine_` API that's accessed by the consensus /// layer. #[allow(clippy::too_many_arguments)] - pub async fn start_servers( + pub async fn start_servers( &self, - provider: Provider, - pool: Pool, - network: Network, - executor: Tasks, - events: Events, + components: &Reth, engine_api: Engine, jwt_secret: JwtSecret, conf: &mut Conf, ) -> eyre::Result<(RpcServerHandle, AuthServerHandle)> where - Provider: BlockReaderIdExt - + AccountReader - + HeaderProvider - + StateProviderFactory - + EvmEnvProvider - + ChainSpecProvider - + ChangeSetReader - + Clone - + Unpin - + 'static, - Pool: TransactionPool + Clone + 'static, - Network: NetworkInfo + Peers + Clone + 'static, - Tasks: TaskSpawner + Clone + 'static, - Events: CanonStateSubscriptions + Clone + 'static, + Reth: RethNodeComponents, Engine: EngineApiServer, Conf: RethNodeCommandConfig, { @@ -210,15 +194,15 @@ impl RpcServerArgs { debug!(target: "reth::cli", http=?module_config.http(), ws=?module_config.ws(), "Using RPC module config"); let (mut rpc_modules, auth_module, mut registry) = RpcModuleBuilder::default() - .with_provider(provider) - .with_pool(pool) - .with_network(network) - .with_events(events) - .with_executor(executor) + .with_provider(components.provider()) + .with_pool(components.pool()) + .with_network(components.network()) + .with_events(components.events()) + .with_executor(components.task_executor()) .build_with_auth_server(module_config, engine_api); // apply configured customization - conf.extend_rpc_modules(self, &mut registry, &mut rpc_modules)?; + conf.extend_rpc_modules(self, components, &mut registry, &mut rpc_modules)?; let server_config = self.rpc_server_config(); let launch_rpc = rpc_modules.start_server(server_config).map_ok(|handle| { diff --git a/bin/reth/src/cli/components.rs b/bin/reth/src/cli/components.rs new file mode 100644 index 000000000000..0db43553eca4 --- /dev/null +++ b/bin/reth/src/cli/components.rs @@ -0,0 +1,119 @@ +//! Components that are used by the node command. + +use reth_network_api::{NetworkInfo, Peers}; +use reth_primitives::ChainSpec; +use reth_provider::{ + AccountReader, BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, + EvmEnvProvider, StateProviderFactory, +}; +use reth_tasks::TaskSpawner; +use reth_transaction_pool::TransactionPool; +use std::sync::Arc; + +/// Helper trait to unify all provider traits for simplicity. +pub trait FullProvider: + BlockReaderIdExt + + AccountReader + + StateProviderFactory + + EvmEnvProvider + + ChainSpecProvider + + ChangeSetReader + + Clone + + Unpin + + 'static +{ +} + +impl FullProvider for T where + T: BlockReaderIdExt + + AccountReader + + StateProviderFactory + + EvmEnvProvider + + ChainSpecProvider + + ChangeSetReader + + Clone + + Unpin + + 'static +{ +} + +/// The trait that is implemented for the Node command. +pub trait RethNodeComponents { + /// The Provider type that is provided by the not itself + type Provider: FullProvider; + /// The transaction pool type + type Pool: TransactionPool + Clone + Unpin + 'static; + /// The network type used to communicate with p2p. + type Network: NetworkInfo + Peers + Clone + 'static; + /// The events type used to create subscriptions. + type Events: CanonStateSubscriptions + Clone + 'static; + /// The type that is used to spawn tasks. + type Tasks: TaskSpawner + Clone + Unpin + 'static; + + /// Returns the instance of the provider + fn provider(&self) -> Self::Provider; + + /// Returns the instance of the task executor. + fn task_executor(&self) -> Self::Tasks; + + /// Returns the instance of the transaction pool. + fn pool(&self) -> Self::Pool; + + /// Returns the instance of the network API. + fn network(&self) -> Self::Network; + + /// Returns the instance of the events subscription handler. + fn events(&self) -> Self::Events; + + /// Helper function to return the chain spec. + fn chain_spec(&self) -> Arc { + self.provider().chain_spec() + } +} + +/// A Generic implementation of the RethNodeComponents trait. +#[derive(Clone, Debug)] +#[allow(missing_docs)] +pub struct RethNodeComponentsImpl { + pub provider: Provider, + pub pool: Pool, + pub network: Network, + pub task_executor: Tasks, + pub events: Events, +} + +impl RethNodeComponents + for RethNodeComponentsImpl +where + Provider: FullProvider + Clone + 'static, + Tasks: TaskSpawner + Clone + Unpin + 'static, + Pool: TransactionPool + Clone + Unpin + 'static, + Network: NetworkInfo + Peers + Clone + 'static, + Events: CanonStateSubscriptions + Clone + 'static, +{ + type Provider = Provider; + type Pool = Pool; + type Network = Network; + type Events = Events; + type Tasks = Tasks; + + fn provider(&self) -> Self::Provider { + self.provider.clone() + } + + fn task_executor(&self) -> Self::Tasks { + self.task_executor.clone() + } + + fn pool(&self) -> Self::Pool { + self.pool.clone() + } + + fn network(&self) -> Self::Network { + self.network.clone() + } + + fn events(&self) -> Self::Events { + self.events.clone() + } +} diff --git a/bin/reth/src/cli/ext.rs b/bin/reth/src/cli/ext.rs index b636c8bc7c66..ef2d52b6544e 100644 --- a/bin/reth/src/cli/ext.rs +++ b/bin/reth/src/cli/ext.rs @@ -1,19 +1,15 @@ //! Support for integrating customizations into the CLI. -use crate::cli::config::{PayloadBuilderConfig, RethRpcConfig}; +use crate::cli::{ + components::RethNodeComponents, + config::{PayloadBuilderConfig, RethRpcConfig}, +}; use clap::Args; use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig}; -use reth_network_api::{NetworkInfo, Peers}; use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService}; -use reth_primitives::ChainSpec; -use reth_provider::{ - AccountReader, BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, - EvmEnvProvider, StateProviderFactory, -}; use reth_rpc_builder::{RethModuleRegistry, TransportRpcModules}; use reth_tasks::TaskSpawner; -use reth_transaction_pool::TransactionPool; -use std::{fmt, sync::Arc}; +use std::fmt; /// A trait that allows for extending parts of the CLI with additional functionality. /// @@ -36,32 +32,47 @@ impl RethCliExt for () { /// A trait that allows for extending and customizing parts of the node command /// [NodeCommand](crate::node::NodeCommand). pub trait RethNodeCommandConfig: fmt::Debug { + /// Event hook called once all components have been initialized. + fn on_components_initialized( + &mut self, + components: &Reth, + ) -> eyre::Result<()> { + let _ = components; + Ok(()) + } + + /// Event hook called once the node has been launched. + fn on_node_started(&mut self, components: &Reth) -> eyre::Result<()> { + let _ = components; + Ok(()) + } + /// Allows for registering additional RPC modules for the transports. /// /// This is expected to call the merge functions of [TransportRpcModules], for example /// [TransportRpcModules::merge_configured] - fn extend_rpc_modules( + #[allow(clippy::type_complexity)] + fn extend_rpc_modules( &mut self, - _config: &Conf, - _registry: &mut RethModuleRegistry, - _modules: &mut TransportRpcModules, + config: &Conf, + components: &Reth, + registry: &mut RethModuleRegistry< + Reth::Provider, + Reth::Pool, + Reth::Network, + Reth::Tasks, + Reth::Events, + >, + modules: &mut TransportRpcModules, ) -> eyre::Result<()> where Conf: RethRpcConfig, - Provider: BlockReaderIdExt - + AccountReader - + StateProviderFactory - + EvmEnvProvider - + ChainSpecProvider - + ChangeSetReader - + Clone - + Unpin - + 'static, - Pool: TransactionPool + Clone + 'static, - Network: NetworkInfo + Peers + Clone + 'static, - Tasks: TaskSpawner + Clone + 'static, - Events: CanonStateSubscriptions + Clone + 'static, + Reth: RethNodeComponents, { + let _ = config; + let _ = components; + let _ = registry; + let _ = modules; Ok(()) } @@ -70,35 +81,32 @@ pub trait RethNodeCommandConfig: fmt::Debug { /// /// By default this spawns a [BasicPayloadJobGenerator] with the default configuration /// [BasicPayloadJobGeneratorConfig]. - fn spawn_payload_builder_service( + fn spawn_payload_builder_service( &mut self, conf: &Conf, - provider: Provider, - pool: Pool, - executor: Tasks, - chain_spec: Arc, + components: &Reth, ) -> eyre::Result where Conf: PayloadBuilderConfig, - Provider: StateProviderFactory + BlockReaderIdExt + Clone + Unpin + 'static, - Pool: TransactionPool + Unpin + 'static, - Tasks: TaskSpawner + Clone + Unpin + 'static, + Reth: RethNodeComponents, { let payload_generator = BasicPayloadJobGenerator::new( - provider, - pool, - executor.clone(), + components.provider(), + components.pool(), + components.task_executor(), BasicPayloadJobGeneratorConfig::default() .interval(conf.interval()) .deadline(conf.deadline()) .max_payload_tasks(conf.max_payload_tasks()) .extradata(conf.extradata_rlp_bytes()) .max_gas_limit(conf.max_gas_limit()), - chain_spec, + components.chain_spec(), ); let (payload_service, payload_builder) = PayloadBuilderService::new(payload_generator); - executor.spawn_critical("payload builder service", Box::pin(payload_service)); + components + .task_executor() + .spawn_critical("payload builder service", Box::pin(payload_service)); Ok(payload_builder) } @@ -165,52 +173,42 @@ impl NoArgs { } impl RethNodeCommandConfig for NoArgs { - fn extend_rpc_modules( + fn extend_rpc_modules( &mut self, config: &Conf, - registry: &mut RethModuleRegistry, - modules: &mut TransportRpcModules<()>, + components: &Reth, + registry: &mut RethModuleRegistry< + Reth::Provider, + Reth::Pool, + Reth::Network, + Reth::Tasks, + Reth::Events, + >, + modules: &mut TransportRpcModules, ) -> eyre::Result<()> where Conf: RethRpcConfig, - Provider: BlockReaderIdExt - + AccountReader - + StateProviderFactory - + EvmEnvProvider - + ChainSpecProvider - + ChangeSetReader - + Clone - + Unpin - + 'static, - Pool: TransactionPool + Clone + 'static, - Network: NetworkInfo + Peers + Clone + 'static, - Tasks: TaskSpawner + Clone + 'static, - Events: CanonStateSubscriptions + Clone + 'static, + Reth: RethNodeComponents, { if let Some(conf) = self.inner_mut() { - conf.extend_rpc_modules(config, registry, modules) + conf.extend_rpc_modules(config, components, registry, modules) } else { Ok(()) } } - fn spawn_payload_builder_service( + fn spawn_payload_builder_service( &mut self, conf: &Conf, - provider: Provider, - pool: Pool, - executor: Tasks, - chain_spec: Arc, + components: &Reth, ) -> eyre::Result where Conf: PayloadBuilderConfig, - Provider: StateProviderFactory + BlockReaderIdExt + Clone + Unpin + 'static, - Pool: TransactionPool + Unpin + 'static, - Tasks: TaskSpawner + Clone + Unpin + 'static, + Reth: RethNodeComponents, { self.inner_mut() .ok_or_else(|| eyre::eyre!("config value must be set"))? - .spawn_payload_builder_service(conf, provider, pool, executor, chain_spec) + .spawn_payload_builder_service(conf, components) } } diff --git a/bin/reth/src/cli/mod.rs b/bin/reth/src/cli/mod.rs index bc2b9adb1feb..1938e1ef2fd0 100644 --- a/bin/reth/src/cli/mod.rs +++ b/bin/reth/src/cli/mod.rs @@ -19,6 +19,7 @@ use reth_tracing::{ }; use std::{fmt, fmt::Display, sync::Arc}; +pub mod components; pub mod config; pub mod ext; diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 868e55e2042d..60c2ef62bac8 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -9,6 +9,7 @@ use crate::{ RpcServerArgs, TxPoolArgs, }, cli::{ + components::RethNodeComponentsImpl, config::RethRpcConfig, ext::{RethCliExt, RethNodeCommandConfig}, }, @@ -352,17 +353,19 @@ impl NodeCommand { debug!(target: "reth::cli", peer_id = ?network.peer_id(), "Full peer ID"); let network_client = network.fetch_client().await?; - let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel(); + let components = RethNodeComponentsImpl { + provider: blockchain_db.clone(), + pool: transaction_pool.clone(), + network: network.clone(), + task_executor: ctx.task_executor.clone(), + events: blockchain_db.clone(), + }; + self.ext.on_components_initialized(&components)?; debug!(target: "reth::cli", "Spawning payload builder service"); - let payload_builder = self.ext.spawn_payload_builder_service( - &self.builder, - blockchain_db.clone(), - transaction_pool.clone(), - ctx.task_executor.clone(), - Arc::clone(&self.chain), - )?; + let payload_builder = self.ext.spawn_payload_builder_service(&self.builder, &components)?; + let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel(); let max_block = if let Some(block) = self.debug.max_block { Some(block) } else if let Some(tip) = self.debug.tip { @@ -530,19 +533,8 @@ impl NodeCommand { self.adjust_instance_ports(); // Start RPC servers - let (_rpc_server, _auth_server) = self - .rpc - .start_servers( - blockchain_db.clone(), - transaction_pool.clone(), - network.clone(), - ctx.task_executor.clone(), - blockchain_tree, - engine_api, - jwt_secret, - &mut self.ext, - ) - .await?; + let (_rpc_server, _auth_server) = + self.rpc.start_servers(&components, engine_api, jwt_secret, &mut self.ext).await?; // Run consensus engine to completion let (tx, rx) = oneshot::channel(); @@ -552,6 +544,8 @@ impl NodeCommand { let _ = tx.send(res); }); + self.ext.on_node_started(&components)?; + rx.await??; info!(target: "reth::cli", "Consensus engine has exited."); diff --git a/examples/additional-rpc-namespace-in-cli/src/main.rs b/examples/additional-rpc-namespace-in-cli/src/main.rs index 58e867e84bdc..9823fef03f1a 100644 --- a/examples/additional-rpc-namespace-in-cli/src/main.rs +++ b/examples/additional-rpc-namespace-in-cli/src/main.rs @@ -15,17 +15,12 @@ use clap::Parser; use jsonrpsee::{core::RpcResult, proc_macros::rpc}; use reth::{ cli::{ + components::RethNodeComponents, config::RethRpcConfig, ext::{RethCliExt, RethNodeCommandConfig}, Cli, }, - network::{NetworkInfo, Peers}, - providers::{ - BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, - EvmEnvProvider, StateProviderFactory, - }, rpc::builder::{RethModuleRegistry, TransportRpcModules}, - tasks::TaskSpawner, }; use reth_transaction_pool::TransactionPool; @@ -51,26 +46,22 @@ struct RethCliTxpoolExt { impl RethNodeCommandConfig for RethCliTxpoolExt { // This is the entrypoint for the CLI to extend the RPC server with custom rpc namespaces. - fn extend_rpc_modules( + fn extend_rpc_modules( &mut self, _config: &Conf, - registry: &mut RethModuleRegistry, + _components: &Reth, + registry: &mut RethModuleRegistry< + Reth::Provider, + Reth::Pool, + Reth::Network, + Reth::Tasks, + Reth::Events, + >, modules: &mut TransportRpcModules, ) -> eyre::Result<()> where Conf: RethRpcConfig, - Provider: BlockReaderIdExt - + StateProviderFactory - + EvmEnvProvider - + ChainSpecProvider - + ChangeSetReader - + Clone - + Unpin - + 'static, - Pool: TransactionPool + Clone + 'static, - Network: NetworkInfo + Peers + Clone + 'static, - Tasks: TaskSpawner + Clone + 'static, - Events: CanonStateSubscriptions + Clone + 'static, + Reth: RethNodeComponents, { if !self.enable_ext { return Ok(())