Skip to content

Commit

Permalink
http client: add max_concurrent_requests (#1473)
Browse files Browse the repository at this point in the history
  • Loading branch information
niklasad1 authored Oct 16, 2024
1 parent 93722cb commit fa5b1ce
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 6 deletions.
38 changes: 34 additions & 4 deletions client/http-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use jsonrpsee_core::traits::ToRpcParams;
use jsonrpsee_core::{BoxError, JsonRawValue, TEN_MB_SIZE_BYTES};
use jsonrpsee_types::{ErrorObject, InvalidRequestId, ResponseSuccess, TwoPointZero};
use serde::de::DeserializeOwned;
use tokio::sync::Semaphore;
use tower::layer::util::Identity;
use tower::{Layer, Service};
use tracing::instrument;
Expand Down Expand Up @@ -78,14 +79,14 @@ pub struct HttpClientBuilder<L = Identity> {
max_request_size: u32,
max_response_size: u32,
request_timeout: Duration,
max_concurrent_requests: usize,
#[cfg(feature = "tls")]
certificate_store: CertificateStore,
id_kind: IdKind,
max_log_length: u32,
headers: HeaderMap,
service_builder: tower::ServiceBuilder<L>,
tcp_no_delay: bool,
max_concurrent_requests: Option<usize>,
}

impl<L> HttpClientBuilder<L> {
Expand All @@ -107,6 +108,12 @@ impl<L> HttpClientBuilder<L> {
self
}

/// Set the maximum number of concurrent requests. Default disabled.
pub fn max_concurrent_requests(mut self, max_concurrent_requests: usize) -> Self {
self.max_concurrent_requests = Some(max_concurrent_requests);
self
}

/// Force to use the rustls native certificate store.
///
/// Since multiple certificate stores can be optionally enabled, this option will
Expand Down Expand Up @@ -216,12 +223,12 @@ impl<L> HttpClientBuilder<L> {
id_kind: self.id_kind,
headers: self.headers,
max_log_length: self.max_log_length,
max_concurrent_requests: self.max_concurrent_requests,
max_request_size: self.max_request_size,
max_response_size: self.max_response_size,
service_builder,
request_timeout: self.request_timeout,
tcp_no_delay: self.tcp_no_delay,
max_concurrent_requests: self.max_concurrent_requests,
}
}
}
Expand Down Expand Up @@ -263,7 +270,16 @@ where
.build(target)
.map_err(|e| Error::Transport(e.into()))?;

Ok(HttpClient { transport, id_manager: Arc::new(RequestIdManager::new(id_kind)), request_timeout })
let request_guard = self
.max_concurrent_requests
.map(|max_concurrent_requests| Arc::new(Semaphore::new(max_concurrent_requests)));

Ok(HttpClient {
transport,
id_manager: Arc::new(RequestIdManager::new(id_kind)),
request_timeout,
request_guard,
})
}
}

Expand All @@ -273,14 +289,14 @@ impl Default for HttpClientBuilder<Identity> {
max_request_size: TEN_MB_SIZE_BYTES,
max_response_size: TEN_MB_SIZE_BYTES,
request_timeout: Duration::from_secs(60),
max_concurrent_requests: 256,
#[cfg(feature = "tls")]
certificate_store: CertificateStore::Native,
id_kind: IdKind::Number,
max_log_length: 4096,
headers: HeaderMap::new(),
service_builder: tower::ServiceBuilder::new(),
tcp_no_delay: true,
max_concurrent_requests: None,
}
}
}
Expand All @@ -301,6 +317,8 @@ pub struct HttpClient<S = HttpBackend> {
request_timeout: Duration,
/// Request ID manager.
id_manager: Arc<RequestIdManager>,
/// Concurrent requests limit guard.
request_guard: Option<Arc<Semaphore>>,
}

impl HttpClient<HttpBackend> {
Expand All @@ -324,6 +342,10 @@ where
where
Params: ToRpcParams + Send,
{
let _permit = match self.request_guard.as_ref() {
Some(permit) => permit.acquire().await.ok(),
None => None,
};
let params = params.to_rpc_params()?;
let notif =
serde_json::to_string(&NotificationSer::borrowed(&method, params.as_deref())).map_err(Error::ParseError)?;
Expand All @@ -343,6 +365,10 @@ where
R: DeserializeOwned,
Params: ToRpcParams + Send,
{
let _permit = match self.request_guard.as_ref() {
Some(permit) => permit.acquire().await.ok(),
None => None,
};
let id = self.id_manager.next_request_id();
let params = params.to_rpc_params()?;

Expand Down Expand Up @@ -378,6 +404,10 @@ where
where
R: DeserializeOwned + fmt::Debug + 'a,
{
let _permit = match self.request_guard.as_ref() {
Some(permit) => permit.acquire().await.ok(),
None => None,
};
let batch = batch.build()?;
let id = self.id_manager.next_request_id();
let id_range = generate_batch_id_range(id, batch.len() as u64)?;
Expand Down
2 changes: 0 additions & 2 deletions core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,8 +468,6 @@ impl RequestIdManager {
}

/// Attempts to get the next request ID.
///
/// Fails if request limit has been exceeded.
pub fn next_request_id(&self) -> Id<'static> {
self.id_kind.into_id(self.current_id.next())
}
Expand Down

0 comments on commit fa5b1ce

Please sign in to comment.