Skip to content

Commit

Permalink
tx: Ensure limits for number of ongoing submitAndWatch
Browse files Browse the repository at this point in the history
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
  • Loading branch information
lexnv committed Apr 2, 2024
1 parent 6222f3d commit c447ac2
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 3 deletions.
17 changes: 16 additions & 1 deletion substrate/client/rpc-spec-v2/src/transaction/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
//! API implementation for submitting transactions.

use crate::{
common::connections::RpcConnections,
transaction::{
api::TransactionApiServer,
error::Error,
Expand Down Expand Up @@ -47,12 +48,14 @@ pub struct Transaction<Pool, Client> {
pool: Arc<Pool>,
/// Executor to spawn subscriptions.
executor: SubscriptionTaskExecutor,
/// Keep track of how many concurrent operations are active for each connection.
rpc_connections: RpcConnections,
}

impl<Pool, Client> Transaction<Pool, Client> {
/// Creates a new [`Transaction`].
pub fn new(client: Arc<Client>, pool: Arc<Pool>, executor: SubscriptionTaskExecutor) -> Self {
Transaction { client, pool, executor }
Transaction { client, pool, executor, rpc_connections: RpcConnections::new(4) }
}
}

Expand Down Expand Up @@ -81,8 +84,20 @@ where
fn submit_and_watch(&self, pending: PendingSubscriptionSink, xt: Bytes) {
let client = self.client.clone();
let pool = self.pool.clone();
let rpc_connections = self.rpc_connections.clone();

let fut = async move {
let Some(_reserved_connection) = rpc_connections.reserve_space(pending.connection_id())
else {
let err = ErrorObject::owned(
BAD_FORMAT,
format!("Reached maximum number of connections"),
None::<()>,
);
let _ = pending.reject(err).await;
return
};

// This is the only place where the RPC server can return an error for this
// subscription. Other defects must be signaled as events to the sink.
let decoded_extrinsic = match TransactionFor::<Pool>::decode(&mut &xt[..]) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,7 @@ where
let id = self.generate_unique_id();

// Ensure that the connection has not reached the maximum number of active operations.
let Some(reserved_connection) =
self.rpc_connections.reserve_space(connection_details.id().clone())
let Some(reserved_connection) = self.rpc_connections.reserve_space(connection_details.id())
else {
return Ok(None)
};
Expand Down

0 comments on commit c447ac2

Please sign in to comment.