diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction.rs index d44006392dca..4ee4f69b3d72 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/transaction.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction.rs @@ -19,6 +19,7 @@ //! API implementation for submitting transactions. use crate::{ + common::connections::RpcConnections, transaction::{ api::TransactionApiServer, error::Error, @@ -47,12 +48,14 @@ pub struct Transaction { pool: Arc, /// Executor to spawn subscriptions. executor: SubscriptionTaskExecutor, + /// Keep track of how many concurrent operations are active for each connection. + rpc_connections: RpcConnections, } impl Transaction { /// Creates a new [`Transaction`]. pub fn new(client: Arc, pool: Arc, executor: SubscriptionTaskExecutor) -> Self { - Transaction { client, pool, executor } + Transaction { client, pool, executor, rpc_connections: RpcConnections::new(4) } } } @@ -81,8 +84,20 @@ where fn submit_and_watch(&self, pending: PendingSubscriptionSink, xt: Bytes) { let client = self.client.clone(); let pool = self.pool.clone(); + let rpc_connections = self.rpc_connections.clone(); let fut = async move { + let Some(_reserved_connection) = rpc_connections.reserve_space(pending.connection_id()) + else { + let err = ErrorObject::owned( + BAD_FORMAT, + format!("Reached maximum number of connections"), + None::<()>, + ); + let _ = pending.reject(err).await; + return + }; + // This is the only place where the RPC server can return an error for this // subscription. Other defects must be signaled as events to the sink. let decoded_extrinsic = match TransactionFor::::decode(&mut &xt[..]) { diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs index 8af1882f39dd..e43ec5e60877 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs @@ -125,8 +125,7 @@ where let id = self.generate_unique_id(); // Ensure that the connection has not reached the maximum number of active operations. - let Some(reserved_connection) = - self.rpc_connections.reserve_space(connection_details.id().clone()) + let Some(reserved_connection) = self.rpc_connections.reserve_space(connection_details.id()) else { return Ok(None) };