Skip to content

Commit

Permalink
Enable running benchmark on remote validators (MystenLabs#3474)
Browse files Browse the repository at this point in the history
  • Loading branch information
sadhansood authored Jul 28, 2022
1 parent 641cda0 commit 5fd7100
Show file tree
Hide file tree
Showing 7 changed files with 473 additions and 184 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/sui-benchmark/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ bcs = "0.1.3"
sui-core = { path = "../sui-core" }
sui-config = { path = "../sui-config" }
sui-types = { path = "../sui-types" }
sui-gateway = { path = "../sui-gateway" }
sui-sdk = { path = "../sui-sdk" }

move-core-types = { git = "https://github.com/move-language/move", rev = "79071528524f08b12e9abb84c1094d8e976aa17a", features = ["address20"] }
narwhal-node = { git = "https://github.com/MystenLabs/narwhal", rev = "50411aa4b8b6eac7e45fa0e0da4ad8fc6c20395e", package = "node" }
Expand Down
233 changes: 173 additions & 60 deletions crates/sui-benchmark/src/bin/stress.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,43 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use anyhow::{anyhow, Result};
use clap::*;
use futures::future::try_join_all;
use futures::future::BoxFuture;
use futures::FutureExt;
use futures::{stream::FuturesUnordered, StreamExt};
use std::collections::{BTreeMap, VecDeque};
use std::path::PathBuf;
use std::sync::{Arc, Barrier};
use std::thread::JoinHandle;
use std::time::Duration;
use strum_macros::EnumString;
use sui_benchmark::stress::context::get_latest;
use sui_benchmark::stress::context::Payload;
use sui_benchmark::stress::context::StressTestCtx;
use sui_benchmark::stress::shared_counter::SharedCounterTestCtx;
use sui_benchmark::stress::transfer_object::TransferObjectTestCtx;
use sui_config::NetworkConfig;
use sui_config::Config;
use sui_config::PersistedConfig;
use sui_core::authority_aggregator::AuthAggMetrics;
use sui_core::authority_aggregator::AuthorityAggregator;
use sui_core::authority_client::NetworkAuthorityClient;
use sui_gateway::config::GatewayConfig;
use sui_node::SuiNode;
use sui_quorum_driver::QuorumDriverHandler;
use sui_types::crypto::EmptySignInfo;
use sui_sdk::crypto::SuiKeystore;
use sui_types::base_types::ObjectID;
use sui_types::base_types::SuiAddress;
use sui_types::crypto::EncodeDecodeBase64;
use sui_types::crypto::KeypairTraits;
use sui_types::crypto::{AccountKeyPair, EmptySignInfo};
use sui_types::messages::{
ExecuteTransactionRequest, ExecuteTransactionRequestType, ExecuteTransactionResponse,
Transaction, TransactionEnvelope,
};
use test_utils::authority::{
spawn_test_authorities, test_and_configure_authority_configs, test_authority_aggregator,
};
use test_utils::authority::{spawn_test_authorities, test_and_configure_authority_configs};
use test_utils::objects::generate_gas_objects_with_owner;
use test_utils::test_account_keys;
use tokio::runtime::Builder;
use tokio::time;
use tokio::time::Instant;
Expand All @@ -34,7 +46,7 @@ use tracing::{debug, error};
#[derive(Parser)]
#[clap(name = "Stress Testing Framework")]
struct Opts {
/// Size of the Sui committee.
/// Si&ze of the Sui committee.
#[clap(long, default_value = "4", global = true)]
pub committee_size: u64,
/// Target qps
Expand All @@ -44,7 +56,7 @@ struct Opts {
#[clap(long, default_value = "12", global = true)]
pub num_workers: u64,
/// Max in-flight ratio
#[clap(long, default_value = "10", global = true)]
#[clap(long, default_value = "5", global = true)]
pub in_flight_ratio: u64,
/// Num of accounts to use for transfer objects
#[clap(long, default_value = "5", global = true)]
Expand All @@ -57,11 +69,30 @@ struct Opts {
pub transaction_type: TransactionType,
/// Num server threads
#[clap(long, default_value = "24", global = true)]
pub num_server_threads: usize,
pub num_server_threads: u64,
/// Num client threads
/// ideally same as number of workers
#[clap(long, default_value = "3", global = true)]
pub num_client_threads: usize,
pub num_client_threads: u64,
/// Path where gateway config is stored when running remote benchmark
/// This is also the path where gateway config is stored during local
/// benchmark
#[clap(long, default_value = "/tmp/gateway.yaml", global = true)]
pub gateway_config_path: String,
/// Path where keypair for primary gas account is stored. The format of
/// this file is same as what `sui keytool generate` outputs
#[clap(long, default_value = "", global = true)]
pub keystore_path: String,
/// Object id of the primary gas coin used for benchmark
/// NOTE: THe remote network should have this coin in its genesis config
/// with large enough gas i.e. u64::MAX
#[clap(long, default_value = "", global = true)]
pub primary_gas_id: String,
/// Whether to run local or remote benchmark
/// NOTE: For running remote benchmark we must have the following
/// gateway_config_path, keypair_path and primary_gas_id
#[clap(long, parse(try_from_str), default_value = "true", global = true)]
pub local: bool,
}

struct Stats {
Expand Down Expand Up @@ -320,23 +351,31 @@ async fn run(
}

fn make_test_ctx(
primary_gas_id: ObjectID,
primary_gas_account_owner: SuiAddress,
primary_gas_account_keypair: AccountKeyPair,
max_in_flight_ops: usize,
configs: &NetworkConfig,
opts: &Opts,
) -> Box<dyn StressTestCtx<dyn Payload>> {
match opts.transaction_type {
TransactionType::SharedCounter => {
SharedCounterTestCtx::make_ctx(max_in_flight_ops as u64, configs)
}
TransactionType::SharedCounter => SharedCounterTestCtx::make_ctx(
max_in_flight_ops as u64,
primary_gas_id,
primary_gas_account_owner,
primary_gas_account_keypair,
),
TransactionType::TransferObject => TransferObjectTestCtx::make_ctx(
max_in_flight_ops as u64,
opts.num_transfer_accounts,
configs,
primary_gas_id,
primary_gas_account_owner,
primary_gas_account_keypair,
),
}
}

fn main() {
#[tokio::main]
async fn main() -> Result<()> {
let mut config = telemetry_subscribers::TelemetryConfig::new("stress");
config.log_string = Some("warn".to_string());
config.log_file = Some("/tmp/stress.log".to_string());
Expand All @@ -346,60 +385,134 @@ fn main() {
// This is the maximum number of increment counter ops in flight
let max_in_flight_ops = opts.target_qps as usize * opts.in_flight_ratio as usize;

let configs = {
let mut configs = test_and_configure_authority_configs(opts.committee_size as usize);
configs.validator_configs.iter_mut().for_each(|config| {
let parameters = &mut config.consensus_config.as_mut().unwrap().narwhal_config;
parameters.batch_size = 12800;
});
Arc::new(configs)
};

let mut ctx = make_test_ctx(max_in_flight_ops, &configs, &opts);

let genesis_objects = ctx.get_gas_objects();

// Make the client runtime wait until we are done creating genesis objects
let barrier = Arc::new(Barrier::new(2));
let cloned_barrier = barrier.clone();
let cloned_config = configs.clone();
// spawn a thread to spin up sui nodes on the multi-threaded server runtime
let _ = std::thread::spawn(move || {
// create server runtime
let server_runtime = Builder::new_multi_thread()
.thread_stack_size(32 * 1024 * 1024)
.worker_threads(opts.num_server_threads)
.enable_all()
.build()
.unwrap();
server_runtime.block_on(async move {
// Setup the network
let nodes: Vec<SuiNode> =
spawn_test_authorities(genesis_objects.clone(), &cloned_config).await;
let handles: Vec<_> = nodes.into_iter().map(move |node| node.wait()).collect();
cloned_barrier.wait();
if try_join_all(handles).await.is_err() {
error!("Failed while waiting for nodes");
}
let (primary_gas_id, owner, keypair, gateway_config) = if opts.local {
eprintln!("Configuring local benchmark..");
let configs = {
let mut configs = test_and_configure_authority_configs(opts.committee_size as usize);
configs.validator_configs.iter_mut().for_each(|config| {
let parameters = &mut config.consensus_config.as_mut().unwrap().narwhal_config;
parameters.batch_size = 12800;
});
Arc::new(configs)
};
let gateway_config = GatewayConfig {
epoch: 0,
validator_set: configs.validator_set().to_vec(),
send_timeout: Duration::from_secs(4),
recv_timeout: Duration::from_secs(4),
buffer_size: 650000,
db_folder_path: PathBuf::from("/tmp/client_db"),
};
gateway_config.save(&opts.gateway_config_path)?;
// bring up servers ..
let (owner, keypair): (SuiAddress, AccountKeyPair) = test_account_keys().pop().unwrap();
let primary_gas = generate_gas_objects_with_owner(1, owner);
let primary_gas_id = primary_gas.get(0).unwrap().id();
// Make the client runtime wait until we are done creating genesis objects
let cloned_config = configs;
let cloned_gas = primary_gas;
// spawn a thread to spin up sui nodes on the multi-threaded server runtime
let _ = std::thread::spawn(move || {
// create server runtime
let server_runtime = Builder::new_multi_thread()
.thread_stack_size(32 * 1024 * 1024)
.worker_threads(opts.num_server_threads as usize)
.enable_all()
.build()
.unwrap();
server_runtime.block_on(async move {
// Setup the network
let nodes: Vec<SuiNode> = spawn_test_authorities(cloned_gas, &cloned_config).await;
let handles: Vec<_> = nodes.into_iter().map(move |node| node.wait()).collect();
cloned_barrier.wait();
if try_join_all(handles).await.is_err() {
error!("Failed while waiting for nodes");
}
});
});
});

(primary_gas_id, owner, keypair, gateway_config)
} else {
eprintln!("Configuring remote benchmark..");
cloned_barrier.wait();
let config_path = Some(&opts.gateway_config_path)
.filter(|s| !s.is_empty())
.map(PathBuf::from)
.ok_or_else(|| {
anyhow!(format!(
"Failed to find gateway config at path: {}",
opts.gateway_config_path
))
})?;
let config: GatewayConfig = PersistedConfig::read(&config_path)?;
let committee = config.make_committee()?;
let authority_clients = config.make_authority_clients();
let metrics = AuthAggMetrics::new(&prometheus::Registry::new());
let aggregator = AuthorityAggregator::new(committee, authority_clients, metrics);
let primary_gas_id = ObjectID::from_hex_literal(&opts.primary_gas_id)?;
let primary_gas = get_latest(primary_gas_id, &aggregator)
.await
.ok_or_else(|| {
anyhow!(format!(
"Failed to read primary gas object with id: {}",
primary_gas_id
))
})?;
let primary_gas_account = primary_gas.owner.get_owner_address()?;
let keystore_path = Some(&opts.keystore_path)
.filter(|s| !s.is_empty())
.map(PathBuf::from)
.ok_or_else(|| {
anyhow!(format!(
"Failed to find keypair at path: {}",
&opts.keystore_path
))
})?;
let keystore = SuiKeystore::load_or_create(&keystore_path)?;
let keypair = keystore
.key_pairs()
.iter()
.find(|x| {
let address: SuiAddress = Into::<SuiAddress>::into(x.public());
address == primary_gas_account
})
.map(|x| x.encode_base64())
.unwrap();
//let contents = std::fs::read_to_string(keypair.encode_base64())?;
(
primary_gas_id,
primary_gas_account,
keypair.parse()?,
config,
)
};
barrier.wait();
let ctx = make_test_ctx(primary_gas_id, owner, keypair, max_in_flight_ops, &opts);
// create client runtime
let client_runtime = Builder::new_multi_thread()
.enable_all()
.thread_stack_size(32 * 1024 * 1024)
.worker_threads(opts.num_client_threads)
.worker_threads(opts.num_client_threads as usize)
.build()
.unwrap();
client_runtime.block_on(async move {
let mut payloads = ctx.make_test_payloads(&configs).await;
let clients = test_authority_aggregator(&configs);
let mut p: Vec<Arc<dyn Payload>> = vec![];
while !payloads.is_empty() {
let entry: Box<dyn Payload> = payloads.pop().unwrap();
p.push(Arc::from(entry));
}
run(clients, p, opts).await
let handle: JoinHandle<()> = std::thread::spawn(move || {
client_runtime.block_on(async move {
let committee = gateway_config.make_committee().unwrap();
let authority_clients = gateway_config.make_authority_clients();
let metrics = AuthAggMetrics::new(&prometheus::Registry::new());
let aggregator = AuthorityAggregator::new(committee, authority_clients, metrics);
let mut payloads = ctx.make_test_payloads(&aggregator).await;
let mut p: Vec<Arc<dyn Payload>> = vec![];
while !payloads.is_empty() {
let entry: Box<dyn Payload> = payloads.pop().unwrap();
p.push(Arc::from(entry));
}
run(aggregator, p, opts).await
});
});
if handle.join().is_err() {
error!("Failed to join thread");
}
Ok(())
}
Loading

0 comments on commit 5fd7100

Please sign in to comment.