Skip to content

Commit

Permalink
[fastx dist sys] Add DBMap / DBBatch based storage; remove broken sha…
Browse files Browse the repository at this point in the history
…ring; move to more async. (#86)

* Add DB store 
* Make handle_order async + fix tests
* Make handle_confirmation_order async + fix tests
* Make objects derive serialize / deserialize
* Pass a database path in the config
* Removed all sharding logic
* Added a .parent() method to authority
* MassClient uses multiple TCP connections
* Added persistence test
* Fix packet counters using atomics
* Remove locks when not needed, and add criticial region lock
* use number of CPUs in bench
  • Loading branch information
gdanezis authored Dec 30, 2021
1 parent 28ac533 commit 1a9bdf4
Show file tree
Hide file tree
Showing 18 changed files with 1,001 additions and 880 deletions.
1 change: 1 addition & 0 deletions fastpay/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ serde_json = "1.0.57"
structopt = "0.3"
tempfile = "3.2.0"
tokio = { version = "0.2.22", features = ["full"] }
num_cpus = "1.13.1"

fastpay_core = { path = "../fastpay_core" }
fastx-types = { path = "../fastx_types" }
Expand Down
141 changes: 63 additions & 78 deletions fastpay/src/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ use fastpay_core::authority::*;
use fastx_types::{base_types::*, committee::*, messages::*, object::Object, serialize::*};
use futures::stream::StreamExt;
use log::*;
use std::{
collections::HashMap,
time::{Duration, Instant},
};
use std::time::{Duration, Instant};
use structopt::StructOpt;
use tokio::runtime::Runtime;
use tokio::{runtime::Builder, time};

use std::env;
use std::fs;
use std::thread;

#[derive(Debug, Clone, StructOpt)]
Expand All @@ -25,7 +25,7 @@ use std::thread;
)]
struct ClientServerBenchmark {
/// Choose a network protocol between Udp and Tcp
#[structopt(long, default_value = "udp")]
#[structopt(long, default_value = "tcp")]
protocol: transport::NetworkProtocol,
/// Hostname
#[structopt(long, default_value = "127.0.0.1")]
Expand All @@ -36,9 +36,6 @@ struct ClientServerBenchmark {
/// Size of the FastPay committee
#[structopt(long, default_value = "10")]
committee_size: usize,
/// Number of shards per FastPay authority
#[structopt(long, default_value = "15")]
num_shards: u32,
/// Maximum number of requests in flight (0 for blocking client)
#[structopt(long, default_value = "1000")]
max_in_flight: usize,
Expand All @@ -59,30 +56,27 @@ struct ClientServerBenchmark {
fn main() {
env_logger::from_env(env_logger::Env::default().default_filter_or("info")).init();
let benchmark = ClientServerBenchmark::from_args();

let (states, orders) = benchmark.make_structures();

// Start the servers on the thread pool
for state in states {
// Make special single-core runtime for each server
let b = benchmark.clone();
thread::spawn(move || {
let mut runtime = Builder::new()
.enable_all()
.basic_scheduler()
.thread_stack_size(15 * 1024 * 1024)
.build()
.unwrap();

runtime.block_on(async move {
let server = b.spawn_server(state).await;
if let Err(err) = server.join().await {
error!("Server ended with an error: {}", err);
}
});
let (state, orders) = benchmark.make_structures();

// Make multi-threaded runtime for the authority
let b = benchmark.clone();
thread::spawn(move || {
let mut runtime = Builder::new()
.enable_all()
.threaded_scheduler()
.thread_stack_size(15 * 1024 * 1024)
.build()
.unwrap();

runtime.block_on(async move {
let server = b.spawn_server(state).await;
if let Err(err) = server.join().await {
error!("Server ended with an error: {}", err);
}
});
}
});

// Make a single-core runtime for the client.
let mut runtime = Builder::new()
.enable_all()
.basic_scheduler()
Expand All @@ -93,7 +87,7 @@ fn main() {
}

impl ClientServerBenchmark {
fn make_structures(&self) -> (Vec<AuthorityState>, Vec<(u32, Bytes)>) {
fn make_structures(&self) -> (AuthorityState, Vec<Bytes>) {
info!("Preparing accounts.");
let mut keys = Vec::new();
for _ in 0..self.committee_size {
Expand All @@ -104,37 +98,35 @@ impl ClientServerBenchmark {
total_votes: self.committee_size,
};

// Pick an authority and create one state per shard.
// Pick an authority and create state.
let (public_auth0, secret_auth0) = keys.pop().unwrap();
let mut states = Vec::new();
for i in 0..self.num_shards {
let state = AuthorityState::new_shard(
committee.clone(),
public_auth0,
secret_auth0.copy(),
i as u32,
self.num_shards,
);
states.push(state);
}

// Create a random directory to store the DB
let dir = env::temp_dir();
let path = dir.join(format!("DB_{:?}", ObjectID::random()));
fs::create_dir(&path).unwrap();

let state = AuthorityState::new(committee.clone(), public_auth0, secret_auth0.copy(), path);

// Seed user accounts.
let mut rt = Runtime::new().unwrap();
let mut account_objects = Vec::new();
for _ in 0..self.num_accounts {
let keypair = get_key_pair();
let object_id: ObjectID = ObjectID::random();
let i = AuthorityState::get_shard(self.num_shards, &object_id) as usize;
assert!(states[i].in_shard(&object_id));
let mut client = Object::with_id_for_testing(object_id);
client.transfer(keypair.0);
states[i].init_order_lock(client.to_object_reference());
states[i].insert_object(client);
account_objects.push((keypair.0, object_id, keypair.1));
}
rt.block_on(async {
for _ in 0..self.num_accounts {
let keypair = get_key_pair();
let object_id: ObjectID = ObjectID::random();

let client = Object::with_id_owner_for_testing(object_id, keypair.0);
assert!(client.next_sequence_number == SequenceNumber::from(0));
state.init_order_lock(client.to_object_reference()).await;
state.insert_object(client).await;
account_objects.push((keypair.0, object_id, keypair.1));
}
});

info!("Preparing transactions.");
// Make one transaction per account (transfer order + confirmation).
let mut orders: Vec<(u32, Bytes)> = Vec::new();
let mut orders: Vec<Bytes> = Vec::new();
let mut next_recipient = get_key_pair().0;
for (pubx, object_id, secx) in account_objects.iter() {
let transfer = Transfer {
Expand All @@ -145,12 +137,11 @@ impl ClientServerBenchmark {
user_data: UserData::default(),
};
next_recipient = *pubx;
let order = Order::new_transfer(transfer.clone(), secx);
let shard = AuthorityState::get_shard(self.num_shards, order.object_id());
let order = Order::new_transfer(transfer, secx);

// Serialize order
let bufx = serialize_order(&order);
assert!(!bufx.is_empty());
let serialized_order = serialize_order(&order);
assert!(!serialized_order.is_empty());

// Make certificate
let mut certificate = CertifiedOrder {
Expand All @@ -163,14 +154,14 @@ impl ClientServerBenchmark {
certificate.signatures.push((*pubx, sig));
}

let bufx2 = serialize_cert(&certificate);
assert!(!bufx2.is_empty());
let serialized_certificate = serialize_cert(&certificate);
assert!(!serialized_certificate.is_empty());

orders.push((shard, bufx2.into()));
orders.push((shard, bufx.into()));
orders.push(serialized_order.into());
orders.push(serialized_certificate.into());
}

(states, orders)
(state, orders)
}

async fn spawn_server(&self, state: AuthorityState) -> transport::SpawnedServer {
Expand All @@ -184,14 +175,15 @@ impl ClientServerBenchmark {
server.spawn().await.unwrap()
}

async fn launch_client(&self, mut orders: Vec<(u32, Bytes)>) {
async fn launch_client(&self, mut orders: Vec<Bytes>) {
time::delay_for(Duration::from_millis(1000)).await;

let items_number = orders.len() / 2;
let time_start = Instant::now();

let max_in_flight = (self.max_in_flight / self.num_shards as usize) as usize;
info!("Set max_in_flight per shard to {}", max_in_flight);
let connections: usize = num_cpus::get();
let max_in_flight = self.max_in_flight / connections as usize;
info!("Set max_in_flight to {}", max_in_flight);

info!("Sending requests.");
if self.max_in_flight > 0 {
Expand All @@ -204,22 +196,15 @@ impl ClientServerBenchmark {
Duration::from_micros(self.recv_timeout_us),
max_in_flight as u64,
);
let mut sharded_requests = HashMap::new();
for (shard, buf) in orders.iter().rev() {
sharded_requests
.entry(*shard)
.or_insert_with(Vec::new)
.push(buf.clone());
}
let responses = mass_client.run(sharded_requests).concat().await;

let responses = mass_client.run(orders, connections).concat().await;
info!("Received {} responses.", responses.len(),);
} else {
// Use actual client core
let client = network::Client::new(
self.protocol,
self.host.clone(),
self.port,
self.num_shards,
self.buffer_size,
Duration::from_micros(self.send_timeout_us),
Duration::from_micros(self.recv_timeout_us),
Expand All @@ -229,8 +214,8 @@ impl ClientServerBenchmark {
if orders.len() % 1000 == 0 {
info!("Process message {}...", orders.len());
}
let (shard, order) = orders.pop().unwrap();
let status = client.send_recv_bytes(shard, order.to_vec()).await;
let order = orders.pop().unwrap();
let status = client.send_recv_bytes(order.to_vec()).await;
match status {
Ok(info) => {
debug!("Query response: {:?}", info);
Expand Down
24 changes: 9 additions & 15 deletions fastpay/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#![deny(warnings)]

use fastpay::{config::*, network, transport};
use fastpay_core::{authority::*, client::*};
use fastpay_core::client::*;
use fastx_types::{base_types::*, committee::Committee, messages::*, serialize::*};

use bytes::Bytes;
Expand All @@ -30,7 +30,6 @@ fn make_authority_clients(
config.network_protocol,
config.host,
config.base_port,
config.num_shards,
buffer_size,
send_timeout,
recv_timeout,
Expand All @@ -46,7 +45,7 @@ fn make_authority_mass_clients(
send_timeout: std::time::Duration,
recv_timeout: std::time::Duration,
max_in_flight: u64,
) -> Vec<(u32, network::MassClient)> {
) -> Vec<network::MassClient> {
let mut authority_clients = Vec::new();
for config in &committee_config.authorities {
let client = network::MassClient::new(
Expand All @@ -56,9 +55,9 @@ fn make_authority_mass_clients(
buffer_size,
send_timeout,
recv_timeout,
max_in_flight / config.num_shards as u64, // Distribute window to diff shards
max_in_flight,
);
authority_clients.push((config.num_shards, client));
authority_clients.push(client);
}
authority_clients
}
Expand Down Expand Up @@ -219,17 +218,12 @@ async fn mass_broadcast_orders(
max_in_flight,
);
let mut streams = Vec::new();
for (num_shards, client) in authority_clients {
// Re-index orders by shard for this particular authority client.
let mut sharded_requests = HashMap::new();
for (object_id, buf) in &orders {
let shard = AuthorityState::get_shard(num_shards, object_id);
sharded_requests
.entry(shard)
.or_insert_with(Vec::new)
.push(buf.clone());
for client in authority_clients {
let mut requests = Vec::new();
for (_object_id, buf) in &orders {
requests.push(buf.clone());
}
streams.push(client.run(sharded_requests));
streams.push(client.run(requests, 1));
}
let responses = futures::stream::select_all(streams).concat().await;
let time_elapsed = time_start.elapsed();
Expand Down
2 changes: 1 addition & 1 deletion fastpay/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub struct AuthorityConfig {
pub address: FastPayAddress,
pub host: String,
pub base_port: u32,
pub num_shards: u32,
pub database_path: String,
}

impl AuthorityConfig {
Expand Down
Loading

1 comment on commit 1a9bdf4

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bench results

�[0m�[1m�[33mwarning�[0m�[0m�[1m: unused variable: num_immutable_objects�[0m
�[0m �[0m�[0m�[1m�[38;5;12m--> �[0m�[0mfastx_programmability/adapter/src/adapter.rs:377:13�[0m
�[0m �[0m�[0m�[1m�[38;5;12m|�[0m
�[0m�[1m�[38;5;12m377�[0m�[0m �[0m�[0m�[1m�[38;5;12m| �[0m�[0m let mut num_immutable_objects = 0;�[0m
�[0m �[0m�[0m�[1m�[38;5;12m| �[0m�[0m �[0m�[0m�[1m�[33m^^^^^^^^^^^^^^^^^^^^^�[0m�[0m �[0m�[0m�[1m�[33mhelp: if this is intentional, prefix it with an underscore: _num_immutable_objects�[0m
�[0m �[0m�[0m�[1m�[38;5;12m= �[0m�[0m�[1mnote�[0m�[0m: #[warn(unused_variables)] on by default�[0m

�[0m�[0m�[1m�[33mwarning�[0m�[1m:�[0m fastx-adapter (lib) generated 1 warning
�[0m�[0m�[1m�[32m Finished�[0m release [optimized + debuginfo] target(s) in 1.82s
�[0m�[0m�[1m�[32m Running�[0m target/release/bench
[2021-12-30T20:38:01Z INFO bench] Preparing accounts.
[2021-12-30T20:38:03Z INFO bench] Preparing transactions.
[2021-12-30T20:38:11Z INFO fastpay::network] Listening to Tcp traffic on 127.0.0.1:9555
[2021-12-30T20:38:12Z INFO bench] Set max_in_flight to 500
[2021-12-30T20:38:12Z INFO bench] Sending requests.
[2021-12-30T20:38:12Z INFO fastpay::network] Sending Tcp requests to 127.0.0.1:9555
[2021-12-30T20:38:13Z INFO fastpay::network] 127.0.0.1:9555 has processed 5000 packets
[2021-12-30T20:38:14Z INFO fastpay::network] In flight 500 Remaining 35000
[2021-12-30T20:38:14Z INFO fastpay::network] 127.0.0.1:9555 has processed 10000 packets
[2021-12-30T20:38:15Z INFO fastpay::network] 127.0.0.1:9555 has processed 15000 packets
[2021-12-30T20:38:15Z INFO fastpay::network] In flight 500 Remaining 30000
[2021-12-30T20:38:16Z INFO fastpay::network] 127.0.0.1:9555 has processed 20000 packets
[2021-12-30T20:38:16Z INFO fastpay::network] In flight 500 Remaining 30000
[2021-12-30T20:38:17Z INFO fastpay::network] 127.0.0.1:9555 has processed 25000 packets
[2021-12-30T20:38:17Z INFO fastpay::network] In flight 500 Remaining 25000
[2021-12-30T20:38:17Z INFO fastpay::network] 127.0.0.1:9555 has processed 30000 packets
[2021-12-30T20:38:18Z INFO fastpay::network] In flight 500 Remaining 25000
[2021-12-30T20:38:18Z INFO fastpay::network] 127.0.0.1:9555 has processed 35000 packets
[2021-12-30T20:38:19Z INFO fastpay::network] In flight 500 Remaining 20000
[2021-12-30T20:38:19Z INFO fastpay::network] 127.0.0.1:9555 has processed 40000 packets
[2021-12-30T20:38:20Z INFO fastpay::network] In flight 500 Remaining 20000
[2021-12-30T20:38:20Z INFO fastpay::network] 127.0.0.1:9555 has processed 45000 packets
[2021-12-30T20:38:20Z INFO fastpay::network] In flight 500 Remaining 15000
[2021-12-30T20:38:21Z INFO fastpay::network] 127.0.0.1:9555 has processed 50000 packets
[2021-12-30T20:38:21Z INFO fastpay::network] In flight 500 Remaining 15000
[2021-12-30T20:38:21Z INFO fastpay::network] 127.0.0.1:9555 has processed 55000 packets
[2021-12-30T20:38:21Z INFO fastpay::network] In flight 500 Remaining 10000
[2021-12-30T20:38:22Z INFO fastpay::network] 127.0.0.1:9555 has processed 60000 packets
[2021-12-30T20:38:22Z INFO fastpay::network] In flight 500 Remaining 10000
[2021-12-30T20:38:22Z INFO fastpay::network] 127.0.0.1:9555 has processed 65000 packets
[2021-12-30T20:38:23Z INFO fastpay::network] In flight 500 Remaining 5000
[2021-12-30T20:38:23Z INFO fastpay::network] 127.0.0.1:9555 has processed 70000 packets
[2021-12-30T20:38:24Z INFO fastpay::network] 127.0.0.1:9555 has processed 75000 packets
[2021-12-30T20:38:24Z INFO fastpay::network] Done sending Tcp requests to 127.0.0.1:9555
[2021-12-30T20:38:25Z INFO fastpay::network] 127.0.0.1:9555 has processed 80000 packets
[2021-12-30T20:38:25Z INFO fastpay::network] Done sending Tcp requests to 127.0.0.1:9555
[2021-12-30T20:38:25Z INFO bench] Received 80000 responses.
[2021-12-30T20:38:25Z WARN bench] Total time: 12543927us, items: 40000, tx/sec: 3188.794067439965

Please sign in to comment.