Skip to content

Commit

Permalink
Merge pull request #4459 from chenyukang/rpc-new-runtime
Browse files Browse the repository at this point in the history
Use standalone runtime for RPC service
  • Loading branch information
zhangsoledad authored Jun 28, 2024
2 parents c3279d4 + 07da76d commit c3e799f
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 15 deletions.
2 changes: 1 addition & 1 deletion ckb-bin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ fn run_app_inner(
matches: &ArgMatches,
) -> Result<(), ExitCode> {
let is_silent_logging = is_silent_logging(cmd);
let (mut handle, mut handle_stop_rx, _runtime) = new_global_runtime();
let (mut handle, mut handle_stop_rx, _runtime) = new_global_runtime(None);
let setup = Setup::from_matches(bin_name, cmd, matches)?;
let _guard = SetupGuard::from_setup(&setup, &version, handle.clone(), is_silent_logging)?;

Expand Down
16 changes: 14 additions & 2 deletions ckb-bin/src/subcommand/run.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::thread::available_parallelism;

use crate::helper::deadlock_detection;
use ckb_app_config::{ExitCode, RunArgs};
use ckb_async_runtime::Handle;
use ckb_async_runtime::{new_global_runtime, Handle};
use ckb_build_info::Version;
use ckb_launcher::Launcher;
use ckb_logger::info;
Expand All @@ -11,8 +13,11 @@ use ckb_types::core::cell::setup_system_cell_cache;
pub fn run(args: RunArgs, version: Version, async_handle: Handle) -> Result<(), ExitCode> {
deadlock_detection();

let rpc_threads_num = calc_rpc_threads_num(&args);
info!("ckb version: {}", version);
let mut launcher = Launcher::new(args, version, async_handle);
info!("run rpc server with {} threads", rpc_threads_num);
let (mut rpc_handle, _rpc_stop_rx, _runtime) = new_global_runtime(Some(rpc_threads_num));
let mut launcher = Launcher::new(args, version, async_handle, rpc_handle.clone());

let block_assembler_config = launcher.sanitize_block_assembler_config()?;
let miner_enable = block_assembler_config.is_some();
Expand Down Expand Up @@ -63,7 +68,14 @@ pub fn run(args: RunArgs, version: Version, async_handle: Handle) -> Result<(),
})
.expect("Error setting Ctrl-C handler");

rpc_handle.drop_guard();
wait_all_ckb_services_exit();

Ok(())
}

fn calc_rpc_threads_num(args: &RunArgs) -> usize {
let system_parallelism: usize = available_parallelism().unwrap().into();
let default_num = usize::max(system_parallelism, 1);
args.config.rpc.threads.unwrap_or(default_num)
}
5 changes: 3 additions & 2 deletions rpc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl RpcServer {
};

let tcp_address = if let Some(addr) = config.tcp_listen_address {
let local_addr = handler.block_on(Self::start_tcp_server(rpc, addr));
let local_addr = handler.block_on(Self::start_tcp_server(rpc, addr, handler.clone()));
if let Ok(addr) = &local_addr {
info!("Listen TCP RPCServer on address: {}", addr);
};
Expand Down Expand Up @@ -137,11 +137,12 @@ impl RpcServer {
async fn start_tcp_server(
rpc: Arc<MetaIoHandler<Option<Session>>>,
tcp_listen_address: String,
handler: Handle,
) -> Result<SocketAddr, AnyError> {
// TCP server with line delimited json codec.
let listener = TcpListener::bind(tcp_listen_address).await?;
let tcp_address = listener.local_addr()?;
tokio::spawn(async move {
handler.spawn(async move {
let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024);
let stream_config = StreamServerConfig::default()
.with_channel_size(4)
Expand Down
2 changes: 1 addition & 1 deletion test/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl Net {
)
})
.collect();
let (async_handle, _handle_recv, async_runtime) = new_global_runtime();
let (async_handle, _handle_recv, async_runtime) = new_global_runtime(None);
let controller = NetworkService::new(
Arc::clone(&network_state),
ckb_protocols,
Expand Down
3 changes: 3 additions & 0 deletions test/template/ckb.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ reject_ill_transactions = true
# By default deprecated rpc methods are disabled.
enable_deprecated_rpc = true

# threads number for RPC service, default value is the number of CPU cores
# threads = 4

[tx_pool]
max_tx_pool_size = 180_000_000 # 180mb
min_fee_rate = 0 # Here fee_rate are calculated directly using size in units of shannons/KB
Expand Down
8 changes: 5 additions & 3 deletions util/launcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,18 @@ pub struct Launcher {
pub version: Version,
/// ckb global runtime handle
pub async_handle: Handle,
/// rpc global runtime handle
pub rpc_handle: Handle,
}

impl Launcher {
/// Construct new Launcher from cli args
pub fn new(args: RunArgs, version: Version, async_handle: Handle) -> Self {
pub fn new(args: RunArgs, version: Version, async_handle: Handle, rpc_handle: Handle) -> Self {
Launcher {
args,
version,
async_handle,
rpc_handle,
}
}

Expand Down Expand Up @@ -428,8 +431,7 @@ impl Launcher {
builder.enable_subscription(shared.clone());
let io_handler = builder.build();

let async_handle = shared.async_handle();
let _rpc = RpcServer::new(rpc_config, io_handler, async_handle.clone());
let _rpc = RpcServer::new(rpc_config, io_handler, self.rpc_handle.clone());

network_controller
}
Expand Down
11 changes: 6 additions & 5 deletions util/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use ckb_spawn::Spawn;
use core::future::Future;
use std::sync::atomic::{AtomicU32, Ordering};

use std::thread::available_parallelism;
use tokio::runtime::Builder;
use tokio::runtime::Handle as TokioHandle;

Expand Down Expand Up @@ -88,9 +88,10 @@ impl Handle {
}

/// Create a new runtime with unique name.
fn new_runtime() -> Runtime {
fn new_runtime(worker_num: Option<usize>) -> Runtime {
Builder::new_multi_thread()
.enable_all()
.worker_threads(worker_num.unwrap_or_else(|| available_parallelism().unwrap().into()))
.thread_name_fn(|| {
static ATOMIC_ID: AtomicU32 = AtomicU32::new(0);
let id = ATOMIC_ID
Expand Down Expand Up @@ -121,8 +122,8 @@ fn new_runtime() -> Runtime {
}

/// Create new threaded_scheduler tokio Runtime, return `Runtime`
pub fn new_global_runtime() -> (Handle, Receiver<()>, Runtime) {
let runtime = new_runtime();
pub fn new_global_runtime(worker_num: Option<usize>) -> (Handle, Receiver<()>, Runtime) {
let runtime = new_runtime(worker_num);
let handle = runtime.handle().clone();
let (guard, handle_stop_rx): (Sender<()>, Receiver<()>) = tokio::sync::mpsc::channel::<()>(1);

Expand All @@ -132,7 +133,7 @@ pub fn new_global_runtime() -> (Handle, Receiver<()>, Runtime) {
/// Create new threaded_scheduler tokio Runtime, return `Handle` and background thread join handle,
/// NOTICE: This is only used in testing
pub fn new_background_runtime() -> Handle {
let runtime = new_runtime();
let runtime = new_runtime(None);
let handle = runtime.handle().clone();

let (guard, mut handle_stop_rx): (Sender<()>, Receiver<()>) =
Expand Down
2 changes: 1 addition & 1 deletion util/stop-handler/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl TestStopMemo {

#[test]
fn basic() {
let (mut handle, mut stop_recv, _runtime) = new_global_runtime();
let (mut handle, mut stop_recv, _runtime) = new_global_runtime(None);

ctrlc::set_handler(move || {
broadcast_exit_signals();
Expand Down

0 comments on commit c3e799f

Please sign in to comment.