Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Added option to turn on UDP for TPU transaction and make UDP based TPU off by default #27462

Merged
merged 13 commits into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Added --tpu-disable-udp to disable UDP based transactions
  • Loading branch information
lijunwangs committed Sep 6, 2022
commit 663faf5d070ea26356d690afd958ad26effb1be9
74 changes: 44 additions & 30 deletions core/src/fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use {
solana_streamer::streamer::{
self, PacketBatchReceiver, PacketBatchSender, StreamerReceiveStats,
},
solana_tpu_client::connection_cache::DEFAULT_TPU_DISABLE_UDP,
std::{
net::UdpSocket,
sync::{
Expand Down Expand Up @@ -57,6 +58,7 @@ impl FetchStage {
poh_recorder,
coalesce_ms,
None,
DEFAULT_TPU_DISABLE_UDP,
),
receiver,
vote_receiver,
Expand All @@ -76,6 +78,7 @@ impl FetchStage {
poh_recorder: &Arc<RwLock<PohRecorder>>,
coalesce_ms: u64,
in_vote_only_mode: Option<Arc<AtomicBool>>,
tpu_disable_udp: bool,
) -> Self {
let tx_sockets = sockets.into_iter().map(Arc::new).collect();
let tpu_forwards_sockets = tpu_forwards_sockets.into_iter().map(Arc::new).collect();
Expand All @@ -92,6 +95,7 @@ impl FetchStage {
poh_recorder,
coalesce_ms,
in_vote_only_mode,
tpu_disable_udp,
)
}

Expand Down Expand Up @@ -150,42 +154,52 @@ impl FetchStage {
poh_recorder: &Arc<RwLock<PohRecorder>>,
coalesce_ms: u64,
in_vote_only_mode: Option<Arc<AtomicBool>>,
tpu_disable_udp: bool,
) -> Self {
let recycler: PacketBatchRecycler = Recycler::warmed(1000, 1024);

let tpu_stats = Arc::new(StreamerReceiveStats::new("tpu_receiver"));
let tpu_threads: Vec<_> = tpu_sockets
.into_iter()
.map(|socket| {
streamer::receiver(
socket,
exit.clone(),
sender.clone(),
recycler.clone(),
tpu_stats.clone(),
coalesce_ms,
true,
in_vote_only_mode.clone(),
)
})
.collect();

let tpu_threads: Vec<_> = if !tpu_disable_udp {
tpu_sockets
.into_iter()
.map(|socket| {
streamer::receiver(
socket,
exit.clone(),
sender.clone(),
recycler.clone(),
tpu_stats.clone(),
coalesce_ms,
true,
in_vote_only_mode.clone(),
)
})
.collect()
} else {
Vec::default()
};

let tpu_forward_stats = Arc::new(StreamerReceiveStats::new("tpu_forwards_receiver"));
let tpu_forwards_threads: Vec<_> = tpu_forwards_sockets
.into_iter()
.map(|socket| {
streamer::receiver(
socket,
exit.clone(),
forward_sender.clone(),
recycler.clone(),
tpu_forward_stats.clone(),
coalesce_ms,
true,
in_vote_only_mode.clone(),
)
})
.collect();
let tpu_forwards_threads: Vec<_> = if !tpu_disable_udp {
tpu_forwards_sockets
.into_iter()
.map(|socket| {
streamer::receiver(
socket,
exit.clone(),
forward_sender.clone(),
recycler.clone(),
tpu_forward_stats.clone(),
coalesce_ms,
true,
in_vote_only_mode.clone(),
)
})
.collect()
} else {
Vec::default()
};

let tpu_vote_stats = Arc::new(StreamerReceiveStats::new("tpu_vote_receiver"));
let tpu_vote_threads: Vec<_> = tpu_vote_sockets
Expand Down
2 changes: 2 additions & 0 deletions core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ impl Tpu {
log_messages_bytes_limit: Option<usize>,
staked_nodes: &Arc<RwLock<StakedNodes>>,
shared_staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
tpu_disable_udp: bool,
) -> Self {
let TpuSockets {
transactions: transactions_sockets,
Expand All @@ -124,6 +125,7 @@ impl Tpu {
poh_recorder,
tpu_coalesce_ms,
Some(bank_forks.read().unwrap().get_vote_only_mode_signal()),
tpu_disable_udp,
);

let staked_nodes_updater_service = StakedNodesUpdaterService::new(
Expand Down
6 changes: 5 additions & 1 deletion core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ impl Validator {
socket_addr_space: SocketAddrSpace,
use_quic: bool,
tpu_connection_pool_size: usize,
tpu_disable_udp: bool,
) -> Result<Self, String> {
let id = identity_keypair.pubkey();
assert_eq!(id, node.info.id);
Expand Down Expand Up @@ -1040,6 +1041,7 @@ impl Validator {
config.runtime_config.log_messages_bytes_limit,
&staked_nodes,
config.staked_nodes_overrides.clone(),
tpu_disable_udp,
);

datapoint_info!(
Expand Down Expand Up @@ -2188,7 +2190,7 @@ mod tests {
solana_ledger::{create_new_tmp_ledger, genesis_utils::create_genesis_config_with_leader},
solana_sdk::{genesis_config::create_genesis_config, poh_config::PohConfig},
solana_tpu_client::connection_cache::{
DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_DISABLE_UDP, DEFAULT_TPU_USE_QUIC,
},
std::{fs::remove_dir_all, thread, time::Duration},
};
Expand Down Expand Up @@ -2225,6 +2227,7 @@ mod tests {
SocketAddrSpace::Unspecified,
DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_DISABLE_UDP,
)
.expect("assume successful validator start");
assert_eq!(
Expand Down Expand Up @@ -2309,6 +2312,7 @@ mod tests {
SocketAddrSpace::Unspecified,
DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_DISABLE_UDP,
)
.expect("assume successful validator start")
})
Expand Down
6 changes: 5 additions & 1 deletion local-cluster/src/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ use {
solana_streamer::socket::SocketAddrSpace,
solana_thin_client::thin_client::ThinClient,
solana_tpu_client::connection_cache::{
ConnectionCache, DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC,
ConnectionCache, DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_DISABLE_UDP,
DEFAULT_TPU_USE_QUIC,
},
solana_vote_program::{
vote_instruction,
Expand Down Expand Up @@ -278,6 +279,7 @@ impl LocalCluster {
socket_addr_space,
DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_DISABLE_UDP,
)
.expect("assume successful validator start");

Expand Down Expand Up @@ -477,6 +479,7 @@ impl LocalCluster {
socket_addr_space,
DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_DISABLE_UDP,
)
.expect("assume successful validator start");

Expand Down Expand Up @@ -839,6 +842,7 @@ impl Cluster for LocalCluster {
socket_addr_space,
DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_DISABLE_UDP,
)
.expect("assume successful validator start");
cluster_validator_info.validator = Some(restarted_node);
Expand Down
5 changes: 4 additions & 1 deletion test-validator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ use {
signature::{read_keypair_file, write_keypair_file, Keypair, Signer},
},
solana_streamer::socket::SocketAddrSpace,
solana_tpu_client::connection_cache::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC},
solana_tpu_client::connection_cache::{
DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_DISABLE_UDP, DEFAULT_TPU_USE_QUIC,
},
std::{
collections::{HashMap, HashSet},
ffi::OsStr,
Expand Down Expand Up @@ -809,6 +811,7 @@ impl TestValidator {
socket_addr_space,
DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_DISABLE_UDP,
)?);

// Needed to avoid panics in `solana-responder-gossip` in tests that create a number of
Expand Down
2 changes: 2 additions & 0 deletions tpu-client/src/connection_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ pub const DEFAULT_TPU_USE_QUIC: bool = true;
/// Default TPU connection pool size per remote address
pub const DEFAULT_TPU_CONNECTION_POOL_SIZE: usize = 4;

pub const DEFAULT_TPU_DISABLE_UDP: bool = false;

#[derive(Default)]
pub struct ConnectionCacheStats {
cache_hits: AtomicU64,
Expand Down
9 changes: 9 additions & 0 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1245,6 +1245,13 @@ pub fn main() {
.takes_value(false)
.help("Do not use QUIC to send transactions."),
)
.arg(
Arg::with_name("tpu_disable_udp")
.long("tpu-disable-udp")
.takes_value(false)
.conflicts_with("tpu_disable_quic")
.help("Do not use UDP for receiving/sending transactions."),
)
.arg(
Arg::with_name("disable_quic_servers")
.long("disable-quic-servers")
Expand Down Expand Up @@ -2431,6 +2438,7 @@ pub fn main() {
let accounts_shrink_optimize_total_space =
value_t_or_exit!(matches, "accounts_shrink_optimize_total_space", bool);
let tpu_use_quic = !matches.is_present("tpu_disable_quic");
let tpu_disable_udp = matches.is_present("tpu_disable_udp");
let tpu_connection_pool_size = value_t_or_exit!(matches, "tpu_connection_pool_size", usize);

let shrink_ratio = value_t_or_exit!(matches, "accounts_shrink_ratio", f64);
Expand Down Expand Up @@ -3220,6 +3228,7 @@ pub fn main() {
socket_addr_space,
tpu_use_quic,
tpu_connection_pool_size,
tpu_disable_udp,
)
.unwrap_or_else(|e| {
error!("Failed to start validator: {:?}", e);
Expand Down