Skip to content

Commit

Permalink
feat(pubsub): set channel size (alloy-rs#602)
Browse files Browse the repository at this point in the history
  • Loading branch information
yash-atreya authored Apr 23, 2024
1 parent 234ff5e commit f2e14f7
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 8 deletions.
24 changes: 16 additions & 8 deletions crates/pubsub/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use alloy_transport::{TransportError, TransportErrorKind, TransportFut, Transpor
use futures::{future::try_join_all, FutureExt, TryFutureExt};
use std::{
future::Future,
sync::atomic::{AtomicUsize, Ordering},
task::{Context, Poll},
};
use tokio::sync::{mpsc, oneshot};
Expand All @@ -13,18 +14,25 @@ use tokio::sync::{mpsc, oneshot};
/// PubSub service.
///
/// [`Transport`]: alloy_transport::Transport
#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct PubSubFrontend {
tx: mpsc::UnboundedSender<PubSubInstruction>,
/// The number of items to buffer in new subscription channels. Defaults to
/// 16. See [`tokio::sync::broadcast::channel`] for a description.
channel_size: usize,
channel_size: AtomicUsize,
}

impl Clone for PubSubFrontend {
fn clone(&self) -> Self {
let channel_size = self.channel_size.load(Ordering::Relaxed);
Self { tx: self.tx.clone(), channel_size: AtomicUsize::new(channel_size) }
}
}

impl PubSubFrontend {
/// Create a new frontend.
pub(crate) const fn new(tx: mpsc::UnboundedSender<PubSubInstruction>) -> Self {
Self { tx, channel_size: 16 }
Self { tx, channel_size: AtomicUsize::new(16) }
}

/// Get the subscription ID for a local ID.
Expand Down Expand Up @@ -55,7 +63,7 @@ impl PubSubFrontend {
req: SerializedRequest,
) -> impl Future<Output = TransportResult<Response>> + Send + 'static {
let tx = self.tx.clone();
let channel_size = self.channel_size;
let channel_size = self.channel_size.load(Ordering::Relaxed);

async move {
let (in_flight, rx) = InFlight::new(req, channel_size);
Expand All @@ -81,17 +89,17 @@ impl PubSubFrontend {
/// to buffer in new subscription channels. Defaults to 16. See
/// [`tokio::sync::broadcast`] for a description of relevant
/// behavior.
pub const fn channel_size(&self) -> usize {
self.channel_size
pub fn channel_size(&self) -> usize {
self.channel_size.load(Ordering::Relaxed)
}

/// Set the channel size. This is the number of items to buffer in new
/// subscription channels. Defaults to 16. See
/// [`tokio::sync::broadcast`] for a description of relevant
/// behavior.
pub fn set_channel_size(&mut self, channel_size: usize) {
pub fn set_channel_size(&self, channel_size: usize) {
debug_assert_ne!(channel_size, 0, "channel size must be non-zero");
self.channel_size = channel_size;
self.channel_size.store(channel_size, Ordering::Relaxed);
}
}

Expand Down
5 changes: 5 additions & 0 deletions crates/rpc-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,11 @@ mod pubsub_impl {
pub fn channel_size(&self) -> usize {
self.transport.channel_size()
}

/// Set the channel size.
pub fn set_channel_size(&self, size: usize) {
self.transport.set_channel_size(size)
}
}
}

Expand Down

0 comments on commit f2e14f7

Please sign in to comment.