diff --git a/substrate/client/rpc-spec-v2/src/transaction/api.rs b/substrate/client/rpc-spec-v2/src/transaction/api.rs index 33af9c953338..119bf270c63a 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/api.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/api.rs @@ -47,14 +47,14 @@ pub trait TransactionBroadcastApi { /// # Unstable /// /// This method is unstable and subject to change in the future. - #[method(name = "transaction_unstable_broadcast")] - fn broadcast(&self, bytes: Bytes) -> RpcResult>; + #[method(name = "transaction_unstable_broadcast", raw_method)] + async fn broadcast(&self, bytes: Bytes) -> RpcResult>; /// Broadcast an extrinsic to the chain. /// /// # Unstable /// /// This method is unstable and subject to change in the future. - #[method(name = "transaction_unstable_stop")] - fn stop_broadcast(&self, operation_id: String) -> Result<(), ErrorBroadcast>; + #[method(name = "transaction_unstable_stop", raw_method)] + async fn stop_broadcast(&self, operation_id: String) -> Result<(), ErrorBroadcast>; } diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs index 6eaf50d6b2e2..8af1882f39dd 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs @@ -18,11 +18,17 @@ //! API implementation for broadcasting transactions. -use crate::{transaction::api::TransactionBroadcastApiServer, SubscriptionTaskExecutor}; +use crate::{ + common::connections::RpcConnections, transaction::api::TransactionBroadcastApiServer, + SubscriptionTaskExecutor, +}; use codec::Decode; use futures::{FutureExt, Stream, StreamExt}; use futures_util::stream::AbortHandle; -use jsonrpsee::core::{async_trait, RpcResult}; +use jsonrpsee::{ + core::{async_trait, RpcResult}, + ConnectionDetails, +}; use parking_lot::RwLock; use rand::{distributions::Alphanumeric, Rng}; use sc_client_api::BlockchainEvents; @@ -46,6 +52,8 @@ pub struct TransactionBroadcast { executor: SubscriptionTaskExecutor, /// The broadcast operation IDs. broadcast_ids: Arc>>, + /// Keep track of how many concurrent operations are active for each connection. + rpc_connections: RpcConnections, } /// The state of a broadcast operation. @@ -57,7 +65,13 @@ struct BroadcastState { impl TransactionBroadcast { /// Creates a new [`TransactionBroadcast`]. pub fn new(client: Arc, pool: Arc, executor: SubscriptionTaskExecutor) -> Self { - TransactionBroadcast { client, pool, executor, broadcast_ids: Default::default() } + TransactionBroadcast { + client, + pool, + executor, + broadcast_ids: Default::default(), + rpc_connections: RpcConnections::new(4), + } } /// Generate an unique operation ID for the `transaction_broadcast` RPC method. @@ -100,15 +114,36 @@ where ::Hash: Unpin, Client: HeaderBackend + BlockchainEvents + Send + Sync + 'static, { - fn broadcast(&self, bytes: Bytes) -> RpcResult> { + async fn broadcast( + &self, + connection_details: ConnectionDetails, + bytes: Bytes, + ) -> RpcResult> { let pool = self.pool.clone(); // The unique ID of this operation. let id = self.generate_unique_id(); - let mut best_block_import_stream = + // Ensure that the connection has not reached the maximum number of active operations. + let Some(reserved_connection) = + self.rpc_connections.reserve_space(connection_details.id().clone()) + else { + return Ok(None) + }; + let Some(reserved_identifier) = reserved_connection.register(id.clone()) else { + // This can only happen if the generated operation ID is not unique. + return Ok(None) + }; + + // The compiler can no longer deduce the type of the stream and complains + // about `one type is more general than the other`. + let mut best_block_import_stream: std::pin::Pin< + Box::Hash> + Send>, + > = Box::pin(self.client.import_notification_stream().filter_map( - |notification| async move { notification.is_new_best.then_some(notification.hash) }, + |notification| async move { + notification.is_new_best.then_some(notification.hash.clone()) + }, )); let broadcast_transaction_fut = async move { @@ -172,6 +207,8 @@ where // The future expected by the executor must be `Future` instead of // `Future>`. let fut = fut.map(move |_| { + // Connection space is cleaned when this object is dropped. + let _reserved_identifier = reserved_identifier; // Remove the entry from the broadcast IDs map. broadcast_ids.write().remove(&drop_id); }); @@ -187,7 +224,16 @@ where Ok(Some(id)) } - fn stop_broadcast(&self, operation_id: String) -> Result<(), ErrorBroadcast> { + async fn stop_broadcast( + &self, + connection_details: ConnectionDetails, + operation_id: String, + ) -> Result<(), ErrorBroadcast> { + // The operation ID must correlate to the same connectio ID. + if !self.rpc_connections.contains_identifier(connection_details.id(), &operation_id) { + return Err(ErrorBroadcast::InvalidOperationID) + } + let mut broadcast_ids = self.broadcast_ids.write(); let Some(broadcast_state) = broadcast_ids.remove(&operation_id) else {