Skip to content

Commit

Permalink
Limit the number async tasks spawned to the runtime to avoid congesti…
Browse files Browse the repository at this point in the history
…ons.
  • Loading branch information
lijunwangs committed Oct 19, 2022
1 parent 510cd93 commit 8a26924
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 18 deletions.
1 change: 1 addition & 0 deletions tpu-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ edition = "2021"
async-mutex = "1.4.0"
async-trait = "0.1.57"
bincode = "1.3.3"
crossbeam-channel = "0.5"
enum_dispatch = "0.3.8"
futures = "0.3"
futures-util = "0.3.21"
Expand Down
22 changes: 19 additions & 3 deletions tpu-client/src/nonblocking/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
//! server's flow control.
use {
crate::{
connection_cache_stats::ConnectionCacheStats, nonblocking::tpu_connection::TpuConnection,
connection_cache_stats::ConnectionCacheStats,
nonblocking::tpu_connection::{SendTransactionCallbackOption, TpuConnection},
tpu_connection::ClientStats,
},
async_mutex::Mutex,
Expand Down Expand Up @@ -554,7 +555,11 @@ impl TpuConnection for QuicTpuConnection {
self.client.tpu_addr()
}

async fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
async fn send_wire_transaction_batch<T>(
&self,
buffers: &[T],
callback: &mut SendTransactionCallbackOption,
) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync,
{
Expand All @@ -566,11 +571,19 @@ impl TpuConnection for QuicTpuConnection {
.await;
self.connection_stats
.add_client_stats(&stats, len, res.is_ok());
if let Some(callback) = callback {
callback();
}

res?;
Ok(())
}

async fn send_wire_transaction<T>(&self, wire_transaction: T) -> TransportResult<()>
async fn send_wire_transaction<T>(
&self,
wire_transaction: T,
callback: &mut SendTransactionCallbackOption,
) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync,
{
Expand All @@ -589,6 +602,9 @@ impl TpuConnection for QuicTpuConnection {
} else {
self.connection_stats.add_client_stats(&stats, 1, true);
}
if let Some(callback) = callback {
callback();
}
Ok(())
}
}
6 changes: 4 additions & 2 deletions tpu-client/src/nonblocking/tpu_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ async fn send_wire_transaction_to_addr(
wire_transaction: Vec<u8>,
) -> TransportResult<()> {
let conn = connection_cache.get_nonblocking_connection(addr);
conn.send_wire_transaction(wire_transaction.clone()).await
conn.send_wire_transaction(wire_transaction.clone(), &mut None)
.await
}

async fn send_wire_transaction_batch_to_addr(
Expand All @@ -237,7 +238,8 @@ async fn send_wire_transaction_batch_to_addr(
wire_transactions: &[Vec<u8>],
) -> TransportResult<()> {
let conn = connection_cache.get_nonblocking_connection(addr);
conn.send_wire_transaction_batch(wire_transactions).await
conn.send_wire_transaction_batch(wire_transactions, &mut None)
.await
}

impl TpuClient {
Expand Down
19 changes: 16 additions & 3 deletions tpu-client/src/nonblocking/tpu_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ pub enum NonblockingConnection {
UdpTpuConnection,
}

pub type SendTransactionCallback = Box<dyn FnMut() + Sync + Send>;
pub type SendTransactionCallbackOption = Option<SendTransactionCallback>;

#[async_trait]
#[enum_dispatch(NonblockingConnection)]
pub trait TpuConnection {
Expand All @@ -25,17 +28,27 @@ pub trait TpuConnection {
async fn serialize_and_send_transaction(
&self,
transaction: &VersionedTransaction,
callback: &mut SendTransactionCallbackOption,
) -> TransportResult<()> {
let wire_transaction =
bincode::serialize(transaction).expect("serialize Transaction in send_batch");
self.send_wire_transaction(&wire_transaction).await
self.send_wire_transaction(&wire_transaction, callback)
.await
}

async fn send_wire_transaction<T>(&self, wire_transaction: T) -> TransportResult<()>
async fn send_wire_transaction<T>(
&self,
wire_transaction: T,
callback: &mut SendTransactionCallbackOption,
) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync;

async fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
async fn send_wire_transaction_batch<T>(
&self,
buffers: &[T],
callback: &mut SendTransactionCallbackOption,
) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync;
}
21 changes: 16 additions & 5 deletions tpu-client/src/nonblocking/udp_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@
//! an interface for sending transactions
use {
crate::nonblocking::tpu_connection::TpuConnection, async_trait::async_trait,
core::iter::repeat, solana_sdk::transport::Result as TransportResult,
solana_streamer::nonblocking::sendmmsg::batch_send, std::net::SocketAddr,
crate::nonblocking::tpu_connection::{SendTransactionCallbackOption, TpuConnection},
async_trait::async_trait,
core::iter::repeat,
solana_sdk::transport::Result as TransportResult,
solana_streamer::nonblocking::sendmmsg::batch_send,
std::net::SocketAddr,
tokio::net::UdpSocket,
};

Expand All @@ -30,7 +33,11 @@ impl TpuConnection for UdpTpuConnection {
&self.addr
}

async fn send_wire_transaction<T>(&self, wire_transaction: T) -> TransportResult<()>
async fn send_wire_transaction<T>(
&self,
wire_transaction: T,
_callback: &mut SendTransactionCallbackOption,
) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync,
{
Expand All @@ -40,7 +47,11 @@ impl TpuConnection for UdpTpuConnection {
Ok(())
}

async fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
async fn send_wire_transaction_batch<T>(
&self,
buffers: &[T],
_callback: &mut SendTransactionCallbackOption,
) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync,
{
Expand Down
80 changes: 75 additions & 5 deletions tpu-client/src/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,59 @@ use {
},
lazy_static::lazy_static,
solana_sdk::transport::Result as TransportResult,
std::{net::SocketAddr, sync::Arc},
std::{
net::SocketAddr,
sync::{Arc, Condvar, Mutex, MutexGuard},
},
tokio::runtime::Runtime,
};

const MAX_OUTSTANDING_TASK: u64 = 2000;

/// A semaphore used for limiting the number of asynchronous tasks spawn to the
/// runtime. Before spawnning a task, use acquire. After the task is done (be it
/// succsess or failure), call release.
struct AsyncTaskSemaphore {
/// Keep the counter info about the usage
counter: Mutex<u64>,
/// Conditional variable for signaling when counter is decremented
cond_var: Condvar,
/// The maximum usage allowed by this semaphore.
permits: u64,
}

impl AsyncTaskSemaphore {
fn new(permits: u64) -> Self {
Self {
counter: Mutex::new(0),
cond_var: Condvar::new(),
permits,
}
}

/// When returned, the lock has been locked and usage count has been
/// incremented. When the returned MutexGuard is dropped the lock is dropped
/// without decrementing the usage count.
fn acquire(&self) -> MutexGuard<u64> {
let mut count = self.counter.lock().unwrap();
*count += 1;
while *count >= self.permits {
count = self.cond_var.wait(count).unwrap();
}
count
}

/// Acquire the lock and decrement the usage count
fn release(&self) {
let mut count = self.counter.lock().unwrap();
*count -= 1;
self.cond_var.notify_one();
}
}

lazy_static! {
static ref ASYNC_TASK_SEMAPHORE: AsyncTaskSemaphore =
AsyncTaskSemaphore::new(MAX_OUTSTANDING_TASK);
static ref RUNTIME: Runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name("quic-client")
.enable_all()
Expand Down Expand Up @@ -65,21 +113,43 @@ impl TpuConnection for QuicTpuConnection {
where
T: AsRef<[u8]> + Send + Sync,
{
RUNTIME.block_on(self.inner.send_wire_transaction_batch(buffers))?;
RUNTIME.block_on(self.inner.send_wire_transaction_batch(buffers, &mut None))?;
Ok(())
}

fn send_wire_transaction_async(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
let _lock = ASYNC_TASK_SEMAPHORE.acquire();
let inner = self.inner.clone();
//drop and detach the task
let _ = RUNTIME.spawn(async move { inner.send_wire_transaction(wire_transaction).await });


let _ = RUNTIME.spawn(async move {
inner
.send_wire_transaction(
wire_transaction,
&mut Some(Box::new(|| {
ASYNC_TASK_SEMAPHORE.release();
})),
)
.await
});
Ok(())
}

fn send_wire_transaction_batch_async(&self, buffers: Vec<Vec<u8>>) -> TransportResult<()> {
let _lock = ASYNC_TASK_SEMAPHORE.acquire();
let inner = self.inner.clone();
//drop and detach the task
let _ = RUNTIME.spawn(async move { inner.send_wire_transaction_batch(&buffers).await });

let _ = RUNTIME.spawn(async move {
inner
.send_wire_transaction_batch(
&buffers,
&mut Some(Box::new(|| {
ASYNC_TASK_SEMAPHORE.release();
})),
)
.await
});
Ok(())
}
}

0 comments on commit 8a26924

Please sign in to comment.