Skip to content

Commit

Permalink
configure polling interval (alloy-rs#437)
Browse files Browse the repository at this point in the history
* feat(rpc-client): add set_poll_interval method

* fix: using with_poll_interval in RpcClient

* use AtomicU64 for poll interval

* PollerBuilder nits

* fix test

* store Ordering::Relaxed
  • Loading branch information
yash-atreya authored and ben186 committed Jul 27, 2024
1 parent a379a40 commit af801c9
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 12 deletions.
57 changes: 47 additions & 10 deletions crates/rpc-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use std::{
atomic::{AtomicU64, Ordering},
Arc, Weak,
},
time::Duration,
};
use tower::{layer::util::Identity, ServiceBuilder};

Expand Down Expand Up @@ -81,6 +80,15 @@ impl<T> RpcClient<T> {
pub fn get_ref(&self) -> ClientRef<'_, T> {
&self.0
}

/// Sets the poll interval for the client in milliseconds.
///
/// Note: This will only set the poll interval for the client if it is the only reference to the
/// inner client. If the reference is held by many, then it will not update the poll interval.
pub fn with_poll_interval(self, poll_interval: u64) -> Self {
self.inner().set_poll_interval(poll_interval);
self
}
}

impl<T: Transport> RpcClient<T> {
Expand Down Expand Up @@ -152,22 +160,33 @@ pub struct RpcClientInner<T> {
pub(crate) is_local: bool,
/// The next request ID to use.
pub(crate) id: AtomicU64,
/// The poll interval for the client in milliseconds.
pub(crate) poll_interval: AtomicU64,
}

impl<T> RpcClientInner<T> {
/// Create a new [`RpcClient`] with the given transport.
///
/// Note: Sets the poll interval to 250ms for local transports and 7s for remote transports by
/// default.
#[inline]
pub const fn new(t: T, is_local: bool) -> Self {
Self { transport: t, is_local, id: AtomicU64::new(0) }
Self {
transport: t,
is_local,
id: AtomicU64::new(0),
poll_interval: if is_local { AtomicU64::new(250) } else { AtomicU64::new(7000) },
}
}

/// Returns the default poll interval for the client.
pub const fn default_poll_interval(&self) -> Duration {
if self.is_local {
Duration::from_millis(250)
} else {
Duration::from_secs(7)
}
/// Returns the default poll interval (milliseconds) for the client.
pub fn poll_interval(&self) -> u64 {
self.poll_interval.load(Ordering::Relaxed)
}

/// Set the poll interval for the client in milliseconds.
pub fn set_poll_interval(&self, poll_interval: u64) {
self.poll_interval.store(poll_interval, Ordering::Relaxed);
}

/// Returns a reference to the underlying transport.
Expand Down Expand Up @@ -262,7 +281,12 @@ impl<T: Transport + Clone> RpcClientInner<T> {
/// erasing each type. E.g. if you have `RpcClient<Http>` and
/// `RpcClient<Ws>` you can put both into a `Vec<RpcClient<BoxTransport>>`.
pub fn boxed(self) -> RpcClientInner<BoxTransport> {
RpcClientInner { transport: self.transport.boxed(), is_local: self.is_local, id: self.id }
RpcClientInner {
transport: self.transport.boxed(),
is_local: self.is_local,
id: self.id,
poll_interval: self.poll_interval,
}
}
}

Expand Down Expand Up @@ -307,3 +331,16 @@ mod pubsub_impl {
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_client_with_poll_interval() {
let client = RpcClient::new_http(reqwest::Url::parse("http://localhost").unwrap())
.with_poll_interval(5000);
// let client = client;
assert_eq!(client.poll_interval(), 5000);
}
}
5 changes: 3 additions & 2 deletions crates/rpc-client/src/poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ where
method: impl Into<Cow<'static, str>>,
params: Params,
) -> Self {
let poll_interval =
client.upgrade().map_or_else(|| Duration::from_secs(7), |c| c.default_poll_interval());
let poll_interval = client
.upgrade()
.map_or_else(|| Duration::from_secs(7), |c| Duration::from_millis(c.poll_interval()));
Self {
client,
method: method.into(),
Expand Down

0 comments on commit af801c9

Please sign in to comment.