Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry failed requests in benchmark #3119

Merged
merged 1 commit into from
Jul 12, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
233 changes: 106 additions & 127 deletions crates/sui-benchmark/src/bin/shared.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use clap::*;
use futures::FutureExt;

use crossterm::{cursor, terminal, QueueableCommand};
use futures::future::try_join_all;
use futures::future::{join_all, BoxFuture};
use futures::FutureExt;
use futures::{stream::FuturesUnordered, StreamExt};
use std::collections::BTreeMap;
use std::io::{stdout, Stdout, Write};
use std::time::Duration;
use sui_quorum_driver::QuorumDriverHandler;
use sui_types::base_types::{ObjectID, ObjectRef};
use sui_types::messages::{
ExecuteTransactionRequest, ExecuteTransactionRequestType, ExecuteTransactionResponse,
Transaction,
};
use sui_types::object::Owner;
use test_utils::authority::{
Expand All @@ -23,8 +21,10 @@ use test_utils::messages::{make_counter_create_transaction, make_counter_increme
use test_utils::objects::{generate_gas_object, generate_gas_objects};
use test_utils::transaction::publish_counter_package;
use tokio::time;
use tokio::time::{Instant, Interval};
use tracing::debug;
use tokio::time::Instant;
use tracing::subscriber::set_global_default;
use tracing::{debug, error};
use tracing_subscriber::EnvFilter;

#[derive(Parser)]
#[clap(name = "Shared Objects Benchmark")]
Expand All @@ -33,13 +33,13 @@ struct Opts {
#[clap(long, default_value = "4", global = true)]
pub committee_size: usize,
/// Target qps
#[clap(long, default_value = "500", global = true)]
#[clap(long, default_value = "100", global = true)]
pub target_qps: u64,
/// Number of workers
#[clap(long, default_value = "4", global = true)]
#[clap(long, default_value = "1", global = true)]
pub num_workers: u64,
/// Max in-flight ratio
#[clap(long, default_value = "2", global = true)]
#[clap(long, default_value = "5", global = true)]
pub in_flight_ratio: usize,
/// Stat collection interval seconds
#[clap(long, default_value = "10", global = true)]
Expand All @@ -62,26 +62,17 @@ type CounterAndGas = (ObjectID, (ObjectRef, Owner));

#[derive(Debug)]
enum NextOp {
Stat(Instant),
Request(Instant),
Response(Option<(Instant, CounterAndGas)>),
}

fn get_interval(start: Instant, delay_micros: u64) -> Interval {
time::interval_at(start, Duration::from_micros(delay_micros))
}

async fn get_next_tick(start: Instant, delay_micros: u64) -> Instant {
let interval = get_interval(start, delay_micros);
get_tick(interval).await
}

async fn get_tick(mut interval: Interval) -> Instant {
interval.tick().await
Retry(Box<(Transaction, ObjectID, Owner)>),
}

#[tokio::main]
async fn main() {
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("warn"));
let subscriber_builder =
tracing_subscriber::fmt::Subscriber::builder().with_env_filter(env_filter);
let subscriber = subscriber_builder.with_writer(std::io::stderr).finish();
set_global_default(subscriber).expect("Failed to set subscriber");
let opts: Opts = Opts::parse();

// This is the maximum number of increment counter ops in flight
Expand All @@ -103,21 +94,21 @@ async fn main() {
let clients = test_authority_aggregator(&configs);
let quorum_driver_handler = QuorumDriverHandler::new(clients);
let quorum_driver = quorum_driver_handler.clone_quorum_driver();
let mut stdout = stdout();

// publish package
write("Publishing basics package".to_string(), &mut stdout);
write("Publishing basics package".to_string());
let package_ref = publish_counter_package(publish_module_gas, configs.validator_set()).await;
gas.pop();

let qd_and_gas = gas
.into_iter()
.map(|g| (quorum_driver_handler.clone_quorum_driver(), g));

write(
"Creating shared counters, this may take a while..".to_string(),
&mut stdout,
);
write(format!(
"Number of shared counters: {}",
num_shared_counters
));
write("Creating shared counters, this may take a while..".to_string());
// create counters
let futures = qd_and_gas.map(|(qd, gas_object)| async move {
let tx =
Expand All @@ -138,14 +129,10 @@ async fn main() {
(counter_id, new_gas_ref)
});

let counter_and_gas: Vec<(ObjectID, (ObjectRef, Owner))> =
join_all(futures).await.into_iter().collect();
let counter_and_gas: Vec<CounterAndGas> = join_all(futures).await.into_iter().collect();

write(
format!("Done creating {} counters!", counter_and_gas.len()),
&mut stdout,
);
write("Starting benchmark!".to_string(), &mut stdout);
write(format!("Done creating {} counters!", counter_and_gas.len()));
write("Starting benchmark!".to_string());
let gas_per_worker = counter_and_gas.len() / opts.num_workers as usize;
let gas: Vec<Vec<(ObjectID, (ObjectRef, Owner))>> = counter_and_gas
.chunks(gas_per_worker)
Expand All @@ -157,30 +144,22 @@ async fn main() {
let mut free_pool = gas[i as usize].clone();
let qd = quorum_driver.clone();
let tx_cloned = tx.clone();
let task = tokio::spawn(async move {
let request_delay_micros = 1_000_000 / (opts.num_workers * opts.target_qps);
let stat_delay_micros = 1_000_000 * opts.stat_collection_interval;
let mut request_interval = time::interval(Duration::from_micros(request_delay_micros));
let mut stat_interval = time::interval(Duration::from_micros(stat_delay_micros));
let runner = tokio::spawn(async move {
let mut num_success = 0;
let mut num_error = 0;
let mut min_latency = Duration::MAX;
let mut max_latency = Duration::ZERO;
let mut num_no_gas = 0;
let mut num_in_flight: u64 = 0;
let mut num_submitted = 0;
let request_delay_micros = 1_000_000 / (opts.num_workers * opts.target_qps);
let stat_delay_micros = 1_000_000 * opts.stat_collection_interval;
let mut futures: FuturesUnordered<BoxFuture<NextOp>> = FuturesUnordered::new();
futures.push(Box::pin(
get_next_tick(Instant::now(), request_delay_micros).map(NextOp::Request),
));
futures.push(Box::pin(
get_next_tick(
Instant::now() + Duration::from_micros(stat_delay_micros),
stat_delay_micros,
)
.map(NextOp::Stat),
));
while let Some(op) = futures.next().await {
match op {
NextOp::Stat(_) => {
loop {
tokio::select! {
_ = stat_interval.tick() => {
if tx_cloned
.send(Stats {
id: i as usize,
Expand All @@ -191,106 +170,113 @@ async fn main() {
num_no_gas,
num_in_flight,
num_submitted,
duration: Duration::from_micros(
opts.stat_collection_interval * 1_000_000,
),
duration: Duration::from_micros(stat_delay_micros),
})
.await
.is_err()
{
debug!("Failed to update stat!");
}
//eprintln!("tick2!");
num_success = 0;
num_error = 0;
num_no_gas = 0;
num_submitted = 0;
min_latency = Duration::MAX;
max_latency = Duration::ZERO;
futures.push(Box::pin(
get_next_tick(
Instant::now() + Duration::from_micros(stat_delay_micros),
stat_delay_micros,
)
.map(NextOp::Stat),
));
}
NextOp::Request(now) => {
_ = request_interval.tick() => {
if free_pool.is_empty() {
num_no_gas += 1;
futures.push(Box::pin(
get_next_tick(
Instant::now() + Duration::from_micros(request_delay_micros),
request_delay_micros,
)
.map(NextOp::Request),
));
continue;
}
num_in_flight += 1;
num_submitted += 1;
let gas = free_pool.pop().unwrap();
let counter_id = gas.0;
let owner = gas.1 .1;
let tx =
make_counter_increment_transaction(gas.1 .0, package_ref, counter_id);
let res = qd
.execute_transaction(ExecuteTransactionRequest {
transaction: tx,
} else {
num_in_flight += 1;
num_submitted += 1;
let gas = free_pool.pop().unwrap();
let counter_id = gas.0;
let owner = gas.1 .1;
let tx = make_counter_increment_transaction(gas.1 .0, package_ref, counter_id);
let start = Instant::now();
let res = qd
.execute_transaction(ExecuteTransactionRequest {
transaction: tx.clone(),
request_type: ExecuteTransactionRequestType::WaitForEffectsCert,
})
.map(move |res| {
match res {
Ok(ExecuteTransactionResponse::EffectsCert(result)) => {
let (_, effects) = *result;
NextOp::Response(Some((
now,
start,
(counter_id, (effects.effects.gas_object.0, owner)),
)))
}
Err(_sui_err) => {
println!("{}", _sui_err);
// TODO (GAS-LEAK): How do we add this gas back in the pool?
NextOp::Response(None)
Ok(resp) => {
error!("unexpected_response: {:?}", resp);
NextOp::Retry(Box::new((tx, counter_id, owner)))
}
_ => {
// eprintln!("error");
// TODO (GAS-LEAK): How do we add this gas back in the pool?
NextOp::Response(None)
Err(sui_err) => {
error!("{}", sui_err);
NextOp::Retry(Box::new((tx, counter_id, owner)))
}
}
});
futures.push(Box::pin(res));
futures.push(Box::pin(
get_next_tick(
Instant::now() + Duration::from_micros(request_delay_micros),
request_delay_micros,
)
.map(NextOp::Request),
));
}
NextOp::Response(Some((start, payload))) => {
free_pool.push(payload);
let latency = start.elapsed();
num_success += 1;
num_in_flight -= 1;
if latency > max_latency {
max_latency = latency;
futures.push(Box::pin(res));
}
if latency < min_latency {
min_latency = latency;
}
//eprintln!("{}",free_pool.len());
}
NextOp::Response(None) => {
num_in_flight -= 1;
num_error += 1;
Some(op) = futures.next() => {
match op {
NextOp::Retry(b) => {
num_submitted += 1;
num_error += 1;
let res = qd
.execute_transaction(ExecuteTransactionRequest {
transaction: b.0.clone(),
request_type: ExecuteTransactionRequestType::WaitForEffectsCert,
})
.map(move |res| {
match res {
Ok(ExecuteTransactionResponse::EffectsCert(result)) => {
let (_, effects) = *result;
NextOp::Response(Some((
Instant::now(),
(b.1, (effects.effects.gas_object.0, b.2)),
)))
}
Ok(resp) => {
error!("unexpected_response: {:?}", resp);
NextOp::Retry(b)
}
Err(sui_err) => {
error!("{}", sui_err);
NextOp::Retry(b)
}
}
});
futures.push(Box::pin(res));
}
NextOp::Response(Some((start, payload))) => {
free_pool.push(payload);
let latency = start.elapsed();
num_success += 1;
num_in_flight -= 1;
if latency > max_latency {
max_latency = latency;
}
if latency < min_latency {
min_latency = latency;
}
}
NextOp::Response(None) => {
num_in_flight -= 1;
}
}
}

}
}
});
tasks.push(task);
tasks.push(runner);
});

tasks.push(tokio::spawn(async move {
let mut stat_collection: BTreeMap<usize, Stats> = BTreeMap::new();
while let Some(s @ Stats {
Expand Down Expand Up @@ -337,19 +323,12 @@ async fn main() {
} else {
0.0
};
write(format!("Throughput = {}, min_latency_ms = {}, max_latency_ms = {}, num_success = {}, num_error = {}, no_gas = {}, submitted = {}, in_flight = {}", total_qps, min_latency.as_millis(), max_latency.as_millis(), num_success, num_error, num_no_gas, num_submitted, num_in_flight), &mut stdout);
write(format!("Throughput = {}, min_latency_ms = {}, max_latency_ms = {}, num_success = {}, num_error = {}, no_gas = {}, submitted = {}, in_flight = {}", total_qps, min_latency.as_millis(), max_latency.as_millis(), num_success, num_error, num_no_gas, num_submitted, num_in_flight));
}
}));
let _: Vec<_> = try_join_all(tasks).await.unwrap().into_iter().collect();
}

fn write(str: String, stdout: &mut Stdout) {
stdout.queue(cursor::SavePosition).unwrap();
stdout.write_all(str.as_bytes()).unwrap();
stdout.queue(cursor::RestorePosition).unwrap();
stdout.flush().unwrap();
stdout.queue(cursor::RestorePosition).unwrap();
stdout
.queue(terminal::Clear(terminal::ClearType::FromCursorDown))
.unwrap();
fn write(str: String) {
eprintln!("{}", str);
}