From 8a269246ac6c5a6ae455e8aed3dadfffd3547caa Mon Sep 17 00:00:00 2001
From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com>
Date: Thu, 13 Oct 2022 14:50:05 -0700
Subject: [PATCH] Limit the number of async tasks spawned to the runtime to avoid congestion.

---
 tpu-client/Cargo.toml                        |  1 +
 tpu-client/src/nonblocking/quic_client.rs    | 22 +++++-
 tpu-client/src/nonblocking/tpu_client.rs     |  6 +-
 tpu-client/src/nonblocking/tpu_connection.rs | 19 ++++-
 tpu-client/src/nonblocking/udp_client.rs     | 21 +++--
 tpu-client/src/quic_client.rs                | 80 ++++++++++++++++++--
 6 files changed, 131 insertions(+), 18 deletions(-)

diff --git a/tpu-client/Cargo.toml b/tpu-client/Cargo.toml
index d2dba4af8df56e..ccfa6280cf0a10 100644
--- a/tpu-client/Cargo.toml
+++ b/tpu-client/Cargo.toml
@@ -13,6 +13,7 @@ edition = "2021"
 async-mutex = "1.4.0"
 async-trait = "0.1.57"
 bincode = "1.3.3"
+crossbeam-channel = "0.5"
 enum_dispatch = "0.3.8"
 futures = "0.3"
 futures-util = "0.3.21"
diff --git a/tpu-client/src/nonblocking/quic_client.rs b/tpu-client/src/nonblocking/quic_client.rs
index e9339f5ef24980..0646e19ad9d18a 100644
--- a/tpu-client/src/nonblocking/quic_client.rs
+++ b/tpu-client/src/nonblocking/quic_client.rs
@@ -3,7 +3,8 @@
 //! server's flow control.
 use {
     crate::{
-        connection_cache_stats::ConnectionCacheStats, nonblocking::tpu_connection::TpuConnection,
+        connection_cache_stats::ConnectionCacheStats,
+        nonblocking::tpu_connection::{SendTransactionCallbackOption, TpuConnection},
         tpu_connection::ClientStats,
     },
     async_mutex::Mutex,
@@ -554,7 +555,11 @@ impl TpuConnection for QuicTpuConnection {
         self.client.tpu_addr()
     }
 
-    async fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
+    async fn send_wire_transaction_batch<T>(
+        &self,
+        buffers: &[T],
+        callback: &mut SendTransactionCallbackOption,
+    ) -> TransportResult<()>
     where
         T: AsRef<[u8]> + Send + Sync,
     {
@@ -566,11 +571,19 @@
             .await;
         self.connection_stats
             .add_client_stats(&stats, len, res.is_ok());
+        if let Some(callback) = callback {
+            callback();
+        }
+
         res?;
         Ok(())
     }
 
-    async fn send_wire_transaction<T>(&self, wire_transaction: T) -> TransportResult<()>
+    async fn send_wire_transaction<T>(
+        &self,
+        wire_transaction: T,
+        callback: &mut SendTransactionCallbackOption,
+    ) -> TransportResult<()>
     where
         T: AsRef<[u8]> + Send + Sync,
     {
@@ -589,6 +602,9 @@
         } else {
             self.connection_stats.add_client_stats(&stats, 1, true);
         }
+        if let Some(callback) = callback {
+            callback();
+        }
         Ok(())
     }
 }
diff --git a/tpu-client/src/nonblocking/tpu_client.rs b/tpu-client/src/nonblocking/tpu_client.rs
index 6a46969047dfc5..ff6c671a10ccf5 100644
--- a/tpu-client/src/nonblocking/tpu_client.rs
+++ b/tpu-client/src/nonblocking/tpu_client.rs
@@ -228,7 +228,8 @@ async fn send_wire_transaction_to_addr(
     wire_transaction: Vec<u8>,
 ) -> TransportResult<()> {
     let conn = connection_cache.get_nonblocking_connection(addr);
-    conn.send_wire_transaction(wire_transaction.clone()).await
+    conn.send_wire_transaction(wire_transaction.clone(), &mut None)
+        .await
 }
 
 async fn send_wire_transaction_batch_to_addr(
@@ -237,7 +238,8 @@
     wire_transactions: &[Vec<u8>],
 ) -> TransportResult<()> {
     let conn = connection_cache.get_nonblocking_connection(addr);
-    conn.send_wire_transaction_batch(wire_transactions).await
+    conn.send_wire_transaction_batch(wire_transactions, &mut None)
+        .await
 }
 
 impl TpuClient {
diff --git a/tpu-client/src/nonblocking/tpu_connection.rs b/tpu-client/src/nonblocking/tpu_connection.rs
index 9e819070bc0c47..9a1d4bec935a0c 100644
--- a/tpu-client/src/nonblocking/tpu_connection.rs
+++ b/tpu-client/src/nonblocking/tpu_connection.rs
@@ -17,6 +17,9 @@ pub enum NonblockingConnection {
     UdpTpuConnection,
 }
 
+pub type SendTransactionCallback = Box<dyn FnMut() + Send>;
+pub type SendTransactionCallbackOption = Option<SendTransactionCallback>;
+
 #[async_trait]
 #[enum_dispatch(NonblockingConnection)]
 pub trait TpuConnection {
@@ -25,17 +28,27 @@
     async fn serialize_and_send_transaction(
         &self,
         transaction: &VersionedTransaction,
+        callback: &mut SendTransactionCallbackOption,
     ) -> TransportResult<()> {
         let wire_transaction =
             bincode::serialize(transaction).expect("serialize Transaction in send_batch");
-        self.send_wire_transaction(&wire_transaction).await
+        self.send_wire_transaction(&wire_transaction, callback)
+            .await
     }
 
-    async fn send_wire_transaction<T>(&self, wire_transaction: T) -> TransportResult<()>
+    async fn send_wire_transaction<T>(
+        &self,
+        wire_transaction: T,
+        callback: &mut SendTransactionCallbackOption,
+    ) -> TransportResult<()>
     where
         T: AsRef<[u8]> + Send + Sync;
 
-    async fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
+    async fn send_wire_transaction_batch<T>(
+        &self,
+        buffers: &[T],
+        callback: &mut SendTransactionCallbackOption,
+    ) -> TransportResult<()>
     where
         T: AsRef<[u8]> + Send + Sync;
 }
diff --git a/tpu-client/src/nonblocking/udp_client.rs b/tpu-client/src/nonblocking/udp_client.rs
index 1a418765042dad..1155f21f60e798 100644
--- a/tpu-client/src/nonblocking/udp_client.rs
+++ b/tpu-client/src/nonblocking/udp_client.rs
@@ -2,9 +2,12 @@
 //! an interface for sending transactions
 
 use {
-    crate::nonblocking::tpu_connection::TpuConnection, async_trait::async_trait,
-    core::iter::repeat, solana_sdk::transport::Result as TransportResult,
-    solana_streamer::nonblocking::sendmmsg::batch_send, std::net::SocketAddr,
+    crate::nonblocking::tpu_connection::{SendTransactionCallbackOption, TpuConnection},
+    async_trait::async_trait,
+    core::iter::repeat,
+    solana_sdk::transport::Result as TransportResult,
+    solana_streamer::nonblocking::sendmmsg::batch_send,
+    std::net::SocketAddr,
     tokio::net::UdpSocket,
 };
 
@@ -30,7 +33,11 @@ impl TpuConnection for UdpTpuConnection {
         &self.addr
     }
 
-    async fn send_wire_transaction<T>(&self, wire_transaction: T) -> TransportResult<()>
+    async fn send_wire_transaction<T>(
+        &self,
+        wire_transaction: T,
+        _callback: &mut SendTransactionCallbackOption,
+    ) -> TransportResult<()>
     where
         T: AsRef<[u8]> + Send + Sync,
     {
@@ -40,7 +47,11 @@
         Ok(())
     }
 
-    async fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
+    async fn send_wire_transaction_batch<T>(
+        &self,
+        buffers: &[T],
+        _callback: &mut SendTransactionCallbackOption,
+    ) -> TransportResult<()>
     where
         T: AsRef<[u8]> + Send + Sync,
     {
diff --git a/tpu-client/src/quic_client.rs b/tpu-client/src/quic_client.rs
index 47a597e1718787..0c04bba0911ed0 100644
--- a/tpu-client/src/quic_client.rs
+++ b/tpu-client/src/quic_client.rs
@@ -15,11 +15,59 @@ use {
     },
     lazy_static::lazy_static,
     solana_sdk::transport::Result as TransportResult,
-    std::{net::SocketAddr, sync::Arc},
+    std::{
+        net::SocketAddr,
+        sync::{Arc, Condvar, Mutex, MutexGuard},
+    },
     tokio::runtime::Runtime,
 };
 
+const MAX_OUTSTANDING_TASK: u64 = 2000;
+
+/// A semaphore used for limiting the number of asynchronous tasks spawned to
+/// the runtime. Before spawning a task, call acquire. After the task is done
+/// (be it success or failure), call release.
+struct AsyncTaskSemaphore {
+    /// Tracks the current usage count
+    counter: Mutex<u64>,
+    /// Condition variable for signaling when the counter is decremented
+    cond_var: Condvar,
+    /// The maximum usage allowed by this semaphore.
+    permits: u64,
+}
+
+impl AsyncTaskSemaphore {
+    fn new(permits: u64) -> Self {
+        Self {
+            counter: Mutex::new(0),
+            cond_var: Condvar::new(),
+            permits,
+        }
+    }
+
+    /// When returned, the lock has been locked and the usage count has been
+    /// incremented. When the returned MutexGuard is dropped, the lock is
+    /// released without decrementing the usage count.
+    fn acquire(&self) -> MutexGuard<u64> {
+        let mut count = self.counter.lock().unwrap();
+        *count += 1;
+        while *count > self.permits {
+            count = self.cond_var.wait(count).unwrap();
+        }
+        count
+    }
+
+    /// Acquire the lock and decrement the usage count
+    fn release(&self) {
+        let mut count = self.counter.lock().unwrap();
+        *count -= 1;
+        self.cond_var.notify_one();
+    }
+}
+
 lazy_static! {
+    static ref ASYNC_TASK_SEMAPHORE: AsyncTaskSemaphore =
+        AsyncTaskSemaphore::new(MAX_OUTSTANDING_TASK);
     static ref RUNTIME: Runtime = tokio::runtime::Builder::new_multi_thread()
         .thread_name("quic-client")
         .enable_all()
@@ -65,21 +113,43 @@ impl TpuConnection for QuicTpuConnection {
     where
         T: AsRef<[u8]> + Send + Sync,
     {
-        RUNTIME.block_on(self.inner.send_wire_transaction_batch(buffers))?;
+        RUNTIME.block_on(self.inner.send_wire_transaction_batch(buffers, &mut None))?;
         Ok(())
     }
 
     fn send_wire_transaction_async(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
+        let _lock = ASYNC_TASK_SEMAPHORE.acquire();
         let inner = self.inner.clone();
         //drop and detach the task
-        let _ = RUNTIME.spawn(async move { inner.send_wire_transaction(wire_transaction).await });
+
+
+        let _ = RUNTIME.spawn(async move {
+            inner
+                .send_wire_transaction(
+                    wire_transaction,
+                    &mut Some(Box::new(|| {
+                        ASYNC_TASK_SEMAPHORE.release();
+                    })),
+                )
+                .await
+        });
         Ok(())
     }
 
     fn send_wire_transaction_batch_async(&self, buffers: Vec<Vec<u8>>) -> TransportResult<()> {
+        let _lock = ASYNC_TASK_SEMAPHORE.acquire();
         let inner = self.inner.clone();
-        //drop and detach the task
-        let _ = RUNTIME.spawn(async move { inner.send_wire_transaction_batch(&buffers).await });
+
+        let _ = RUNTIME.spawn(async move {
+            inner
+                .send_wire_transaction_batch(
+                    &buffers,
+                    &mut Some(Box::new(|| {
+                        ASYNC_TASK_SEMAPHORE.release();
+                    })),
+                )
+                .await
+        });
         Ok(())
     }
 }
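
Note on the AsyncTaskSemaphore above: the acquire/release protocol can be exercised in isolation. Below is a minimal standalone sketch of the same pattern; the permit and worker counts, the sleep, and the main harness are illustrative assumptions, not part of the patch.

// Standalone sketch of the AsyncTaskSemaphore pattern from quic_client.rs.
// Permit count, worker count, and the simulated work are illustrative only.
use std::{
    sync::{Arc, Condvar, Mutex, MutexGuard},
    thread,
    time::Duration,
};

struct AsyncTaskSemaphore {
    counter: Mutex<u64>,
    cond_var: Condvar,
    permits: u64,
}

impl AsyncTaskSemaphore {
    fn new(permits: u64) -> Self {
        Self {
            counter: Mutex::new(0),
            cond_var: Condvar::new(),
            permits,
        }
    }

    /// Increment the usage count, then block while it exceeds the permit limit.
    fn acquire(&self) -> MutexGuard<u64> {
        let mut count = self.counter.lock().unwrap();
        *count += 1;
        while *count > self.permits {
            count = self.cond_var.wait(count).unwrap();
        }
        count
    }

    /// Decrement the usage count and wake one blocked acquirer.
    fn release(&self) {
        *self.counter.lock().unwrap() -= 1;
        self.cond_var.notify_one();
    }
}

fn main() {
    // Admit at most 2 concurrent "tasks" while 4 workers compete.
    let sem = Arc::new(AsyncTaskSemaphore::new(2));
    let handles: Vec<_> = (0..4)
        .map(|i| {
            let sem = Arc::clone(&sem);
            thread::spawn(move || {
                // Dropping the guard unlocks the mutex but keeps the slot
                // reserved, exactly like `_lock` in send_wire_transaction_async.
                drop(sem.acquire());
                println!("worker {i} admitted");
                thread::sleep(Duration::from_millis(50)); // simulated send
                sem.release(); // what the completion callback does in the patch
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
}

One property worth noting: waiters increment the shared count before blocking, so they consume capacity themselves; the scheme stays live as long as the number of concurrently blocked callers does not exceed the permit count, which MAX_OUTSTANDING_TASK = 2000 makes comfortably true for the client's few sending threads.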
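
The callback plumbing added to the TpuConnection trait can likewise be seen end to end in a toy model. The `send` function and `main` harness below are stand-ins invented for illustration, the executor comes from the futures crate already listed in tpu-client/Cargo.toml, and the `Box<dyn FnMut() + Send>` bounds are an assumption reconstructed from how the patch stores and invokes the callback.

// Toy model of the SendTransactionCallbackOption threading; `send` stands in
// for send_wire_transaction and its body is a stub.
use futures::executor::block_on;

pub type SendTransactionCallback = Box<dyn FnMut() + Send>;
pub type SendTransactionCallbackOption = Option<SendTransactionCallback>;

async fn send(bytes: &[u8], callback: &mut SendTransactionCallbackOption) {
    println!("sending {} bytes", bytes.len());
    // Fire the completion hook if the caller installed one, mirroring the
    // `if let Some(callback) = callback { callback(); }` blocks in the patch.
    if let Some(callback) = callback {
        callback();
    }
}

fn main() {
    block_on(async {
        // Fire-and-forget call sites pass `&mut None`, as tpu_client.rs now does.
        send(b"tx", &mut None).await;

        // The async QUIC paths install a hook; in the patch it calls
        // ASYNC_TASK_SEMAPHORE.release(), here it just prints.
        let mut hook: SendTransactionCallbackOption =
            Some(Box::new(|| println!("slot released")));
        send(b"tx", &mut hook).await;
    });
}

Passing `&mut Option<...>` lets implementations with nothing to signal, like the UDP connection, take the parameter as `_callback` and ignore it, while the QUIC async paths use it to hand a semaphore slot back without the caller having to await the spawned task.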