Skip to content

Commit

Permalink
Retry failed requests in benchmark (MystenLabs#3119)
Browse files Browse the repository at this point in the history
  • Loading branch information
sadhansood authored Jul 12, 2022
1 parent 79eed14 commit 6dfe094
Showing 1 changed file with 106 additions and 127 deletions.
233 changes: 106 additions & 127 deletions crates/sui-benchmark/src/bin/shared.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use clap::*;
use futures::FutureExt;

use crossterm::{cursor, terminal, QueueableCommand};
use futures::future::try_join_all;
use futures::future::{join_all, BoxFuture};
use futures::FutureExt;
use futures::{stream::FuturesUnordered, StreamExt};
use std::collections::BTreeMap;
use std::io::{stdout, Stdout, Write};
use std::time::Duration;
use sui_quorum_driver::QuorumDriverHandler;
use sui_types::base_types::{ObjectID, ObjectRef};
use sui_types::messages::{
ExecuteTransactionRequest, ExecuteTransactionRequestType, ExecuteTransactionResponse,
Transaction,
};
use sui_types::object::Owner;
use test_utils::authority::{
Expand All @@ -23,8 +21,10 @@ use test_utils::messages::{make_counter_create_transaction, make_counter_increme
use test_utils::objects::{generate_gas_object, generate_gas_objects};
use test_utils::transaction::publish_counter_package;
use tokio::time;
use tokio::time::{Instant, Interval};
use tracing::debug;
use tokio::time::Instant;
use tracing::subscriber::set_global_default;
use tracing::{debug, error};
use tracing_subscriber::EnvFilter;

#[derive(Parser)]
#[clap(name = "Shared Objects Benchmark")]
Expand All @@ -33,13 +33,13 @@ struct Opts {
#[clap(long, default_value = "4", global = true)]
pub committee_size: usize,
/// Target qps
#[clap(long, default_value = "500", global = true)]
#[clap(long, default_value = "100", global = true)]
pub target_qps: u64,
/// Number of workers
#[clap(long, default_value = "4", global = true)]
#[clap(long, default_value = "1", global = true)]
pub num_workers: u64,
/// Max in-flight ratio
#[clap(long, default_value = "2", global = true)]
#[clap(long, default_value = "5", global = true)]
pub in_flight_ratio: usize,
/// Stat collection interval seconds
#[clap(long, default_value = "10", global = true)]
Expand All @@ -62,26 +62,17 @@ type CounterAndGas = (ObjectID, (ObjectRef, Owner));

#[derive(Debug)]
enum NextOp {
Stat(Instant),
Request(Instant),
Response(Option<(Instant, CounterAndGas)>),
}

fn get_interval(start: Instant, delay_micros: u64) -> Interval {
time::interval_at(start, Duration::from_micros(delay_micros))
}

async fn get_next_tick(start: Instant, delay_micros: u64) -> Instant {
let interval = get_interval(start, delay_micros);
get_tick(interval).await
}

async fn get_tick(mut interval: Interval) -> Instant {
interval.tick().await
Retry(Box<(Transaction, ObjectID, Owner)>),
}

#[tokio::main]
async fn main() {
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("warn"));
let subscriber_builder =
tracing_subscriber::fmt::Subscriber::builder().with_env_filter(env_filter);
let subscriber = subscriber_builder.with_writer(std::io::stderr).finish();
set_global_default(subscriber).expect("Failed to set subscriber");
let opts: Opts = Opts::parse();

// This is the maximum number of increment counter ops in flight
Expand All @@ -103,21 +94,21 @@ async fn main() {
let clients = test_authority_aggregator(&configs);
let quorum_driver_handler = QuorumDriverHandler::new(clients);
let quorum_driver = quorum_driver_handler.clone_quorum_driver();
let mut stdout = stdout();

// publish package
write("Publishing basics package".to_string(), &mut stdout);
write("Publishing basics package".to_string());
let package_ref = publish_counter_package(publish_module_gas, configs.validator_set()).await;
gas.pop();

let qd_and_gas = gas
.into_iter()
.map(|g| (quorum_driver_handler.clone_quorum_driver(), g));

write(
"Creating shared counters, this may take a while..".to_string(),
&mut stdout,
);
write(format!(
"Number of shared counters: {}",
num_shared_counters
));
write("Creating shared counters, this may take a while..".to_string());
// create counters
let futures = qd_and_gas.map(|(qd, gas_object)| async move {
let tx =
Expand All @@ -138,14 +129,10 @@ async fn main() {
(counter_id, new_gas_ref)
});

let counter_and_gas: Vec<(ObjectID, (ObjectRef, Owner))> =
join_all(futures).await.into_iter().collect();
let counter_and_gas: Vec<CounterAndGas> = join_all(futures).await.into_iter().collect();

write(
format!("Done creating {} counters!", counter_and_gas.len()),
&mut stdout,
);
write("Starting benchmark!".to_string(), &mut stdout);
write(format!("Done creating {} counters!", counter_and_gas.len()));
write("Starting benchmark!".to_string());
let gas_per_worker = counter_and_gas.len() / opts.num_workers as usize;
let gas: Vec<Vec<(ObjectID, (ObjectRef, Owner))>> = counter_and_gas
.chunks(gas_per_worker)
Expand All @@ -157,30 +144,22 @@ async fn main() {
let mut free_pool = gas[i as usize].clone();
let qd = quorum_driver.clone();
let tx_cloned = tx.clone();
let task = tokio::spawn(async move {
let request_delay_micros = 1_000_000 / (opts.num_workers * opts.target_qps);
let stat_delay_micros = 1_000_000 * opts.stat_collection_interval;
let mut request_interval = time::interval(Duration::from_micros(request_delay_micros));
let mut stat_interval = time::interval(Duration::from_micros(stat_delay_micros));
let runner = tokio::spawn(async move {
let mut num_success = 0;
let mut num_error = 0;
let mut min_latency = Duration::MAX;
let mut max_latency = Duration::ZERO;
let mut num_no_gas = 0;
let mut num_in_flight: u64 = 0;
let mut num_submitted = 0;
let request_delay_micros = 1_000_000 / (opts.num_workers * opts.target_qps);
let stat_delay_micros = 1_000_000 * opts.stat_collection_interval;
let mut futures: FuturesUnordered<BoxFuture<NextOp>> = FuturesUnordered::new();
futures.push(Box::pin(
get_next_tick(Instant::now(), request_delay_micros).map(NextOp::Request),
));
futures.push(Box::pin(
get_next_tick(
Instant::now() + Duration::from_micros(stat_delay_micros),
stat_delay_micros,
)
.map(NextOp::Stat),
));
while let Some(op) = futures.next().await {
match op {
NextOp::Stat(_) => {
loop {
tokio::select! {
_ = stat_interval.tick() => {
if tx_cloned
.send(Stats {
id: i as usize,
Expand All @@ -191,106 +170,113 @@ async fn main() {
num_no_gas,
num_in_flight,
num_submitted,
duration: Duration::from_micros(
opts.stat_collection_interval * 1_000_000,
),
duration: Duration::from_micros(stat_delay_micros),
})
.await
.is_err()
{
debug!("Failed to update stat!");
}
//eprintln!("tick2!");
num_success = 0;
num_error = 0;
num_no_gas = 0;
num_submitted = 0;
min_latency = Duration::MAX;
max_latency = Duration::ZERO;
futures.push(Box::pin(
get_next_tick(
Instant::now() + Duration::from_micros(stat_delay_micros),
stat_delay_micros,
)
.map(NextOp::Stat),
));
}
NextOp::Request(now) => {
_ = request_interval.tick() => {
if free_pool.is_empty() {
num_no_gas += 1;
futures.push(Box::pin(
get_next_tick(
Instant::now() + Duration::from_micros(request_delay_micros),
request_delay_micros,
)
.map(NextOp::Request),
));
continue;
}
num_in_flight += 1;
num_submitted += 1;
let gas = free_pool.pop().unwrap();
let counter_id = gas.0;
let owner = gas.1 .1;
let tx =
make_counter_increment_transaction(gas.1 .0, package_ref, counter_id);
let res = qd
.execute_transaction(ExecuteTransactionRequest {
transaction: tx,
} else {
num_in_flight += 1;
num_submitted += 1;
let gas = free_pool.pop().unwrap();
let counter_id = gas.0;
let owner = gas.1 .1;
let tx = make_counter_increment_transaction(gas.1 .0, package_ref, counter_id);
let start = Instant::now();
let res = qd
.execute_transaction(ExecuteTransactionRequest {
transaction: tx.clone(),
request_type: ExecuteTransactionRequestType::WaitForEffectsCert,
})
.map(move |res| {
match res {
Ok(ExecuteTransactionResponse::EffectsCert(result)) => {
let (_, effects) = *result;
NextOp::Response(Some((
now,
start,
(counter_id, (effects.effects.gas_object.0, owner)),
)))
}
Err(_sui_err) => {
println!("{}", _sui_err);
// TODO (GAS-LEAK): How do we add this gas back in the pool?
NextOp::Response(None)
Ok(resp) => {
error!("unexpected_response: {:?}", resp);
NextOp::Retry(Box::new((tx, counter_id, owner)))
}
_ => {
// eprintln!("error");
// TODO (GAS-LEAK): How do we add this gas back in the pool?
NextOp::Response(None)
Err(sui_err) => {
error!("{}", sui_err);
NextOp::Retry(Box::new((tx, counter_id, owner)))
}
}
});
futures.push(Box::pin(res));
futures.push(Box::pin(
get_next_tick(
Instant::now() + Duration::from_micros(request_delay_micros),
request_delay_micros,
)
.map(NextOp::Request),
));
}
NextOp::Response(Some((start, payload))) => {
free_pool.push(payload);
let latency = start.elapsed();
num_success += 1;
num_in_flight -= 1;
if latency > max_latency {
max_latency = latency;
futures.push(Box::pin(res));
}
if latency < min_latency {
min_latency = latency;
}
//eprintln!("{}",free_pool.len());
}
NextOp::Response(None) => {
num_in_flight -= 1;
num_error += 1;
Some(op) = futures.next() => {
match op {
NextOp::Retry(b) => {
num_submitted += 1;
num_error += 1;
let res = qd
.execute_transaction(ExecuteTransactionRequest {
transaction: b.0.clone(),
request_type: ExecuteTransactionRequestType::WaitForEffectsCert,
})
.map(move |res| {
match res {
Ok(ExecuteTransactionResponse::EffectsCert(result)) => {
let (_, effects) = *result;
NextOp::Response(Some((
Instant::now(),
(b.1, (effects.effects.gas_object.0, b.2)),
)))
}
Ok(resp) => {
error!("unexpected_response: {:?}", resp);
NextOp::Retry(b)
}
Err(sui_err) => {
error!("{}", sui_err);
NextOp::Retry(b)
}
}
});
futures.push(Box::pin(res));
}
NextOp::Response(Some((start, payload))) => {
free_pool.push(payload);
let latency = start.elapsed();
num_success += 1;
num_in_flight -= 1;
if latency > max_latency {
max_latency = latency;
}
if latency < min_latency {
min_latency = latency;
}
}
NextOp::Response(None) => {
num_in_flight -= 1;
}
}
}

}
}
});
tasks.push(task);
tasks.push(runner);
});

tasks.push(tokio::spawn(async move {
let mut stat_collection: BTreeMap<usize, Stats> = BTreeMap::new();
while let Some(s @ Stats {
Expand Down Expand Up @@ -337,19 +323,12 @@ async fn main() {
} else {
0.0
};
write(format!("Throughput = {}, min_latency_ms = {}, max_latency_ms = {}, num_success = {}, num_error = {}, no_gas = {}, submitted = {}, in_flight = {}", total_qps, min_latency.as_millis(), max_latency.as_millis(), num_success, num_error, num_no_gas, num_submitted, num_in_flight), &mut stdout);
write(format!("Throughput = {}, min_latency_ms = {}, max_latency_ms = {}, num_success = {}, num_error = {}, no_gas = {}, submitted = {}, in_flight = {}", total_qps, min_latency.as_millis(), max_latency.as_millis(), num_success, num_error, num_no_gas, num_submitted, num_in_flight));
}
}));
let _: Vec<_> = try_join_all(tasks).await.unwrap().into_iter().collect();
}

fn write(str: String, stdout: &mut Stdout) {
stdout.queue(cursor::SavePosition).unwrap();
stdout.write_all(str.as_bytes()).unwrap();
stdout.queue(cursor::RestorePosition).unwrap();
stdout.flush().unwrap();
stdout.queue(cursor::RestorePosition).unwrap();
stdout
.queue(terminal::Clear(terminal::ClearType::FromCursorDown))
.unwrap();
fn write(str: String) {
eprintln!("{}", str);
}

0 comments on commit 6dfe094

Please sign in to comment.