Skip to content

Commit

Permalink
handle semaphore locally without impacting the interface
Browse files Browse the repository at this point in the history
  • Loading branch information
lijunwangs committed Oct 19, 2022
1 parent a6884d6 commit ab85ab7
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 101 deletions.
22 changes: 3 additions & 19 deletions tpu-client/src/nonblocking/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
//! server's flow control.
use {
crate::{
connection_cache_stats::ConnectionCacheStats,
nonblocking::tpu_connection::{SendTransactionCallbackOption, TpuConnection},
connection_cache_stats::ConnectionCacheStats, nonblocking::tpu_connection::TpuConnection,
tpu_connection::ClientStats,
},
async_mutex::Mutex,
Expand Down Expand Up @@ -555,11 +554,7 @@ impl TpuConnection for QuicTpuConnection {
self.client.tpu_addr()
}

async fn send_wire_transaction_batch<T>(
&self,
buffers: &[T],
callback: &mut SendTransactionCallbackOption,
) -> TransportResult<()>
async fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync,
{
Expand All @@ -571,19 +566,11 @@ impl TpuConnection for QuicTpuConnection {
.await;
self.connection_stats
.add_client_stats(&stats, len, res.is_ok());
if let Some(callback) = callback {
callback();
}

res?;
Ok(())
}

async fn send_wire_transaction<T>(
&self,
wire_transaction: T,
callback: &mut SendTransactionCallbackOption,
) -> TransportResult<()>
async fn send_wire_transaction<T>(&self, wire_transaction: T) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync,
{
Expand All @@ -602,9 +589,6 @@ impl TpuConnection for QuicTpuConnection {
} else {
self.connection_stats.add_client_stats(&stats, 1, true);
}
if let Some(callback) = callback {
callback();
}
Ok(())
}
}
6 changes: 2 additions & 4 deletions tpu-client/src/nonblocking/tpu_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,7 @@ async fn send_wire_transaction_to_addr(
wire_transaction: Vec<u8>,
) -> TransportResult<()> {
let conn = connection_cache.get_nonblocking_connection(addr);
conn.send_wire_transaction(wire_transaction.clone(), &mut None)
.await
conn.send_wire_transaction(wire_transaction.clone()).await
}

async fn send_wire_transaction_batch_to_addr(
Expand All @@ -238,8 +237,7 @@ async fn send_wire_transaction_batch_to_addr(
wire_transactions: &[Vec<u8>],
) -> TransportResult<()> {
let conn = connection_cache.get_nonblocking_connection(addr);
conn.send_wire_transaction_batch(wire_transactions, &mut None)
.await
conn.send_wire_transaction_batch(wire_transactions).await
}

impl TpuClient {
Expand Down
19 changes: 3 additions & 16 deletions tpu-client/src/nonblocking/tpu_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ pub enum NonblockingConnection {
UdpTpuConnection,
}

pub type SendTransactionCallback = Box<dyn FnMut() + Sync + Send>;
pub type SendTransactionCallbackOption = Option<SendTransactionCallback>;

#[async_trait]
#[enum_dispatch(NonblockingConnection)]
pub trait TpuConnection {
Expand All @@ -28,27 +25,17 @@ pub trait TpuConnection {
async fn serialize_and_send_transaction(
&self,
transaction: &VersionedTransaction,
callback: &mut SendTransactionCallbackOption,
) -> TransportResult<()> {
let wire_transaction =
bincode::serialize(transaction).expect("serialize Transaction in send_batch");
self.send_wire_transaction(&wire_transaction, callback)
.await
self.send_wire_transaction(&wire_transaction).await
}

async fn send_wire_transaction<T>(
&self,
wire_transaction: T,
callback: &mut SendTransactionCallbackOption,
) -> TransportResult<()>
async fn send_wire_transaction<T>(&self, wire_transaction: T) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync;

async fn send_wire_transaction_batch<T>(
&self,
buffers: &[T],
callback: &mut SendTransactionCallbackOption,
) -> TransportResult<()>
async fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync;
}
28 changes: 7 additions & 21 deletions tpu-client/src/nonblocking/udp_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@
//! an interface for sending transactions
use {
crate::nonblocking::tpu_connection::{SendTransactionCallbackOption, TpuConnection},
async_trait::async_trait,
core::iter::repeat,
solana_sdk::transport::Result as TransportResult,
solana_streamer::nonblocking::sendmmsg::batch_send,
std::net::SocketAddr,
crate::nonblocking::tpu_connection::TpuConnection, async_trait::async_trait,
core::iter::repeat, solana_sdk::transport::Result as TransportResult,
solana_streamer::nonblocking::sendmmsg::batch_send, std::net::SocketAddr,
tokio::net::UdpSocket,
};

Expand All @@ -33,11 +30,7 @@ impl TpuConnection for UdpTpuConnection {
&self.addr
}

async fn send_wire_transaction<T>(
&self,
wire_transaction: T,
_callback: &mut SendTransactionCallbackOption,
) -> TransportResult<()>
async fn send_wire_transaction<T>(&self, wire_transaction: T) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync,
{
Expand All @@ -47,11 +40,7 @@ impl TpuConnection for UdpTpuConnection {
Ok(())
}

async fn send_wire_transaction_batch<T>(
&self,
buffers: &[T],
_callback: &mut SendTransactionCallbackOption,
) -> TransportResult<()>
async fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync,
{
Expand All @@ -73,10 +62,7 @@ mod tests {

async fn check_send_one(connection: &UdpTpuConnection, reader: &UdpSocket) {
let packet = vec![111u8; PACKET_DATA_SIZE];
connection
.send_wire_transaction(&packet, &mut None)
.await
.unwrap();
connection.send_wire_transaction(&packet).await.unwrap();
let mut packets = vec![Packet::default(); 32];
let recv = recv_mmsg(reader, &mut packets[..]).await.unwrap();
assert_eq!(1, recv);
Expand All @@ -85,7 +71,7 @@ mod tests {
async fn check_send_batch(connection: &UdpTpuConnection, reader: &UdpSocket) {
let packets: Vec<_> = (0..32).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect();
connection
.send_wire_transaction_batch(&packets, &mut None)
.send_wire_transaction_batch(&packets)
.await
.unwrap();
let mut packets = vec![Packet::default(); 32];
Expand Down
48 changes: 24 additions & 24 deletions tpu-client/src/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,24 @@ impl QuicTpuConnection {
}
}

async fn send_wire_transaction_async(
connection: Arc<NonblockingQuicTpuConnection>,
wire_transaction: Vec<u8>,
) -> TransportResult<()> {
let result = connection.send_wire_transaction(wire_transaction).await;
ASYNC_TASK_SEMAPHORE.release();
result
}

async fn send_wire_transaction_batch_async(
connection: Arc<NonblockingQuicTpuConnection>,
buffers: Vec<Vec<u8>>,
) -> TransportResult<()> {
let result = connection.send_wire_transaction_batch(&buffers).await;
ASYNC_TASK_SEMAPHORE.release();
result
}

impl TpuConnection for QuicTpuConnection {
fn tpu_addr(&self) -> &SocketAddr {
self.inner.tpu_addr()
Expand All @@ -113,42 +131,24 @@ impl TpuConnection for QuicTpuConnection {
where
T: AsRef<[u8]> + Send + Sync,
{
RUNTIME.block_on(self.inner.send_wire_transaction_batch(buffers, &mut None))?;
RUNTIME.block_on(self.inner.send_wire_transaction_batch(buffers))?;
Ok(())
}

fn send_wire_transaction_async(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
let _lock = ASYNC_TASK_SEMAPHORE.acquire();
let inner = self.inner.clone();
//drop and detach the task

let _ = RUNTIME.spawn(async move {
inner
.send_wire_transaction(
wire_transaction,
&mut Some(Box::new(|| {
ASYNC_TASK_SEMAPHORE.release();
})),
)
.await
});

let _ = RUNTIME
.spawn(async move { send_wire_transaction_async(inner, wire_transaction).await });
Ok(())
}

fn send_wire_transaction_batch_async(&self, buffers: Vec<Vec<u8>>) -> TransportResult<()> {
let _lock = ASYNC_TASK_SEMAPHORE.acquire();
let inner = self.inner.clone();

let _ = RUNTIME.spawn(async move {
inner
.send_wire_transaction_batch(
&buffers,
&mut Some(Box::new(|| {
ASYNC_TASK_SEMAPHORE.release();
})),
)
.await
});
let _ =
RUNTIME.spawn(async move { send_wire_transaction_batch_async(inner, buffers).await });
Ok(())
}
}
16 changes: 3 additions & 13 deletions tpu-client/src/tpu_connection_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,9 +341,7 @@ mod tests {
use {
super::*,
crate::{
nonblocking::tpu_connection::{
SendTransactionCallbackOption, TpuConnection as NonblockingTpuConnection,
},
nonblocking::tpu_connection::TpuConnection as NonblockingTpuConnection,
tpu_connection::TpuConnection as BlockingTpuConnection,
},
async_trait::async_trait,
Expand Down Expand Up @@ -470,21 +468,13 @@ mod tests {
fn tpu_addr(&self) -> &SocketAddr {
&self.addr
}
async fn send_wire_transaction<T>(
&self,
_wire_transaction: T,
_callback: &mut SendTransactionCallbackOption,
) -> TransportResult<()>
async fn send_wire_transaction<T>(&self, _wire_transaction: T) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync,
{
unimplemented!()
}
async fn send_wire_transaction_batch<T>(
&self,
_buffers: &[T],
_callback: &mut SendTransactionCallbackOption,
) -> TransportResult<()>
async fn send_wire_transaction_batch<T>(&self, _buffers: &[T]) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync,
{
Expand Down
5 changes: 1 addition & 4 deletions tpu-client/tests/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,7 @@ mod tests {
let num_expected_packets: usize = 3000;
let packets = vec![vec![0u8; PACKET_DATA_SIZE]; num_expected_packets];

assert!(client
.send_wire_transaction_batch(&packets, &mut None)
.await
.is_ok());
assert!(client.send_wire_transaction_batch(&packets).await.is_ok());

check_packets(receiver, num_bytes, num_expected_packets);
exit.store(true, Ordering::Relaxed);
Expand Down

0 comments on commit ab85ab7

Please sign in to comment.