Add stress tool cpu usage to output (MystenLabs#7443)
```
arunkoshy@MystenLoshysMBP sui-benchmark % cargo run --package sui-benchmark --bin stress -- --log-path /tmp/stress.log --num-client-threads 10 --num-server-threads 24 --num-transfer-accounts 2 bench --target-qps 100 --num-workers 10  --shared-counter 100 --run-duration 30s --stress-stat-collection true

Benchmark Report:
+-------------+-----+--------+---------------+---------------+---------------+
| duration(s) | tps | error% | latency (min) | latency (p50) | latency (p99) |
+============================================================================+
| 30          | 98  | 0      | 91            | 456           | 778           |
+-------------+-----+--------+---------------+---------------+---------------+
Stress Performance Report:
+-----------+-----+-----+
| metric    | p50 | p99 |
+=======================+
| cpu usage | 39  | 64  |
+-----------+-----+-----+


arunkoshy@MystenLoshysMBP sui-benchmark % cargo run --package sui-benchmark --bin stress -- --log-path /tmp/stress.log --num-client-threads 10 --num-server-threads 12 --num-transfer-accounts 2 bench --target-qps 200 --num-workers 10  --shared-counter 200 --run-duration 30s --stress-stat-collection true
Benchmark Report:
+-------------+-----+--------+---------------+---------------+---------------+
| duration(s) | tps | error% | latency (min) | latency (p50) | latency (p99) |
+============================================================================+
| 30          | 196 | 0      | 196           | 519           | 870           |
+-------------+-----+--------+---------------+---------------+---------------+
Stress Performance Report:
+-----------+-----+-----+
| metric    | p50 | p99 |
+=======================+
| cpu usage | 77  | 97  |
+-----------+-----+-----+
```
arun-koshy authored Jan 24, 2023
1 parent b370215 commit 15320d1
Showing 8 changed files with 207 additions and 41 deletions.
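
The core of the change is a background task that samples per-core CPU usage with the `sysinfo` crate once per second and records the samples into an HDR histogram (see `stress_stats_collector` in `bench_driver.rs` below). As a reading aid, here is a minimal standalone sketch of that sampling step, assuming `sysinfo` 0.27 and `hdrhistogram` as dependencies and ignoring the benchmark harness; it is not part of the diff:

```rust
// Standalone sketch of the per-core CPU sampling this commit adds.
use std::{thread, time::Duration};
use sysinfo::{CpuExt, System, SystemExt};

fn main() {
    let mut system = System::new_all();
    // sysinfo needs two refreshes separated by a short interval before
    // cpu_usage() reports meaningful per-core values.
    system.refresh_cpu();
    thread::sleep(Duration::from_secs(1));
    system.refresh_cpu();

    let mut hist = hdrhistogram::Histogram::<u64>::new_with_max(100, 3).unwrap();
    for (i, cpu) in system.cpus().iter().enumerate() {
        hist.saturating_record(cpu.cpu_usage() as u64);
        println!("cpu_{i}: {:.1}%", cpu.cpu_usage());
    }
    println!(
        "cpu usage p50 = {}, p99 = {}",
        hist.value_at_quantile(0.5),
        hist.value_at_quantile(0.99)
    );
}
```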
29 changes: 28 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions crates/sui-benchmark/Cargo.toml
@@ -49,6 +49,7 @@ move-core-types.workspace = true
narwhal-node = { path = "../../narwhal/node" }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
test-utils = { path = "../test-utils" }
sysinfo = "0.27.5"

[target.'cfg(msim)'.dependencies]
sui-macros = { path = "../sui-macros" }
20 changes: 14 additions & 6 deletions crates/sui-benchmark/src/bin/stress.rs
@@ -67,6 +67,7 @@ async fn main() -> Result<()> {
let cloned_barrier = barrier.clone();
let env = if opts.local { Env::Local } else { Env::Remote };
let benchmark_setup = env.setup(cloned_barrier, &registry, &opts).await?;
let stress_stat_collection = opts.stress_stat_collection;
barrier.wait().await;
// create client runtime
let client_runtime = Builder::new_multi_thread()
@@ -100,7 +101,7 @@ async fn main() -> Result<()> {
// otherwise summarized benchmark results are
// published in the end
let show_progress = interval.is_unbounded();
let driver = BenchDriver::new(opts.stat_collection_interval);
let driver = BenchDriver::new(opts.stat_collection_interval, stress_stat_collection);
driver
.run(
workloads,
@@ -116,15 +117,22 @@
if let Err(err) = joined {
Err(anyhow!("Failed to join client runtime: {:?}", err))
} else {
let stats: BenchmarkStats = joined.unwrap().unwrap();
let table = stats.to_table();
let (benchmark_stats, stress_stats) = joined.unwrap().unwrap();
let benchmark_table = benchmark_stats.to_table();
eprintln!("Benchmark Report:");
eprintln!("{}", table);
eprintln!("{}", benchmark_table);

if stress_stat_collection {
eprintln!("Stress Performance Report:");
let stress_stats_table = stress_stats.to_table();
eprintln!("{}", stress_stats_table);
}

if !prev_benchmark_stats_path.is_empty() {
let data = std::fs::read_to_string(&prev_benchmark_stats_path)?;
let prev_stats: BenchmarkStats = serde_json::from_str(&data)?;
let cmp = BenchmarkCmp {
new: &stats,
new: &benchmark_stats,
old: &prev_stats,
};
let cmp_table = cmp.to_table();
@@ -135,7 +143,7 @@
eprintln!("{}", cmp_table);
}
if !curr_benchmark_stats_path.is_empty() {
let serialized = serde_json::to_string(&stats)?;
let serialized = serde_json::to_string(&benchmark_stats)?;
std::fs::write(curr_benchmark_stats_path, serialized)?;
}
Ok(())
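
The `--stress-stat-collection` flag consumed above as `opts.stress_stat_collection` is declared on the benchmark's `Opts` struct, which lives in a changed file that is not rendered here. A hypothetical clap 3 declaration consistent with the command line in the commit message (`--stress-stat-collection true`, i.e. the flag takes an explicit boolean value) might look like the following; the attribute details and doc comment are assumptions:

```rust
// Hypothetical sketch only; the real Opts definition is not shown in this diff.
use clap::Parser;

#[derive(Parser)]
pub struct Opts {
    /// Whether to collect and display the stress performance report (CPU usage).
    #[clap(long, parse(try_from_str), default_value = "false")]
    pub stress_stat_collection: bool,
    // ...the rest of the existing benchmark options are omitted here...
}
```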
130 changes: 116 additions & 14 deletions crates/sui-benchmark/src/drivers/bench_driver.rs
@@ -16,6 +16,7 @@ use prometheus::GaugeVec;
use prometheus::HistogramVec;
use prometheus::IntCounterVec;
use prometheus::Registry;
use tokio::sync::mpsc::Sender;
use tokio::sync::OnceCell;
use tokio_util::sync::CancellationToken;

@@ -29,13 +30,13 @@ use std::sync::Arc;
use std::time::Duration;
use sui_types::crypto::AuthorityStrongQuorumSignInfo;
use sui_types::messages::VerifiedTransaction;
use sysinfo::{CpuExt, System, SystemExt};
use tokio::sync::Barrier;
use tokio::time;
use tokio::time::Instant;
use tokio::{time, time::Instant};
use tracing::{debug, error, info};

use super::BenchmarkStats;
use super::Interval;
use super::{BenchmarkStats, StressStats};
pub struct BenchMetrics {
pub num_success: IntCounterVec,
pub num_error: IntCounterVec,
@@ -44,6 +45,7 @@ pub struct BenchMetrics {
pub latency_s: HistogramVec,
pub validators_in_tx_cert: IntCounterVec,
pub validators_in_effects_cert: IntCounterVec,
pub cpu_usage: GaugeVec,
}

const LATENCY_SEC_BUCKETS: &[f64] = &[
Expand Down Expand Up @@ -103,6 +105,13 @@ impl BenchMetrics {
registry,
)
.unwrap(),
cpu_usage: register_gauge_vec_with_registry!(
"cpu_usage",
"CPU usage per core",
&["cpu"],
registry,
)
.unwrap(),
}
}
}
@@ -137,14 +146,16 @@ pub struct BenchWorker {

pub struct BenchDriver {
pub stat_collection_interval: u64,
pub stress_stat_collection: bool,
pub start_time: Instant,
pub token: CancellationToken,
}

impl BenchDriver {
pub fn new(stat_collection_interval: u64) -> BenchDriver {
pub fn new(stat_collection_interval: u64, stress_stat_collection: bool) -> BenchDriver {
BenchDriver {
stat_collection_interval,
stress_stat_collection,
start_time: Instant::now(),
token: CancellationToken::new(),
}
@@ -223,18 +234,20 @@ async fn ctrl_c() -> std::io::Result<()> {
}

#[async_trait]
impl Driver<BenchmarkStats> for BenchDriver {
impl Driver<(BenchmarkStats, StressStats)> for BenchDriver {
async fn run(
&self,
workloads: Vec<WorkloadInfo>,
proxy: Arc<dyn ValidatorProxy + Sync + Send>,
registry: &Registry,
show_progress: bool,
run_duration: Interval,
) -> Result<BenchmarkStats, anyhow::Error> {
) -> Result<(BenchmarkStats, StressStats), anyhow::Error> {
info!("Running BenchDriver");

let mut tasks = Vec::new();
let (tx, mut rx) = tokio::sync::mpsc::channel(100);
let (stress_stat_tx, mut stress_stat_rx) = tokio::sync::mpsc::channel(100);
let mut bench_workers = vec![];
for workload in workloads.iter() {
bench_workers.extend(self.make_workers(workload, proxy.clone()).await);
@@ -264,7 +277,7 @@ impl Driver<BenchmarkStats> for BenchDriver {
let cloned_token = self.token.clone();
let request_delay_micros = 1_000_000 / worker.target_qps;
let mut free_pool = worker.payload;
let progress = progress.clone();
let progress_cloned = progress.clone();
let tx_cloned = tx.clone();
let cloned_barrier = barrier.clone();
let metrics_cloned = metrics.clone();
@@ -415,8 +428,8 @@ impl Driver<BenchmarkStats> for BenchDriver {
match op {
NextOp::Retry(b) => {
retry_queue.push_back(b);
BenchDriver::update_progress(*start_time, run_duration, progress.clone());
if progress.is_finished() {
BenchDriver::update_progress(*start_time, run_duration, progress_cloned.clone());
if progress_cloned.is_finished() {
break;
}
}
@@ -425,8 +438,8 @@
num_in_flight -= 1;
free_pool.push(new_payload);
latency_histogram.saturating_record(latency.as_millis().try_into().unwrap());
BenchDriver::update_progress(*start_time, run_duration, progress.clone());
if progress.is_finished() {
BenchDriver::update_progress(*start_time, run_duration, progress_cloned.clone());
if progress_cloned.is_finished() {
break;
}
}
@@ -462,7 +475,7 @@ impl Driver<BenchmarkStats> for BenchDriver {
tasks.push(runner);
}

let stat_task = tokio::spawn(async move {
let benchmark_stat_task = tokio::spawn(async move {
let mut benchmark_stat = BenchmarkStats {
duration: Duration::ZERO,
num_error: 0,
@@ -492,6 +505,7 @@ impl Driver<BenchmarkStats> for BenchDriver {
let mut num_error: u64 = 0;
let mut latency_histogram =
hdrhistogram::Histogram::<u64>::new_with_max(120_000, 3).unwrap();

let mut num_in_flight: u64 = 0;
let mut num_submitted: u64 = 0;
let mut num_no_gas = 0;
@@ -524,6 +538,49 @@ impl Driver<BenchmarkStats> for BenchDriver {
benchmark_stat
});
drop(tx);

if self.stress_stat_collection {
tasks.push(stress_stats_collector(
progress.clone(),
metrics.clone(),
stress_stat_tx.clone(),
));
}
drop(stress_stat_tx);

let stress_stat_task = tokio::spawn(async move {
let mut stress_stat = StressStats {
cpu_usage: HistogramWrapper {
histogram: hdrhistogram::Histogram::<u64>::new_with_max(100, 3).unwrap(),
},
};
let mut stat_collection: Vec<StressStats> = Vec::new();
let mut counter = 0;
while let Some(sample_stat @ StressStats { cpu_usage: _ }) = stress_stat_rx.recv().await
{
stress_stat.update(&sample_stat);
stat_collection.push(sample_stat);

let mut cpu_usage_histogram =
hdrhistogram::Histogram::<u64>::new_with_max(100, 3).unwrap();
for stat in stat_collection.iter() {
cpu_usage_histogram.add(&stat.cpu_usage.histogram).unwrap();
}
counter += 1;
if counter % num_workers == 0 {
let stat = format!(
"cpu_usage p50 = {}, p99 = {}",
cpu_usage_histogram.value_at_quantile(0.5),
cpu_usage_histogram.value_at_quantile(0.99)
);
if show_progress {
eprintln!("{}", stat);
}
}
}
stress_stat
});

let all_tasks = try_join_all(tasks);
let _res = tokio::select! {
_ = ctrl_c() => {
@@ -532,7 +589,52 @@
}
res = all_tasks => res.unwrap().into_iter().collect()
};
let benchmark_stat = stat_task.await.unwrap();
Ok(benchmark_stat)
let benchmark_stat = benchmark_stat_task.await.unwrap();
let stress_stat = stress_stat_task.await.unwrap();
Ok((benchmark_stat, stress_stat))
}
}

fn stress_stats_collector(
progress: Arc<ProgressBar>,
metrics: Arc<BenchMetrics>,
stress_stat_tx: Sender<StressStats>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut system = System::new_all();

system.refresh_cpu();
tokio::time::sleep(Duration::from_secs(1)).await;

while !progress.is_finished() {
let mut cpu_usage_histogram =
hdrhistogram::Histogram::<u64>::new_with_max(100, 3).unwrap();
system.refresh_cpu();
for (i, cpu) in system.cpus().iter().enumerate() {
cpu_usage_histogram.saturating_record(cpu.cpu_usage() as u64);
metrics
.cpu_usage
.with_label_values(&[&format!("cpu_{i}").to_string()])
.set(cpu.cpu_usage().into());
}

if stress_stat_tx
.try_send(StressStats {
cpu_usage: HistogramWrapper {
histogram: cpu_usage_histogram,
},
})
.is_err()
{
debug!("Failed to update stress stats!");
}

tokio::select! {
_ = ctrl_c() => {
break;
},
_ = tokio::time::sleep(Duration::from_secs(1)) => (),
}
}
})
}
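
The new stats types themselves are defined in a changed file that is not rendered above (presumably `crates/sui-benchmark/src/drivers/mod.rs`, given the `use super::{BenchmarkStats, StressStats}` import). Inferred purely from their usage in `bench_driver.rs`, `StressStats` and `HistogramWrapper` presumably look roughly like the sketch below; the real `to_table` presumably builds the same ASCII table as `BenchmarkStats::to_table`, so the plain string formatting here is an assumption:

```rust
// Hypothetical sketch inferred from usage in bench_driver.rs; the actual
// definitions are not rendered in this diff.
pub struct HistogramWrapper {
    pub histogram: hdrhistogram::Histogram<u64>,
}

pub struct StressStats {
    pub cpu_usage: HistogramWrapper,
}

impl StressStats {
    /// Fold one sampled interval into the running aggregate.
    pub fn update(&mut self, sample: &StressStats) {
        self.cpu_usage
            .histogram
            .add(&sample.cpu_usage.histogram)
            .unwrap();
    }

    /// Render the "Stress Performance Report" shown in the commit message.
    pub fn to_table(&self) -> String {
        format!(
            "cpu usage p50 = {}, p99 = {}",
            self.cpu_usage.histogram.value_at_quantile(0.5),
            self.cpu_usage.histogram.value_at_quantile(0.99)
        )
    }
}
```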