Skip to content

Commit

Permalink
[rpc-loadgen] support concurrent execution on the same thread (Mysten…
Browse files Browse the repository at this point in the history
…Labs#9904)

## Description 

Follow-up to MystenLabs#9899. 

## Test Plan 

Tested against CI fullnode and was able to reach ~300TPS.

---
If your changes are not user-facing and not a breaking change, you can
skip the following section. Otherwise, please indicate what changed, and
then add to the Release Notes section as highlighted during the release
process.

### Type of Change (Check all that apply)

- [ ] user-visible impact
- [ ] breaking change for a client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
  • Loading branch information
666lcz authored Mar 27, 2023
1 parent 20a01cb commit 46b3844
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 61 deletions.
2 changes: 2 additions & 0 deletions crates/sui-rpc-loadgen/src/load_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub struct LoadTestConfig {
/// should divide tasks across multiple threads
pub divide_tasks: bool,
pub signer_info: Option<SignerInfo>,
pub num_chunks_per_thread: usize,
pub max_repeat: usize,
}

pub(crate) struct LoadTest<R: Processor + Send + Sync + Clone> {
Expand Down
6 changes: 6 additions & 0 deletions crates/sui-rpc-loadgen/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ pub struct CommonOptions {

#[clap(short, long, default_value_t = 0)]
pub interval_in_ms: u64,

/// different chunks will be executed concurrently on the same thread
#[clap(long, default_value_t = 1)]
num_chunks_per_thread: usize,
}

#[derive(Parser)]
Expand Down Expand Up @@ -183,6 +187,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
// TODO: pass in from config
divide_tasks: true,
signer_info,
num_chunks_per_thread: common.num_chunks_per_thread,
max_repeat: common.repeat,
},
};
load_test.run().await?;
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-rpc-loadgen/src/payload/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use sui_types::base_types::{ObjectID, SuiAddress};
pub struct SignerInfo {
pub encoded_keypair: String,
/// Different thread should use different gas_payment to avoid equivocation
pub gas_payment: Option<ObjectID>,
pub gas_payment: Option<Vec<ObjectID>>,
pub gas_budget: Option<u64>,
}

Expand Down
67 changes: 32 additions & 35 deletions crates/sui-rpc-loadgen/src/payload/pay_sui.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::payload::rpc_command_processor::DEFAULT_GAS_BUDGET;
use crate::payload::rpc_command_processor::{sign_and_execute, DEFAULT_GAS_BUDGET};
use crate::payload::{PaySui, ProcessPayload, RpcCommandProcessor, SignerInfo};
use async_trait::async_trait;
use shared_crypto::intent::{Intent, IntentMessage};
use sui_json_rpc_types::SuiTransactionResponseOptions;
use sui_types::base_types::SuiAddress;
use sui_types::crypto::{EncodeDecodeBase64, Signature, SuiKeyPair};
use sui_types::messages::{ExecuteTransactionRequestType, Transaction};
use futures::future::join_all;
use sui_json_rpc_types::SuiTransactionResponse;
use sui_sdk::SuiClient;
use sui_types::base_types::{ObjectID, SuiAddress};
use sui_types::crypto::{EncodeDecodeBase64, SuiKeyPair};
use tracing::debug;

#[async_trait]
Expand All @@ -27,42 +27,39 @@ impl<'a> ProcessPayload<'a, &'a PaySui> for RpcCommandProcessor {
let recipient = SuiAddress::random_for_testing_only();
let amount = 1;
let gas_budget = gas_budget.unwrap_or(DEFAULT_GAS_BUDGET);
let gas_payments = gas_payment.unwrap();

let keypair =
SuiKeyPair::decode_base64(&encoded_keypair).expect("Decoding keypair should not fail");
let signer_address = SuiAddress::from(&keypair.public());

debug!("Pay Sui to {recipient} with {amount} MIST with {gas_payment:?}");
debug!(
"Transfer Sui {} time to {recipient} with {amount} MIST with {gas_payments:?}",
gas_payments.len()
);
for client in clients.iter() {
let transfer_tx = client
.transaction_builder()
.transfer_sui(
signer_address,
gas_payment.unwrap(),
gas_budget,
recipient,
Some(amount),
)
.await?;
debug!("transfer_tx {:?}", transfer_tx);
let signature = Signature::new_secure(
&IntentMessage::new(Intent::default(), &transfer_tx),
&keypair,
);

let transaction_response = client
.quorum_driver()
.execute_transaction(
Transaction::from_data(transfer_tx, Intent::default(), vec![signature])
.verify()?,
SuiTransactionResponseOptions::full_content(),
Some(ExecuteTransactionRequestType::WaitForLocalExecution),
)
.await?;

debug!("transaction_response {transaction_response:?}");
join_all(gas_payments.iter().map(|gas| async {
transfer_sui(client, &keypair, *gas, gas_budget, recipient, amount).await;
}))
.await;
}

Ok(())
}
}

async fn transfer_sui(
client: &SuiClient,
keypair: &SuiKeyPair,
gas_payment: ObjectID,
gas_budget: u64,
recipient: SuiAddress,
amount: u64,
) -> SuiTransactionResponse {
let sender = SuiAddress::from(&keypair.public());
let tx = client
.transaction_builder()
.transfer_sui(sender, gas_payment, gas_budget, recipient, Some(amount))
.await
.expect("Failed to construct transfer coin transaction");
sign_and_execute(client, keypair, tx).await
}
101 changes: 76 additions & 25 deletions crates/sui-rpc-loadgen/src/payload/rpc_command_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ use crate::payload::{
Command, CommandData, DryRun, GetCheckpoints, Payload, ProcessPayload, Processor, SignerInfo,
};

pub(crate) const DEFAULT_GAS_BUDGET: u64 = 100_000;
pub(crate) const DEFAULT_GAS_BUDGET: u64 = 10_000;
pub(crate) const DEFAULT_LARGE_GAS_BUDGET: u64 = 100_000_000;

#[derive(Clone)]
pub struct RpcCommandProcessor {
Expand All @@ -38,7 +39,7 @@ impl RpcCommandProcessor {
let clients = join_all(urls.iter().map(|url| async {
SuiClientBuilder::default()
.max_concurrent_requests(usize::MAX)
.request_timeout(Duration::from_secs(10))
.request_timeout(Duration::from_secs(60))
.build(url.clone())
.await
.unwrap()
Expand Down Expand Up @@ -110,14 +111,16 @@ impl Processor for RpcCommandProcessor {
prepare_new_signer_and_coins(
clients.first().unwrap(),
config.signer_info.as_ref().unwrap(),
config.num_threads,
config.num_threads * config.num_chunks_per_thread,
config.max_repeat as u64 + 1,
)
.await,
)
} else {
None
};

let num_chunks = config.num_chunks_per_thread;
Ok(command_payloads
.into_iter()
.enumerate()
Expand All @@ -127,7 +130,7 @@ impl Processor for RpcCommandProcessor {
.as_ref()
.map(|(coins, encoded_keypair)| SignerInfo {
encoded_keypair: encoded_keypair.clone(),
gas_payment: Some(coins[i]),
gas_payment: Some(coins[num_chunks * i..(i + 1) * num_chunks].to_vec()),
gas_budget: None,
}),
})
Expand Down Expand Up @@ -186,33 +189,67 @@ async fn prepare_new_signer_and_coins(
client: &SuiClient,
signer_info: &SignerInfo,
num_coins: usize,
num_transactions_per_coin: u64,
) -> (Vec<ObjectID>, String) {
// TODO: This is due to the limit of number of objects that can be created in a single transaction
// we can bypass the limit to use multiple split coin transactions
if num_coins > 2000 {
panic!("num_threads * num_chunks_per_thread cannot exceed 2000, current val {num_coins}")
}
let num_coins = num_coins as u64;
let primary_keypair = SuiKeyPair::decode_base64(&signer_info.encoded_keypair)
.expect("Decoding keypair should not fail");
let sender = SuiAddress::from(&primary_keypair.public());
let coins = get_sui_coin_ids(client, sender).await;
// one coin for splitting, the other coins for gas payment
let coin_to_split: ObjectID = coins[0];
let coin_for_split_gas: ObjectID = coins[1];
let (coin, balance) = get_coin_with_max_balance(client, sender).await;

// We don't want to split coins in our primary address because we want to avoid having
// a million coin objects in our address. We can also fetch directly from the faucet, but in
// some environment that might not be possible when faucet resource is scarce
let (burner_address, burner_keypair): (_, AccountKeyPair) = get_key_pair();
let burner_keypair = SuiKeyPair::Ed25519(burner_keypair);
transfer_coin(client, &primary_keypair, coin_to_split, burner_address).await;
transfer_coin(client, &primary_keypair, coin_for_split_gas, burner_address).await;
// Some coins has an enormous amount of gas, we want to limit the max pay amount so that
// we don't squander the entire balance
// TODO(chris): consider reference gas price
let max_pay_amount = num_transactions_per_coin * DEFAULT_GAS_BUDGET * num_coins;
let amount_for_primary_coin =
// we need to subtract the following
// 1. gas fee(i.e., `DEFAULT_GAS_BUDGET`) for pay_sui from the primary address to the burner address
// 2. gas fee(i.e., DEFAULT_LARGE_GAS_BUDGET) for splitting the primary coin into `num_coins`
max_pay_amount.min(balance - DEFAULT_LARGE_GAS_BUDGET - DEFAULT_GAS_BUDGET);
pay_sui(
client,
&primary_keypair,
vec![coin],
DEFAULT_GAS_BUDGET,
vec![burner_address; 2],
vec![amount_for_primary_coin, DEFAULT_LARGE_GAS_BUDGET],
)
.await;

let coin_to_split =
get_coin_with_balance(client, burner_address, amount_for_primary_coin).await;
let mut results: Vec<ObjectID> =
split_coins(client, &burner_keypair, coin_to_split, num_coins as u64).await;
split_coins(client, &burner_keypair, coin_to_split, num_coins).await;
results.push(coin_to_split);
debug!("Split {coin_to_split} into {results:?} for gas payment");
assert_eq!(results.len(), num_coins);
assert_eq!(results.len(), num_coins as usize);
(results, burner_keypair.encode_base64())
}

async fn get_coin_with_max_balance(client: &SuiClient, address: SuiAddress) -> (ObjectID, u64) {
let coins = get_sui_coin_ids(client, address).await;
assert!(!coins.is_empty());
coins.into_iter().max_by(|a, b| a.1.cmp(&b.1)).unwrap()
}

async fn get_coin_with_balance(client: &SuiClient, address: SuiAddress, balance: u64) -> ObjectID {
let coins = get_sui_coin_ids(client, address).await;
assert!(!coins.is_empty());
coins.into_iter().find(|a| a.1 == balance).unwrap().0
}

// TODO: move this to the Rust SDK
async fn get_sui_coin_ids(client: &SuiClient, address: SuiAddress) -> Vec<ObjectID> {
async fn get_sui_coin_ids(client: &SuiClient, address: SuiAddress) -> Vec<(ObjectID, u64)> {
match client
.coin_read_api()
.get_coins(address, None, None, None)
Expand All @@ -221,7 +258,7 @@ async fn get_sui_coin_ids(client: &SuiClient, address: SuiAddress) -> Vec<Object
Ok(page) => page
.data
.into_iter()
.map(|c| c.coin_object_id)
.map(|c| (c.coin_object_id, c.balance))
.collect::<Vec<_>>(),
Err(e) => {
panic!("get_sui_coin_ids error for address {address} {e}")
Expand All @@ -230,19 +267,21 @@ async fn get_sui_coin_ids(client: &SuiClient, address: SuiAddress) -> Vec<Object
// TODO: implement iteration over next page
}

async fn transfer_coin(
async fn pay_sui(
client: &SuiClient,
keypair: &SuiKeyPair,
coin_id: ObjectID,
recipient: SuiAddress,
input_coins: Vec<ObjectID>,
gas_budget: u64,
recipients: Vec<SuiAddress>,
amounts: Vec<u64>,
) -> SuiTransactionResponse {
let sender = SuiAddress::from(&keypair.public());
let txn = client
let tx = client
.transaction_builder()
.transfer_object(sender, coin_id, None, DEFAULT_GAS_BUDGET, recipient)
.pay(sender, input_coins, recipients, amounts, None, gas_budget)
.await
.expect("Failed to construct transfer coin transaction");
sign_and_execute(client, keypair, txn).await
.expect("Failed to construct pay sui transaction");
sign_and_execute(client, keypair, tx).await
}

async fn split_coins(
Expand All @@ -254,7 +293,13 @@ async fn split_coins(
let sender = SuiAddress::from(&keypair.public());
let split_coin_tx = client
.transaction_builder()
.split_coin_equal(sender, coin_to_split, num_coins, None, DEFAULT_GAS_BUDGET)
.split_coin_equal(
sender,
coin_to_split,
num_coins,
None,
DEFAULT_LARGE_GAS_BUDGET,
)
.await
.expect("Failed to construct split coin transaction");
sign_and_execute(client, keypair, split_coin_tx)
Expand All @@ -267,15 +312,15 @@ async fn split_coins(
.collect::<Vec<_>>()
}

async fn sign_and_execute(
pub(crate) async fn sign_and_execute(
client: &SuiClient,
keypair: &SuiKeyPair,
txn_data: TransactionData,
) -> SuiTransactionResponse {
let signature =
Signature::new_secure(&IntentMessage::new(Intent::default(), &txn_data), keypair);

let transaction_response = client
let transaction_response = match client
.quorum_driver()
.execute_transaction(
Transaction::from_data(txn_data, Intent::default(), vec![signature])
Expand All @@ -285,7 +330,13 @@ async fn sign_and_execute(
Some(ExecuteTransactionRequestType::WaitForLocalExecution),
)
.await
.unwrap_or_else(|_| panic!("Execute Transaction Failed"));
{
Ok(response) => response,
Err(e) => {
panic!("sign_and_execute error {e}")
}
};

match &transaction_response.effects {
Some(effects) => {
if let SuiExecutionStatus::Failure { error } = effects.status() {
Expand Down

0 comments on commit 46b3844

Please sign in to comment.