Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
Enable QUIC client by default. Add arg to disable QUIC client. Take 2 (
Browse files Browse the repository at this point in the history
…#26927)

* Enable QUIC client by default. Add arg to disable QUIC client.
* Deprecate --disable-quic-servers arg
* Add #[ignore] annotation to failing tests
  • Loading branch information
willhickey authored Aug 17, 2022
1 parent c00adba commit 4bda909
Show file tree
Hide file tree
Showing 14 changed files with 87 additions and 63 deletions.
6 changes: 3 additions & 3 deletions banking-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,10 @@ fn main() {
.help("Number of threads to use in the banking stage"),
)
.arg(
Arg::new("tpu_use_quic")
.long("tpu-use-quic")
Arg::new("tpu_disable_quic")
.long("tpu-disable-quic")
.takes_value(false)
.help("Forward messages to TPU using QUIC"),
.help("Disable forwarding messages to TPU using QUIC"),
)
.get_matches();

Expand Down
10 changes: 5 additions & 5 deletions bench-tps/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,10 +290,10 @@ pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> {
.help("Submit transactions with a TpuClient")
)
.arg(
Arg::with_name("tpu_use_quic")
.long("tpu-use-quic")
Arg::with_name("tpu_disable_quic")
.long("tpu-disable-quic")
.takes_value(false)
.help("Submit transactions via QUIC; only affects ThinClient (default) \
.help("Do not submit transactions via QUIC; only affects ThinClient (default) \
or TpuClient sends"),
)
.arg(
Expand Down Expand Up @@ -348,8 +348,8 @@ pub fn extract_args(matches: &ArgMatches) -> Config {
args.external_client_type = ExternalClientType::RpcClient;
}

if matches.is_present("tpu_use_quic") {
args.use_quic = true;
if matches.is_present("tpu_disable_quic") {
args.use_quic = false;
}

if let Some(v) = matches.value_of("tpu_connection_pool_size") {
Expand Down
1 change: 1 addition & 0 deletions bench-tps/tests/bench_tps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ fn test_bench_tps_test_validator(config: Config) {

#[test]
#[serial]
#[ignore]
fn test_bench_tps_local_cluster_solana() {
test_bench_tps_local_cluster(Config {
tx_count: 100,
Expand Down
30 changes: 23 additions & 7 deletions client/src/connection_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ static MAX_CONNECTIONS: usize = 1024;

/// Used to decide whether the TPU and underlying connection cache should use
/// QUIC connections.
pub const DEFAULT_TPU_USE_QUIC: bool = false;
pub const DEFAULT_TPU_USE_QUIC: bool = true;

/// Default TPU connection pool size per remote address
pub const DEFAULT_TPU_CONNECTION_POOL_SIZE: usize = 4;
Expand Down Expand Up @@ -683,6 +683,11 @@ mod tests {
// be lazy and not connect until first use or handle connection errors somehow
// (without crashing, as would be required in a real practical validator)
let connection_cache = ConnectionCache::default();
let port_offset = if connection_cache.use_quic() {
QUIC_PORT_OFFSET
} else {
0
};
let addrs = (0..MAX_CONNECTIONS)
.into_iter()
.map(|_| {
Expand All @@ -695,18 +700,29 @@ mod tests {
let map = connection_cache.map.read().unwrap();
assert!(map.len() == MAX_CONNECTIONS);
addrs.iter().for_each(|a| {
let conn = &map.get(a).expect("Address not found").connections[0];
let conn = conn.new_blocking_connection(*a, connection_cache.stats.clone());
assert!(a.ip() == conn.tpu_addr().ip());
let port = a
.port()
.checked_add(port_offset)
.unwrap_or_else(|| a.port());
let addr = &SocketAddr::new(a.ip(), port);

let conn = &map.get(addr).expect("Address not found").connections[0];
let conn = conn.new_blocking_connection(*addr, connection_cache.stats.clone());
assert!(addr.ip() == conn.tpu_addr().ip());
});
}

let addr = get_addr(&mut rng);
connection_cache.get_connection(&addr);
let addr = &get_addr(&mut rng);
connection_cache.get_connection(addr);

let port = addr
.port()
.checked_add(port_offset)
.unwrap_or_else(|| addr.port());
let addr_with_quic_port = SocketAddr::new(addr.ip(), port);
let map = connection_cache.map.read().unwrap();
assert!(map.len() == MAX_CONNECTIONS);
let _conn = map.get(&addr).expect("Address not found");
let _conn = map.get(&addr_with_quic_port).expect("Address not found");
}

#[test]
Expand Down
2 changes: 2 additions & 0 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4120,6 +4120,7 @@ mod tests {
}

#[test]
#[ignore]
fn test_forwarder_budget() {
solana_logger::setup();
// Create `PacketBatch` with 1 unprocessed packet
Expand Down Expand Up @@ -4207,6 +4208,7 @@ mod tests {
}

#[test]
#[ignore]
fn test_handle_forwarding() {
solana_logger::setup();
// packets are deserialized upon receiving, failed packets will not be
Expand Down
69 changes: 30 additions & 39 deletions core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ pub struct Tpu {
banking_stage: BankingStage,
cluster_info_vote_listener: ClusterInfoVoteListener,
broadcast_stage: BroadcastStage,
tpu_quic_t: Option<thread::JoinHandle<()>>,
tpu_forwards_quic_t: Option<thread::JoinHandle<()>>,
tpu_quic_t: thread::JoinHandle<()>,
tpu_forwards_quic_t: thread::JoinHandle<()>,
find_packet_sender_stake_stage: FindPacketSenderStakeStage,
vote_find_packet_sender_stake_stage: FindPacketSenderStakeStage,
staked_nodes_updater_service: StakedNodesUpdaterService,
Expand Down Expand Up @@ -96,7 +96,6 @@ impl Tpu {
connection_cache: &Arc<ConnectionCache>,
keypair: &Keypair,
log_messages_bytes_limit: Option<usize>,
enable_quic_servers: bool,
staked_nodes: &Arc<RwLock<StakedNodes>>,
) -> Self {
let TpuSockets {
Expand Down Expand Up @@ -154,37 +153,33 @@ impl Tpu {
let (verified_sender, verified_receiver) = unbounded();

let stats = Arc::new(StreamStats::default());
let tpu_quic_t = enable_quic_servers.then(|| {
spawn_server(
transactions_quic_sockets,
keypair,
cluster_info.my_contact_info().tpu.ip(),
packet_sender,
exit.clone(),
MAX_QUIC_CONNECTIONS_PER_PEER,
staked_nodes.clone(),
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
stats.clone(),
)
.unwrap()
});
let tpu_quic_t = spawn_server(
transactions_quic_sockets,
keypair,
cluster_info.my_contact_info().tpu.ip(),
packet_sender,
exit.clone(),
MAX_QUIC_CONNECTIONS_PER_PEER,
staked_nodes.clone(),
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
stats.clone(),
)
.unwrap();

let tpu_forwards_quic_t = enable_quic_servers.then(|| {
spawn_server(
transactions_forwards_quic_sockets,
keypair,
cluster_info.my_contact_info().tpu_forwards.ip(),
forwarded_packet_sender,
exit.clone(),
MAX_QUIC_CONNECTIONS_PER_PEER,
staked_nodes.clone(),
MAX_STAKED_CONNECTIONS.saturating_add(MAX_UNSTAKED_CONNECTIONS),
0, // Prevent unstaked nodes from forwarding transactions
stats,
)
.unwrap()
});
let tpu_forwards_quic_t = spawn_server(
transactions_forwards_quic_sockets,
keypair,
cluster_info.my_contact_info().tpu_forwards.ip(),
forwarded_packet_sender,
exit.clone(),
MAX_QUIC_CONNECTIONS_PER_PEER,
staked_nodes.clone(),
MAX_STAKED_CONNECTIONS.saturating_add(MAX_UNSTAKED_CONNECTIONS),
0, // Prevent unstaked nodes from forwarding transactions
stats,
)
.unwrap();

let sigverify_stage = {
let verifier = TransactionSigVerifier::new(verified_sender);
Expand Down Expand Up @@ -271,13 +266,9 @@ impl Tpu {
self.find_packet_sender_stake_stage.join(),
self.vote_find_packet_sender_stake_stage.join(),
self.staked_nodes_updater_service.join(),
self.tpu_quic_t.join(),
self.tpu_forwards_quic_t.join(),
];
if let Some(tpu_quic_t) = self.tpu_quic_t {
tpu_quic_t.join()?;
}
if let Some(tpu_forwards_quic_t) = self.tpu_forwards_quic_t {
tpu_forwards_quic_t.join()?;
}
let broadcast_result = self.broadcast_stage.join();
for result in results {
result?;
Expand Down
3 changes: 0 additions & 3 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ pub struct ValidatorConfig {
pub wait_to_vote_slot: Option<Slot>,
pub ledger_column_options: LedgerColumnOptions,
pub runtime_config: RuntimeConfig,
pub enable_quic_servers: bool,
}

impl Default for ValidatorConfig {
Expand Down Expand Up @@ -237,7 +236,6 @@ impl Default for ValidatorConfig {
wait_to_vote_slot: None,
ledger_column_options: LedgerColumnOptions::default(),
runtime_config: RuntimeConfig::default(),
enable_quic_servers: true,
}
}
}
Expand Down Expand Up @@ -1036,7 +1034,6 @@ impl Validator {
&connection_cache,
&identity_keypair,
config.runtime_config.log_messages_bytes_limit,
config.enable_quic_servers,
&staked_nodes,
);

Expand Down
2 changes: 2 additions & 0 deletions dos/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1185,11 +1185,13 @@ pub mod test {
}

#[test]
#[ignore]
fn test_dos_with_blockhash_and_payer() {
run_dos_with_blockhash_and_payer(/*tpu_use_quic*/ false)
}

#[test]
#[ignore]
fn test_dos_with_blockhash_and_payer_and_quic() {
run_dos_with_blockhash_and_payer(/*tpu_use_quic*/ true)
}
Expand Down
1 change: 0 additions & 1 deletion local-cluster/src/validator_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
wait_to_vote_slot: config.wait_to_vote_slot,
ledger_column_options: config.ledger_column_options.clone(),
runtime_config: config.runtime_config.clone(),
enable_quic_servers: config.enable_quic_servers,
}
}

Expand Down
4 changes: 4 additions & 0 deletions local-cluster/tests/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ fn test_spend_and_verify_all_nodes_3() {

#[test]
#[serial]
#[ignore]
fn test_local_cluster_signature_subscribe() {
solana_logger::setup_with_default(RUST_LOG_FILTER);
let num_nodes = 2;
Expand Down Expand Up @@ -311,6 +312,7 @@ fn test_two_unbalanced_stakes() {

#[test]
#[serial]
#[ignore]
fn test_forwarding() {
solana_logger::setup_with_default(RUST_LOG_FILTER);
// Set up a cluster where one node is never the leader, so all txs sent to this node
Expand Down Expand Up @@ -1228,6 +1230,7 @@ fn test_incremental_snapshot_download_with_crossing_full_snapshot_interval_at_st
#[allow(unused_attributes)]
#[test]
#[serial]
#[ignore]
fn test_snapshot_restart_tower() {
solana_logger::setup_with_default(RUST_LOG_FILTER);
// First set up the cluster with 2 nodes
Expand Down Expand Up @@ -2520,6 +2523,7 @@ fn run_test_load_program_accounts_partition(scan_commitment: CommitmentConfig) {

#[test]
#[serial]
#[ignore]
fn test_votes_land_in_fork_during_long_partition() {
let total_stake = 3 * DEFAULT_NODE_STAKE;
// Make `lighter_stake` insufficient for switching threshold
Expand Down
2 changes: 2 additions & 0 deletions local-cluster/tests/local_cluster_slow_1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ mod common;

#[test]
#[serial]
#[ignore]
// Steps in this test:
// We want to create a situation like:
/*
Expand Down Expand Up @@ -587,6 +588,7 @@ fn test_duplicate_shreds_broadcast_leader() {

#[test]
#[serial]
#[ignore]
fn test_switch_threshold_uses_gossip_votes() {
solana_logger::setup_with_default(RUST_LOG_FILTER);
let total_stake = 100 * DEFAULT_NODE_STAKE;
Expand Down
1 change: 1 addition & 0 deletions local-cluster/tests/local_cluster_slow_2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ fn test_leader_failure_4() {

#[test]
#[serial]
#[ignore]
fn test_ledger_cleanup_service() {
solana_logger::setup_with_default(RUST_LOG_FILTER);
error!("test_ledger_cleanup_service");
Expand Down
2 changes: 1 addition & 1 deletion multinode-demo/bootstrap-validator.sh
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ while [[ -n $1 ]]; do
elif [[ $1 = --enable-rpc-bigtable-ledger-storage ]]; then
args+=("$1")
shift
elif [[ $1 = --tpu-use-quic ]]; then
elif [[ $1 = --tpu-disable-quic ]]; then
args+=("$1")
shift
elif [[ $1 = --rpc-send-batch-ms ]]; then
Expand Down
17 changes: 13 additions & 4 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1213,13 +1213,21 @@ pub fn main() {
Arg::with_name("tpu_use_quic")
.long("tpu-use-quic")
.takes_value(false)
.hidden(true)
.conflicts_with("tpu_disable_quic")
.help("Use QUIC to send transactions."),
)
.arg(
Arg::with_name("tpu_disable_quic")
.long("tpu-disable-quic")
.takes_value(false)
.help("Do not use QUIC to send transactions."),
)
.arg(
Arg::with_name("disable_quic_servers")
.long("disable-quic-servers")
.takes_value(false)
.help("Disable QUIC TPU servers"),
.hidden(true)
)
.arg(
Arg::with_name("enable_quic_servers")
Expand Down Expand Up @@ -2314,8 +2322,7 @@ pub fn main() {
let restricted_repair_only_mode = matches.is_present("restricted_repair_only_mode");
let accounts_shrink_optimize_total_space =
value_t_or_exit!(matches, "accounts_shrink_optimize_total_space", bool);
let tpu_use_quic = matches.is_present("tpu_use_quic");
let enable_quic_servers = !matches.is_present("disable_quic_servers");
let tpu_use_quic = !matches.is_present("tpu_disable_quic");
let tpu_connection_pool_size = value_t_or_exit!(matches, "tpu_connection_pool_size", usize);

let shrink_ratio = value_t_or_exit!(matches, "accounts_shrink_ratio", f64);
Expand Down Expand Up @@ -2485,6 +2492,9 @@ pub fn main() {
if matches.is_present("enable_quic_servers") {
warn!("--enable-quic-servers is now the default behavior. This flag is deprecated and can be removed from the launch args");
}
if matches.is_present("disable_quic_servers") {
warn!("--disable-quic-servers is deprecated. The quic server cannot be disabled.");
}

let rpc_bigtable_config = if matches.is_present("enable_rpc_bigtable_ledger_storage")
|| matches.is_present("enable_bigtable_ledger_upload")
Expand Down Expand Up @@ -2664,7 +2674,6 @@ pub fn main() {
log_messages_bytes_limit: value_of(&matches, "log_messages_bytes_limit"),
..RuntimeConfig::default()
},
enable_quic_servers,
..ValidatorConfig::default()
};

Expand Down

0 comments on commit 4bda909

Please sign in to comment.