Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
separates out quic streamer connection stats from different servers (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
behzadnouri authored May 25, 2023
1 parent 9281ab7 commit f1ebc5b
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 75 deletions.
22 changes: 4 additions & 18 deletions client/src/connection_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,7 @@ mod tests {
crossbeam_channel::unbounded,
solana_sdk::{net::DEFAULT_TPU_COALESCE, signature::Keypair},
solana_streamer::{
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, quic::StreamStats,
streamer::StakedNodes,
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, streamer::StakedNodes,
},
std::{
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
Expand All @@ -229,37 +228,25 @@ mod tests {
},
};

fn server_args() -> (
UdpSocket,
Arc<AtomicBool>,
Keypair,
IpAddr,
Arc<StreamStats>,
) {
fn server_args() -> (UdpSocket, Arc<AtomicBool>, Keypair, IpAddr) {
(
UdpSocket::bind("127.0.0.1:0").unwrap(),
Arc::new(AtomicBool::new(false)),
Keypair::new(),
"127.0.0.1".parse().unwrap(),
Arc::new(StreamStats::default()),
)
}

#[test]
fn test_connection_with_specified_client_endpoint() {
// Start a response receiver:
let (
response_recv_socket,
response_recv_exit,
keypair2,
response_recv_ip,
response_recv_stats,
) = server_args();
let (response_recv_socket, response_recv_exit, keypair2, response_recv_ip) = server_args();
let (sender2, _receiver2) = unbounded();

let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));

let (response_recv_endpoint, response_recv_thread) = solana_streamer::quic::spawn_server(
"quic_streamer_test",
response_recv_socket,
&keypair2,
response_recv_ip,
Expand All @@ -269,7 +256,6 @@ mod tests {
staked_nodes,
10,
10,
response_recv_stats,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down
7 changes: 3 additions & 4 deletions core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use {
solana_sdk::{pubkey::Pubkey, signature::Keypair},
solana_streamer::{
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
quic::{spawn_server, StreamStats, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
quic::{spawn_server, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
streamer::StakedNodes,
},
std::{
Expand Down Expand Up @@ -145,8 +145,8 @@ impl Tpu {

let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote();

let stats = Arc::new(StreamStats::default());
let (_, tpu_quic_t) = spawn_server(
"quic_streamer_tpu",
transactions_quic_sockets,
keypair,
cluster_info
Expand All @@ -160,13 +160,13 @@ impl Tpu {
staked_nodes.clone(),
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
stats.clone(),
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
tpu_coalesce,
)
.unwrap();

let (_, tpu_forwards_quic_t) = spawn_server(
"quic_streamer_tpu_forwards",
transactions_forwards_quic_sockets,
keypair,
cluster_info
Expand All @@ -180,7 +180,6 @@ impl Tpu {
staked_nodes.clone(),
MAX_STAKED_CONNECTIONS.saturating_add(MAX_UNSTAKED_CONNECTIONS),
0, // Prevent unstaked nodes from forwarding transactions
stats,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
tpu_coalesce,
)
Expand Down
38 changes: 12 additions & 26 deletions quic-client/tests/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ mod tests {
},
solana_sdk::{net::DEFAULT_TPU_COALESCE, packet::PACKET_DATA_SIZE, signature::Keypair},
solana_streamer::{
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, quic::StreamStats,
streamer::StakedNodes, tls_certificates::new_self_signed_tls_certificate,
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, streamer::StakedNodes,
tls_certificates::new_self_signed_tls_certificate,
},
std::{
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
Expand Down Expand Up @@ -49,19 +49,12 @@ mod tests {
assert_eq!(total_packets, num_expected_packets);
}

fn server_args() -> (
UdpSocket,
Arc<AtomicBool>,
Keypair,
IpAddr,
Arc<StreamStats>,
) {
fn server_args() -> (UdpSocket, Arc<AtomicBool>, Keypair, IpAddr) {
(
UdpSocket::bind("127.0.0.1:0").unwrap(),
Arc::new(AtomicBool::new(false)),
Keypair::new(),
"127.0.0.1".parse().unwrap(),
Arc::new(StreamStats::default()),
)
}

Expand All @@ -74,8 +67,9 @@ mod tests {
solana_logger::setup();
let (sender, receiver) = unbounded();
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let (s, exit, keypair, ip, stats) = server_args();
let (s, exit, keypair, ip) = server_args();
let (_, t) = solana_streamer::quic::spawn_server(
"quic_streamer_test",
s.try_clone().unwrap(),
&keypair,
ip,
Expand All @@ -85,7 +79,6 @@ mod tests {
staked_nodes,
10,
10,
stats,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -154,8 +147,9 @@ mod tests {
solana_logger::setup();
let (sender, receiver) = unbounded();
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let (s, exit, keypair, ip, stats) = server_args();
let (_, t) = solana_streamer::nonblocking::quic::spawn_server(
let (s, exit, keypair, ip) = server_args();
let (_, _, t) = solana_streamer::nonblocking::quic::spawn_server(
"quic_streamer_test",
s.try_clone().unwrap(),
&keypair,
ip,
Expand All @@ -165,7 +159,6 @@ mod tests {
staked_nodes,
10,
10,
stats,
Duration::from_secs(1), // wait_for_chunk_timeout
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -210,9 +203,9 @@ mod tests {
// Request Receiver
let (sender, receiver) = unbounded();
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let (request_recv_socket, request_recv_exit, keypair, request_recv_ip, request_recv_stats) =
server_args();
let (request_recv_socket, request_recv_exit, keypair, request_recv_ip) = server_args();
let (request_recv_endpoint, request_recv_thread) = solana_streamer::quic::spawn_server(
"quic_streamer_test",
request_recv_socket.try_clone().unwrap(),
&keypair,
request_recv_ip,
Expand All @@ -222,27 +215,21 @@ mod tests {
staked_nodes.clone(),
10,
10,
request_recv_stats,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
.unwrap();

drop(request_recv_endpoint);
// Response Receiver:
let (
response_recv_socket,
response_recv_exit,
keypair2,
response_recv_ip,
response_recv_stats,
) = server_args();
let (response_recv_socket, response_recv_exit, keypair2, response_recv_ip) = server_args();
let (sender2, receiver2) = unbounded();

let addr = response_recv_socket.local_addr().unwrap().ip();
let port = response_recv_socket.local_addr().unwrap().port();
let server_addr = SocketAddr::new(addr, port);
let (response_recv_endpoint, response_recv_thread) = solana_streamer::quic::spawn_server(
"quic_streamer_test",
response_recv_socket,
&keypair2,
response_recv_ip,
Expand All @@ -252,7 +239,6 @@ mod tests {
staked_nodes,
10,
10,
response_recv_stats,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down
32 changes: 16 additions & 16 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ struct PacketAccumulator {

#[allow(clippy::too_many_arguments)]
pub fn spawn_server(
name: &'static str,
sock: UdpSocket,
keypair: &Keypair,
gossip_host: IpAddr,
Expand All @@ -94,32 +95,34 @@ pub fn spawn_server(
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
stats: Arc<StreamStats>,
wait_for_chunk_timeout: Duration,
coalesce: Duration,
) -> Result<(Endpoint, JoinHandle<()>), QuicServerError> {
info!("Start quic server on {:?}", sock);
) -> Result<(Endpoint, Arc<StreamStats>, JoinHandle<()>), QuicServerError> {
info!("Start {name} quic server on {sock:?}");
let (config, _cert) = configure_server(keypair, gossip_host)?;

let endpoint = Endpoint::new(EndpointConfig::default(), Some(config), sock, TokioRuntime)
.map_err(QuicServerError::EndpointFailed)?;
let stats = Arc::<StreamStats>::default();
let handle = tokio::spawn(run_server(
name,
endpoint.clone(),
packet_sender,
exit,
max_connections_per_peer,
staked_nodes,
max_staked_connections,
max_unstaked_connections,
stats,
stats.clone(),
wait_for_chunk_timeout,
coalesce,
));
Ok((endpoint, handle))
Ok((endpoint, stats, handle))
}

#[allow(clippy::too_many_arguments)]
pub async fn run_server(
async fn run_server(
name: &'static str,
incoming: Endpoint,
packet_sender: Sender<PacketBatch>,
exit: Arc<AtomicBool>,
Expand Down Expand Up @@ -152,7 +155,7 @@ pub async fn run_server(
let timeout_connection = timeout(WAIT_FOR_CONNECTION_TIMEOUT, incoming.accept()).await;

if last_datapoint.elapsed().as_secs() >= 5 {
stats.report();
stats.report(name);
last_datapoint = Instant::now();
}

Expand Down Expand Up @@ -1166,8 +1169,8 @@ pub mod test {
let ip = "127.0.0.1".parse().unwrap();
let server_address = s.local_addr().unwrap();
let staked_nodes = Arc::new(RwLock::new(option_staked_nodes.unwrap_or_default()));
let stats = Arc::new(StreamStats::default());
let (_, t) = spawn_server(
let (_, stats, t) = spawn_server(
"quic_streamer_test",
s,
&keypair,
ip,
Expand All @@ -1177,7 +1180,6 @@ pub mod test {
staked_nodes,
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
stats.clone(),
Duration::from_secs(2),
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -1597,8 +1599,8 @@ pub mod test {
let ip = "127.0.0.1".parse().unwrap();
let server_address = s.local_addr().unwrap();
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let stats = Arc::new(StreamStats::default());
let (_, t) = spawn_server(
let (_, _, t) = spawn_server(
"quic_streamer_test",
s,
&keypair,
ip,
Expand All @@ -1608,7 +1610,6 @@ pub mod test {
staked_nodes,
MAX_STAKED_CONNECTIONS,
0, // Do not allow any connection from unstaked clients/nodes
stats,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand All @@ -1629,8 +1630,8 @@ pub mod test {
let ip = "127.0.0.1".parse().unwrap();
let server_address = s.local_addr().unwrap();
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let stats = Arc::new(StreamStats::default());
let (_, t) = spawn_server(
let (_, stats, t) = spawn_server(
"quic_streamer_test",
s,
&keypair,
ip,
Expand All @@ -1640,7 +1641,6 @@ pub mod test {
staked_nodes,
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
stats.clone(),
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down
Loading

0 comments on commit f1ebc5b

Please sign in to comment.