Shutdown server runtime before program exit (#8313)
We never terminate the server runtime in the benchmark when we exit the
program. This causes RocksDB, running in the server runtime, to try to
access global state that is cleaned up during exit. This PR terminates
the server runtime via a oneshot channel and only then exits.
Before this fix, I could reproduce the error in almost every run on my
Linux machine. After this PR, I did not run into any errors even after
10+ runs.
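
In outline, the fix parks the server runtime on a oneshot receiver instead of an infinite sleep loop; before exiting, the main thread sends on the channel and joins the server thread, so the runtime (and RocksDB inside it) is dropped while process globals are still alive. Below is a minimal standalone sketch of that pattern, not the diff itself; identifiers like `shutdown_tx` and the runtime options are illustrative:

```rust
use std::thread;
use tokio::runtime::Builder;
use tokio::sync::oneshot;

fn main() {
    // Oneshot channel used to tell the server runtime when to shut down.
    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();

    // Run the server runtime on a dedicated OS thread, as the benchmark does.
    let server_handle = thread::spawn(move || {
        let runtime = Builder::new_multi_thread()
            .enable_all()
            .build()
            .expect("failed to build server runtime");
        runtime.block_on(async move {
            // ... spawn validators / fullnode tasks here ...
            // Park until the main thread asks us to stop, instead of
            // sleeping in an infinite loop.
            shutdown_rx.await.expect("shutdown sender dropped");
        });
        // The runtime is dropped here, while the process is still alive, so
        // its background tasks stop touching global state before teardown.
    });

    // ... drive the benchmark workload on the client runtime ...

    // Signal the server runtime, then wait for its thread to finish.
    shutdown_tx.send(()).expect("server runtime already stopped");
    server_handle.join().expect("failed to join server thread");
}
```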
sadhansood authored Feb 15, 2023
1 parent 629804d commit 7a661d1
Showing 3 changed files with 58 additions and 30 deletions.
.github/workflows/rust.yml: 2 changes (1 addition & 1 deletion)
@@ -91,7 +91,7 @@ jobs:
           cargo nextest run --profile ci
       - name: benchmark (smoke)
         run: |
-          # cargo run --package sui-benchmark --bin stress -- --log-path /tmp/stress.log --num-client-threads 10 --num-server-threads 24 --num-transfer-accounts 2 bench --target-qps 100 --num-workers 10 --transfer-object 50 --shared-counter 50 --run-duration 10s --stress-stat-collection
+          cargo run --package sui-benchmark --bin stress -- --log-path /tmp/stress.log --num-client-threads 10 --num-server-threads 24 --num-transfer-accounts 2 bench --target-qps 100 --num-workers 10 --transfer-object 50 --shared-counter 50 --run-duration 10s --stress-stat-collection
           pushd narwhal/benchmark && fab smoke && popd
       - name: doctests
         run: |
crates/sui-benchmark/src/benchmark_setup.rs: 66 changes (40 additions & 26 deletions)
@@ -6,6 +6,7 @@ use prometheus::Registry;
 use rand::seq::SliceRandom;
 use std::path::PathBuf;
 use std::sync::Arc;
+use std::thread::JoinHandle;
 use std::time::Duration;
 use sui_config::utils;

@@ -22,7 +23,7 @@ use sui_types::object::{generate_test_gas_objects_with_owner, Owner};
 use test_utils::authority::test_and_configure_authority_configs;
 use test_utils::authority::{spawn_fullnode, spawn_test_authorities};
 use tokio::runtime::Builder;
-use tokio::sync::Barrier;
+use tokio::sync::{oneshot, Barrier};
 use tracing::info;

 pub enum Env {
@@ -41,13 +42,19 @@ pub struct ProxyGasAndCoin {
     pub pay_coin_type_tag: TypeTag,
 }

+pub struct BenchmarkSetup {
+    pub server_handle: JoinHandle<()>,
+    pub shutdown_notifier: oneshot::Sender<()>,
+    pub proxy_and_coins: Vec<ProxyGasAndCoin>,
+}
+
 impl Env {
     pub async fn setup(
         &self,
         barrier: Arc<Barrier>,
         registry: &Registry,
         opts: &Opts,
-    ) -> Result<Vec<ProxyGasAndCoin>> {
+    ) -> Result<BenchmarkSetup> {
         match self {
             Env::Local => {
                 self.setup_local_env(
@@ -83,7 +90,7 @@ impl Env {
         committee_size: usize,
         server_metric_port: u16,
         num_server_threads: u64,
-    ) -> Result<Vec<ProxyGasAndCoin>> {
+    ) -> Result<BenchmarkSetup> {
         info!("Running benchmark setup in local mode..");
         let mut network_config = test_and_configure_authority_configs(committee_size);
         let mut metric_port = server_metric_port;
@@ -119,7 +126,8 @@ impl Env {
         let fullnode_barrier_clone = fullnode_barrier.clone();
         // spawn a thread to spin up sui nodes on the multi-threaded server runtime.
         // running forever
-        let _validators = std::thread::spawn(move || {
+        let (sender, recv) = tokio::sync::oneshot::channel::<()>();
+        let join_handle = std::thread::spawn(move || {
             // create server runtime
             let server_runtime = Builder::new_multi_thread()
                 .thread_stack_size(32 * 1024 * 1024)
@@ -134,10 +142,7 @@ impl Env {
                 let _fullnode = spawn_fullnode(&cloned_config, Some(fullnode_rpc_port)).await;
                 fullnode_barrier_clone.wait().await;
                 barrier.wait().await;
-                // This thread cannot exit, otherwise validators will shutdown.
-                loop {
-                    sleep(Duration::from_secs(300)).await;
-                }
+                recv.await.expect("Unable to wait for terminate signal");
             });
         });
         // Let fullnode be created.
@@ -155,20 +160,24 @@ impl Env {
         );
         let keypair = Arc::new(keypair);
         let ttag = pay_coin.get_move_template_type()?;
-        Ok(vec![ProxyGasAndCoin {
-            primary_gas: (
-                primary_gas.compute_object_reference(),
-                Owner::AddressOwner(owner),
-                keypair.clone(),
-            ),
-            pay_coin: (
-                pay_coin.compute_object_reference(),
-                Owner::AddressOwner(owner),
-                keypair,
-            ),
-            pay_coin_type_tag: ttag,
-            proxy,
-        }])
+        Ok(BenchmarkSetup {
+            server_handle: join_handle,
+            shutdown_notifier: sender,
+            proxy_and_coins: vec![ProxyGasAndCoin {
+                primary_gas: (
+                    primary_gas.compute_object_reference(),
+                    Owner::AddressOwner(owner),
+                    keypair.clone(),
+                ),
+                pay_coin: (
+                    pay_coin.compute_object_reference(),
+                    Owner::AddressOwner(owner),
+                    keypair,
+                ),
+                pay_coin_type_tag: ttag,
+                proxy,
+            }],
+        })
     }

     async fn setup_remote_env(
@@ -182,17 +191,18 @@ impl Env {
         use_fullnode_for_reconfig: bool,
         use_fullnode_for_execution: bool,
         fullnode_rpc_address: Vec<String>,
-    ) -> Result<Vec<ProxyGasAndCoin>> {
+    ) -> Result<BenchmarkSetup> {
         info!("Running benchmark setup in remote mode ..");
-        std::thread::spawn(move || {
+        let (sender, recv) = tokio::sync::oneshot::channel::<()>();
+        let join_handle = std::thread::spawn(move || {
             Builder::new_multi_thread()
                 .build()
                 .unwrap()
                 .block_on(async move {
                     barrier.wait().await;
+                    recv.await.expect("Unable to wait for terminate signal");
                 });
         });

         let fullnode_rpc_urls = fullnode_rpc_address.clone();
         info!("List of fullnode rpc urls: {:?}", fullnode_rpc_urls);
         let proxies: Vec<Arc<dyn ValidatorProxy + Send + Sync>> = if use_fullnode_for_execution {
@@ -286,6 +296,10 @@ impl Env {
                 proxy: proxy.clone(),
             })
         }
-        Ok(proxy_gas_and_coins)
+        Ok(BenchmarkSetup {
+            server_handle: join_handle,
+            shutdown_notifier: sender,
+            proxy_and_coins: proxy_gas_and_coins,
+        })
     }
 }
crates/sui-benchmark/src/bin/stress.rs: 20 changes (17 additions & 3 deletions)
@@ -77,12 +77,13 @@ async fn main() -> Result<()> {
     let barrier = Arc::new(Barrier::new(2));
     let cloned_barrier = barrier.clone();
     let env = if opts.local { Env::Local } else { Env::Remote };
-    let proxy_gas_and_coins = env.setup(cloned_barrier, &registry, &opts).await?;
+    let bench_setup = env.setup(cloned_barrier, &registry, &opts).await?;
     let system_state_observer = {
         // Only need to get system state from one proxy as it is shared for the
         // whole network.
         let mut system_state_observer = SystemStateObserver::new(
-            proxy_gas_and_coins
+            bench_setup
+                .proxy_and_coins
                 .choose(&mut rand::thread_rng())
                 .context("Failed to get proxy for system state observer")?
                 .proxy
@@ -116,7 +117,11 @@ async fn main() -> Result<()> {
     };

     let proxy_workloads = workload_configuration
-        .configure(proxy_gas_and_coins, &opts, system_state_observer.clone())
+        .configure(
+            bench_setup.proxy_and_coins,
+            &opts,
+            system_state_observer.clone(),
+        )
         .await?;
     let interval = opts.run_duration;
     // We only show continuous progress in stderr
@@ -140,6 +145,15 @@ async fn main() -> Result<()> {
     if let Err(err) = joined {
         Err(anyhow!("Failed to join client runtime: {:?}", err))
     } else {
+        // send signal to stop the server runtime
+        bench_setup
+            .shutdown_notifier
+            .send(())
+            .expect("Failed to stop server runtime");
+        bench_setup
+            .server_handle
+            .join()
+            .expect("Failed to join the server handle");
         let (benchmark_stats, stress_stats) = joined.unwrap().unwrap();
         let benchmark_table = benchmark_stats.to_table();
         eprintln!("Benchmark Report:");
