Skip to content

Commit

Permalink
txBroadcast: Ensure limits and same connection
Browse files Browse the repository at this point in the history
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
  • Loading branch information
lexnv committed Apr 2, 2024
1 parent 8e95a3e commit 6222f3d
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 11 deletions.
8 changes: 4 additions & 4 deletions substrate/client/rpc-spec-v2/src/transaction/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ pub trait TransactionBroadcastApi {
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[method(name = "transaction_unstable_broadcast")]
fn broadcast(&self, bytes: Bytes) -> RpcResult<Option<String>>;
#[method(name = "transaction_unstable_broadcast", raw_method)]
async fn broadcast(&self, bytes: Bytes) -> RpcResult<Option<String>>;

/// Broadcast an extrinsic to the chain.
///
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[method(name = "transaction_unstable_stop")]
fn stop_broadcast(&self, operation_id: String) -> Result<(), ErrorBroadcast>;
#[method(name = "transaction_unstable_stop", raw_method)]
async fn stop_broadcast(&self, operation_id: String) -> Result<(), ErrorBroadcast>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,17 @@

//! API implementation for broadcasting transactions.

use crate::{transaction::api::TransactionBroadcastApiServer, SubscriptionTaskExecutor};
use crate::{
common::connections::RpcConnections, transaction::api::TransactionBroadcastApiServer,
SubscriptionTaskExecutor,
};
use codec::Decode;
use futures::{FutureExt, Stream, StreamExt};
use futures_util::stream::AbortHandle;
use jsonrpsee::core::{async_trait, RpcResult};
use jsonrpsee::{
core::{async_trait, RpcResult},
ConnectionDetails,
};
use parking_lot::RwLock;
use rand::{distributions::Alphanumeric, Rng};
use sc_client_api::BlockchainEvents;
Expand All @@ -46,6 +52,8 @@ pub struct TransactionBroadcast<Pool, Client> {
executor: SubscriptionTaskExecutor,
/// The broadcast operation IDs.
broadcast_ids: Arc<RwLock<HashMap<String, BroadcastState>>>,
/// Keep track of how many concurrent operations are active for each connection.
rpc_connections: RpcConnections,
}

/// The state of a broadcast operation.
Expand All @@ -57,7 +65,13 @@ struct BroadcastState {
impl<Pool, Client> TransactionBroadcast<Pool, Client> {
/// Creates a new [`TransactionBroadcast`].
pub fn new(client: Arc<Client>, pool: Arc<Pool>, executor: SubscriptionTaskExecutor) -> Self {
TransactionBroadcast { client, pool, executor, broadcast_ids: Default::default() }
TransactionBroadcast {
client,
pool,
executor,
broadcast_ids: Default::default(),
rpc_connections: RpcConnections::new(4),
}
}

/// Generate an unique operation ID for the `transaction_broadcast` RPC method.
Expand Down Expand Up @@ -100,15 +114,36 @@ where
<Pool::Block as BlockT>::Hash: Unpin,
Client: HeaderBackend<Pool::Block> + BlockchainEvents<Pool::Block> + Send + Sync + 'static,
{
fn broadcast(&self, bytes: Bytes) -> RpcResult<Option<String>> {
async fn broadcast(
&self,
connection_details: ConnectionDetails,
bytes: Bytes,
) -> RpcResult<Option<String>> {
let pool = self.pool.clone();

// The unique ID of this operation.
let id = self.generate_unique_id();

let mut best_block_import_stream =
// Ensure that the connection has not reached the maximum number of active operations.
let Some(reserved_connection) =
self.rpc_connections.reserve_space(connection_details.id().clone())
else {
return Ok(None)
};
let Some(reserved_identifier) = reserved_connection.register(id.clone()) else {
// This can only happen if the generated operation ID is not unique.
return Ok(None)
};

// The compiler can no longer deduce the type of the stream and complains
// about `one type is more general than the other`.
let mut best_block_import_stream: std::pin::Pin<
Box<dyn Stream<Item = <Pool::Block as BlockT>::Hash> + Send>,
> =
Box::pin(self.client.import_notification_stream().filter_map(
|notification| async move { notification.is_new_best.then_some(notification.hash) },
|notification| async move {
notification.is_new_best.then_some(notification.hash.clone())
},
));

let broadcast_transaction_fut = async move {
Expand Down Expand Up @@ -172,6 +207,8 @@ where
// The future expected by the executor must be `Future<Output = ()>` instead of
// `Future<Output = Result<(), Aborted>>`.
let fut = fut.map(move |_| {
// Connection space is cleaned when this object is dropped.
let _reserved_identifier = reserved_identifier;
// Remove the entry from the broadcast IDs map.
broadcast_ids.write().remove(&drop_id);
});
Expand All @@ -187,7 +224,16 @@ where
Ok(Some(id))
}

fn stop_broadcast(&self, operation_id: String) -> Result<(), ErrorBroadcast> {
async fn stop_broadcast(
&self,
connection_details: ConnectionDetails,
operation_id: String,
) -> Result<(), ErrorBroadcast> {
// The operation ID must correlate to the same connectio ID.
if !self.rpc_connections.contains_identifier(connection_details.id(), &operation_id) {
return Err(ErrorBroadcast::InvalidOperationID)
}

let mut broadcast_ids = self.broadcast_ids.write();

let Some(broadcast_state) = broadcast_ids.remove(&operation_id) else {
Expand Down

0 comments on commit 6222f3d

Please sign in to comment.