diff --git a/CHANGELOG.md b/CHANGELOG.md
index e674840e48a..48543746b2b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
 
 ## [Unreleased]
 
+### Added
+- [2131](https://github.com/FuelLabs/fuel-core/pull/2131): Add a flow in the TxPool to ask newly connected peers to share their transaction pool
+
 ### Changed
 
 #### Breaking
diff --git a/Cargo.lock b/Cargo.lock
index 64bfe865ebf..62e050c6d9f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3722,11 +3722,13 @@ dependencies = [
  "fuel-core-trace",
  "fuel-core-txpool",
  "fuel-core-types 0.36.0",
+ "futures",
  "itertools 0.12.1",
  "mockall",
  "num-rational",
  "parking_lot",
  "proptest",
+ "rayon",
  "rstest",
  "test-strategy",
  "tokio",
diff --git a/bin/fuel-core/src/cli/run/p2p.rs b/bin/fuel-core/src/cli/run/p2p.rs
index 47b107f8312..1e91f80589a 100644
--- a/bin/fuel-core/src/cli/run/p2p.rs
+++ b/bin/fuel-core/src/cli/run/p2p.rs
@@ -66,6 +66,10 @@ pub struct P2PArgs {
     #[clap(long = "max-headers-per-request", default_value = "100", env)]
     pub max_headers_per_request: usize,
+    /// Max number of txs in a single txs request response
+    #[clap(long = "max-txs-per-request", default_value = "10000", env)]
+    pub max_txs_per_request: usize,
+
     /// Addresses of the bootstrap nodes
     /// They should contain PeerId within their `Multiaddr`
     #[clap(long = "bootstrap-nodes", value_delimiter = ',', env)]
     pub bootstrap_nodes: Vec<Multiaddr>,
@@ -304,6 +308,7 @@ impl P2PArgs {
             tcp_port: self.peering_port,
             max_block_size: self.max_block_size,
             max_headers_per_request: self.max_headers_per_request,
+            max_txs_per_request: self.max_txs_per_request,
             bootstrap_nodes: self.bootstrap_nodes,
             reserved_nodes: self.reserved_nodes,
             reserved_nodes_only_mode: self.reserved_nodes_only_mode,
diff --git a/crates/fuel-core/src/database.rs b/crates/fuel-core/src/database.rs
index 4249a1d0add..1ce471381ce 100644
--- a/crates/fuel-core/src/database.rs
+++ b/crates/fuel-core/src/database.rs
@@ -76,7 +76,7 @@ use crate::state::{
 };
 #[cfg(feature = "rocksdb")]
 use std::path::Path;
-use fuel_core_gas_price_service::fuel_gas_price_updater::fuel_core_storage_adapter::storage::{ GasPriceMetadata};
+use fuel_core_gas_price_service::fuel_gas_price_updater::fuel_core_storage_adapter::storage::GasPriceMetadata;
 use crate::database::database_description::gas_price::GasPriceDatabase;
 
 // Storages implementation
diff --git a/crates/fuel-core/src/p2p_test_helpers.rs b/crates/fuel-core/src/p2p_test_helpers.rs
index 364ea3976bb..f72b4c348b0 100644
--- a/crates/fuel-core/src/p2p_test_helpers.rs
+++ b/crates/fuel-core/src/p2p_test_helpers.rs
@@ -21,6 +21,10 @@ use fuel_core_p2p::{
     codecs::postcard::PostcardCodec,
     network_service::FuelP2PService,
     p2p_service::FuelP2PEvent,
+    request_response::messages::{
+        RequestMessage,
+        ResponseMessage,
+    },
     service::to_message_acceptance,
 };
 use fuel_core_poa::{
@@ -153,16 +157,30 @@ impl Bootstrap {
                 }
                 event = bootstrap.next_event() => {
                     // The bootstrap node only forwards data without validating it.
-                    if let Some(FuelP2PEvent::GossipsubMessage {
-                        peer_id,
-                        message_id,
-                        ..
-                    }) = event {
-                        bootstrap.report_message_validation_result(
-                            &message_id,
+                    match event {
+                        Some(FuelP2PEvent::GossipsubMessage {
                             peer_id,
-                            to_message_acceptance(&GossipsubMessageAcceptance::Accept)
-                        )
+                            message_id,
+                            ..
+ }) => { + bootstrap.report_message_validation_result( + &message_id, + peer_id, + to_message_acceptance(&GossipsubMessageAcceptance::Accept) + ) + }, + Some(FuelP2PEvent::InboundRequestMessage { + request_id, + request_message + }) => { + if request_message == RequestMessage::TxPoolAllTransactionsIds { + let _ = bootstrap.send_response_msg( + request_id, + ResponseMessage::TxPoolAllTransactionsIds(Some(vec![])), + ); + } + } + _ => {} } } } diff --git a/crates/fuel-core/src/service/adapters/p2p.rs b/crates/fuel-core/src/service/adapters/p2p.rs index d87bd391b03..db3545c095b 100644 --- a/crates/fuel-core/src/service/adapters/p2p.rs +++ b/crates/fuel-core/src/service/adapters/p2p.rs @@ -1,18 +1,26 @@ -use super::BlockImporterAdapter; +use super::{ + BlockImporterAdapter, + TxPoolAdapter, +}; use crate::database::OnChainIterableKeyValueView; use fuel_core_p2p::ports::{ BlockHeightImporter, P2pDb, + TxPool, }; use fuel_core_services::stream::BoxStream; use fuel_core_storage::Result as StorageResult; +use fuel_core_txpool::types::TxId; use fuel_core_types::{ blockchain::{ consensus::Genesis, SealedBlockHeader, }, fuel_types::BlockHeight, - services::p2p::Transactions, + services::p2p::{ + NetworkableTransactionPool, + Transactions, + }, }; use std::ops::Range; @@ -49,3 +57,21 @@ impl BlockHeightImporter for BlockImporterAdapter { ) } } + +impl TxPool for TxPoolAdapter { + fn get_tx_ids(&self, max_txs: usize) -> Vec { + self.service.get_tx_ids(max_txs) + } + + fn get_full_txs(&self, tx_ids: Vec) -> Vec> { + self.service + .find(tx_ids) + .into_iter() + .map(|tx_info| { + tx_info.map(|tx| { + NetworkableTransactionPool::PoolTransaction(tx.tx().clone()) + }) + }) + .collect() + } +} diff --git a/crates/fuel-core/src/service/adapters/sync.rs b/crates/fuel-core/src/service/adapters/sync.rs index 8e7d775b619..cc4bb4dbbba 100644 --- a/crates/fuel-core/src/service/adapters/sync.rs +++ b/crates/fuel-core/src/service/adapters/sync.rs @@ -74,9 +74,7 @@ impl PeerToPeerPort for P2PAdapter { data: range, } = range; if let Some(service) = &self.service { - service - .get_transactions_from_peer(peer_id.into(), range) - .await + service.get_transactions_from_peer(peer_id, range).await } else { Err(anyhow::anyhow!("No P2P service available")) } diff --git a/crates/fuel-core/src/service/adapters/txpool.rs b/crates/fuel-core/src/service/adapters/txpool.rs index 27d393c069a..97ff57fb200 100644 --- a/crates/fuel-core/src/service/adapters/txpool.rs +++ b/crates/fuel-core/src/service/adapters/txpool.rs @@ -28,6 +28,7 @@ use fuel_core_txpool::{ GasPriceProvider, MemoryPool, }, + types::TxId, Result as TxPoolResult, }; use fuel_core_types::{ @@ -52,6 +53,7 @@ use fuel_core_types::{ p2p::{ GossipsubMessageAcceptance, GossipsubMessageInfo, + PeerId, TransactionGossipData, }, }, @@ -65,6 +67,7 @@ impl BlockImporter for BlockImporterAdapter { } #[cfg(feature = "p2p")] +#[async_trait::async_trait] impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter { type GossipedTransaction = TransactionGossipData; @@ -91,6 +94,21 @@ impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter { } } + fn subscribe_new_peers(&self) -> BoxStream { + use tokio_stream::{ + wrappers::BroadcastStream, + StreamExt, + }; + if let Some(service) = &self.service { + Box::pin( + BroadcastStream::new(service.subscribe_new_peers()) + .filter_map(|result| result.ok()), + ) + } else { + Box::pin(fuel_core_services::stream::pending()) + } + } + fn notify_gossip_transaction_validity( &self, message_info: GossipsubMessageInfo, @@ -102,9 +120,32 @@ impl 
fuel_core_txpool::ports::PeerToPeer for P2PAdapter { Ok(()) } } + + async fn request_tx_ids(&self, peer_id: PeerId) -> anyhow::Result> { + if let Some(service) = &self.service { + service.get_all_transactions_ids_from_peer(peer_id).await + } else { + Ok(vec![]) + } + } + + async fn request_txs( + &self, + peer_id: PeerId, + tx_ids: Vec, + ) -> anyhow::Result>> { + if let Some(service) = &self.service { + service + .get_full_transactions_from_peer(peer_id, tx_ids) + .await + } else { + Ok(vec![]) + } + } } #[cfg(not(feature = "p2p"))] +#[async_trait::async_trait] impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter { type GossipedTransaction = TransactionGossipData; @@ -126,6 +167,22 @@ impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter { ) -> anyhow::Result<()> { Ok(()) } + + fn subscribe_new_peers(&self) -> BoxStream { + Box::pin(fuel_core_services::stream::pending()) + } + + async fn request_tx_ids(&self, _peer_id: PeerId) -> anyhow::Result> { + Ok(vec![]) + } + + async fn request_txs( + &self, + _peer_id: PeerId, + _tx_ids: Vec, + ) -> anyhow::Result>> { + Ok(vec![]) + } } impl fuel_core_txpool::ports::TxPoolDb for OnChainIterableKeyValueView { diff --git a/crates/fuel-core/src/service/sub_services.rs b/crates/fuel-core/src/service/sub_services.rs index 5cc519a79cc..5c4aaf05e23 100644 --- a/crates/fuel-core/src/service/sub_services.rs +++ b/crates/fuel-core/src/service/sub_services.rs @@ -75,7 +75,7 @@ pub type PoAService = fuel_core_poa::Service< SystemTime, >; #[cfg(feature = "p2p")] -pub type P2PService = fuel_core_p2p::service::Service; +pub type P2PService = fuel_core_p2p::service::Service; pub type TxPoolSharedState = fuel_core_txpool::service::SharedState< P2PAdapter, Database, @@ -169,14 +169,10 @@ pub fn init_sub_services( }; #[cfg(feature = "p2p")] - let mut network = config.p2p.clone().map(|p2p_config| { - fuel_core_p2p::service::new_service( - chain_id, - p2p_config, - database.on_chain().clone(), - importer_adapter.clone(), - ) - }); + let p2p_externals = config + .p2p + .clone() + .map(fuel_core_p2p::service::build_shared_state); #[cfg(feature = "p2p")] let p2p_adapter = { @@ -192,7 +188,7 @@ pub fn init_sub_services( invalid_transactions: -100., }; P2PAdapter::new( - network.as_ref().map(|network| network.shared.clone()), + p2p_externals.as_ref().map(|ext| ext.0.clone()), peer_report_config, ) }; @@ -229,6 +225,21 @@ pub fn init_sub_services( ); let tx_pool_adapter = TxPoolAdapter::new(txpool.shared.clone()); + #[cfg(feature = "p2p")] + let mut network = config.p2p.clone().zip(p2p_externals).map( + |(p2p_config, (shared_state, request_receiver))| { + fuel_core_p2p::service::new_service( + chain_id, + p2p_config, + shared_state, + request_receiver, + database.on_chain().clone(), + importer_adapter.clone(), + tx_pool_adapter.clone(), + ) + }, + ); + let block_producer = fuel_core_producer::Producer { config: config.block_producer.clone(), view_provider: database.on_chain().clone(), diff --git a/crates/services/p2p/src/config.rs b/crates/services/p2p/src/config.rs index a807f79be33..3119d8eb2e9 100644 --- a/crates/services/p2p/src/config.rs +++ b/crates/services/p2p/src/config.rs @@ -49,6 +49,9 @@ pub const MAX_RESPONSE_SIZE: usize = 18 * 1024 * 1024; /// Maximum number of blocks per request. pub const MAX_HEADERS_PER_REQUEST: usize = 100; +/// Maximum number of transactions ids asked per request. +pub const MAX_TXS_PER_REQUEST: usize = 10000; + #[derive(Clone, Debug)] pub struct Config { /// The keypair used for handshake during communication with other p2p nodes. 
@@ -73,6 +76,9 @@ pub struct Config { pub max_block_size: usize, pub max_headers_per_request: usize, + // Maximum of txs id asked in a single request + pub max_txs_per_request: usize, + // `DiscoveryBehaviour` related fields pub bootstrap_nodes: Vec, pub enable_mdns: bool, @@ -151,6 +157,7 @@ impl Config { tcp_port: self.tcp_port, max_block_size: self.max_block_size, max_headers_per_request: self.max_headers_per_request, + max_txs_per_request: self.max_txs_per_request, bootstrap_nodes: self.bootstrap_nodes, enable_mdns: self.enable_mdns, max_peers_connected: self.max_peers_connected, @@ -200,6 +207,7 @@ impl Config { tcp_port: 0, max_block_size: MAX_RESPONSE_SIZE, max_headers_per_request: MAX_HEADERS_PER_REQUEST, + max_txs_per_request: MAX_TXS_PER_REQUEST, bootstrap_nodes: vec![], enable_mdns: false, max_peers_connected: 50, diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index a01c17b4499..eb5a5a75a6c 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -642,6 +642,30 @@ impl FuelP2PService { c.send((peer, Err(ResponseError::TypeMismatch))).is_ok() } }, + ResponseSender::TxPoolAllTransactionsIds(c) => match response { + ResponseMessage::TxPoolAllTransactionsIds(v) => { + c.send((peer, Ok(v))).is_ok() + } + _ => { + warn!( + "Invalid response type received for request {:?}", + request_id + ); + c.send((peer, Err(ResponseError::TypeMismatch))).is_ok() + } + }, + ResponseSender::TxPoolFullTransactions(c) => match response { + ResponseMessage::TxPoolFullTransactions(v) => { + c.send((peer, Ok(v))).is_ok() + } + _ => { + warn!( + "Invalid response type received for request {:?}", + request_id + ); + c.send((peer, Err(ResponseError::TypeMismatch))).is_ok() + } + }, }; if !send_ok { @@ -674,6 +698,12 @@ impl FuelP2PService { ResponseSender::Transactions(c) => { let _ = c.send((peer, Err(ResponseError::P2P(error)))); } + ResponseSender::TxPoolAllTransactionsIds(c) => { + let _ = c.send((peer, Err(ResponseError::P2P(error)))); + } + ResponseSender::TxPoolFullTransactions(c) => { + let _ = c.send((peer, Err(ResponseError::P2P(error)))); + } }; } } @@ -779,9 +809,13 @@ mod tests { fuel_tx::{ Transaction, TransactionBuilder, + TxId, + UniqueIdentifier, }, + fuel_types::ChainId, services::p2p::{ GossipsubMessageAcceptance, + NetworkableTransactionPool, Transactions, }, }; @@ -1635,6 +1669,46 @@ mod tests { } }); } + RequestMessage::TxPoolAllTransactionsIds => { + let (tx_orchestrator, rx_orchestrator) = oneshot::channel(); + assert!(node_a.send_request_msg(None, request_msg.clone(), ResponseSender::TxPoolAllTransactionsIds(tx_orchestrator)).is_ok()); + let tx_test_end = tx_test_end.clone(); + tokio::spawn(async move { + let response_message = rx_orchestrator.await; + + if let Ok((_, Ok(Some(transaction_ids)))) = response_message { + let tx_ids: Vec = (0..5).map(|_| Transaction::default_test_tx().id(&ChainId::new(1))).collect(); + let check = transaction_ids.len() == 5 && transaction_ids.iter().zip(tx_ids.iter()).all(|(a, b)| a == b); + let _ = tx_test_end.send(check).await; + } else { + tracing::error!("Orchestrator failed to receive a message: {:?}", response_message); + let _ = tx_test_end.send(false).await; + } + }); + } + RequestMessage::TxPoolFullTransactions(tx_ids) => { + let (tx_orchestrator, rx_orchestrator) = oneshot::channel(); + assert!(node_a.send_request_msg(None, request_msg.clone(), ResponseSender::TxPoolFullTransactions(tx_orchestrator)).is_ok()); + let tx_test_end = tx_test_end.clone(); + 
tokio::spawn(async move { + let response_message = rx_orchestrator.await; + + if let Ok((_, Ok(Some(transactions)))) = response_message { + let txs: Vec> = tx_ids.iter().enumerate().map(|(i, _)| { + if i == 0 { + None + } else { + Some(NetworkableTransactionPool::Transaction(Transaction::default_test_tx())) + } + }).collect(); + let check = transactions.len() == tx_ids.len() && transactions.iter().zip(txs.iter()).all(|(a, b)| a == b); + let _ = tx_test_end.send(check).await; + } else { + tracing::error!("Orchestrator failed to receive a message: {:?}", response_message); + let _ = tx_test_end.send(false).await; + } + }); + } } } } @@ -1656,6 +1730,20 @@ mod tests { let transactions = vec![Transactions(txs)]; let _ = node_b.send_response_msg(*request_id, ResponseMessage::Transactions(Some(transactions))); } + RequestMessage::TxPoolAllTransactionsIds => { + let tx_ids = (0..5).map(|_| Transaction::default_test_tx().id(&ChainId::new(1))).collect(); + let _ = node_b.send_response_msg(*request_id, ResponseMessage::TxPoolAllTransactionsIds(Some(tx_ids))); + } + RequestMessage::TxPoolFullTransactions(tx_ids) => { + let txs = tx_ids.iter().enumerate().map(|(i, _)| { + if i == 0 { + None + } else { + Some(NetworkableTransactionPool::Transaction(Transaction::default_test_tx())) + } + }).collect(); + let _ = node_b.send_response_msg(*request_id, ResponseMessage::TxPoolFullTransactions(Some(txs))); + } } } @@ -1679,6 +1767,21 @@ mod tests { request_response_works_with(RequestMessage::SealedHeaders(arbitrary_range)).await } + #[tokio::test] + #[instrument] + async fn request_response_works_with_transactions_ids() { + request_response_works_with(RequestMessage::TxPoolAllTransactionsIds).await + } + + #[tokio::test] + #[instrument] + async fn request_response_works_with_full_transactions() { + let tx_ids = (0..10) + .map(|_| Transaction::default_test_tx().id(&ChainId::new(1))) + .collect(); + request_response_works_with(RequestMessage::TxPoolFullTransactions(tx_ids)).await + } + /// We send a request for transactions, but it's responded by only headers #[tokio::test] #[instrument] diff --git a/crates/services/p2p/src/ports.rs b/crates/services/p2p/src/ports.rs index 91d352b3805..9bf68d3ed09 100644 --- a/crates/services/p2p/src/ports.rs +++ b/crates/services/p2p/src/ports.rs @@ -5,8 +5,12 @@ use fuel_core_types::{ consensus::Genesis, SealedBlockHeader, }, + fuel_tx::TxId, fuel_types::BlockHeight, - services::p2p::Transactions, + services::p2p::{ + NetworkableTransactionPool, + Transactions, + }, }; use std::ops::Range; @@ -28,3 +32,11 @@ pub trait BlockHeightImporter: Send + Sync { /// Creates a stream of next block heights fn next_block_height(&self) -> BoxStream; } + +pub trait TxPool: Send + Sync + Clone { + /// Get all tx ids in the pool + fn get_tx_ids(&self, max_ids: usize) -> Vec; + + /// Get full txs from the pool + fn get_full_txs(&self, tx_ids: Vec) -> Vec>; +} diff --git a/crates/services/p2p/src/request_response/messages.rs b/crates/services/p2p/src/request_response/messages.rs index 517156c3642..83f3f7a3a50 100644 --- a/crates/services/p2p/src/request_response/messages.rs +++ b/crates/services/p2p/src/request_response/messages.rs @@ -1,6 +1,10 @@ use fuel_core_types::{ blockchain::SealedBlockHeader, - services::p2p::Transactions, + fuel_tx::TxId, + services::p2p::{ + NetworkableTransactionPool, + Transactions, + }, }; use libp2p::{ request_response::OutboundFailure, @@ -24,12 +28,16 @@ pub(crate) const MAX_REQUEST_SIZE: usize = core::mem::size_of::( pub enum RequestMessage { 
SealedHeaders(Range), Transactions(Range), + TxPoolAllTransactionsIds, + TxPoolFullTransactions(Vec), } #[derive(Debug, Clone, Serialize, Deserialize)] pub enum ResponseMessage { SealedHeaders(Option>), Transactions(Option>), + TxPoolAllTransactionsIds(Option>), + TxPoolFullTransactions(Option>>), } pub type OnResponse = oneshot::Sender<(PeerId, Result)>; @@ -38,6 +46,8 @@ pub type OnResponse = oneshot::Sender<(PeerId, Result)>; pub enum ResponseSender { SealedHeaders(OnResponse>>), Transactions(OnResponse>>), + TxPoolAllTransactionsIds(OnResponse>>), + TxPoolFullTransactions(OnResponse>>>), } #[derive(Debug, Error)] diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 584de8ac13b..b6e6187173d 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -5,6 +5,7 @@ use crate::{ NotInitialized, }, gossipsub::messages::{ + GossipTopicTag, GossipsubBroadcastRequest, GossipsubMessage, }, @@ -17,6 +18,7 @@ use crate::{ ports::{ BlockHeightImporter, P2pDb, + TxPool, }, request_response::messages::{ OnResponse, @@ -40,6 +42,7 @@ use fuel_core_types::{ blockchain::SealedBlockHeader, fuel_tx::{ Transaction, + TxId, UniqueIdentifier, }, fuel_types::{ @@ -55,6 +58,7 @@ use fuel_core_types::{ GossipData, GossipsubMessageAcceptance, GossipsubMessageInfo, + NetworkableTransactionPool, PeerId as FuelPeerId, TransactionGossipData, Transactions, @@ -77,7 +81,10 @@ use std::{ use tokio::{ sync::{ broadcast, - mpsc, + mpsc::{ + self, + Receiver, + }, oneshot, }, time::{ @@ -87,9 +94,11 @@ use tokio::{ }; use tracing::warn; -pub type Service = ServiceRunner>; +const CHANNEL_SIZE: usize = 1024 * 10; + +pub type Service = ServiceRunner>; -enum TaskRequest { +pub enum TaskRequest { // Broadcast requests to p2p network BroadcastTransaction(Arc), // Request to get information about all connected peers @@ -105,6 +114,15 @@ enum TaskRequest { from_peer: PeerId, channel: OnResponse>>, }, + TxPoolGetAllTxIds { + from_peer: PeerId, + channel: OnResponse>>, + }, + TxPoolGetFullTransactions { + tx_ids: Vec, + from_peer: PeerId, + channel: OnResponse>>>, + }, // Responds back to the p2p network RespondWithGossipsubMessageReport((GossipsubMessageInfo, GossipsubMessageAcceptance)), RespondWithPeerReport { @@ -120,6 +138,14 @@ enum TaskRequest { response: Option>, request_id: InboundRequestId, }, + TxPoolAllTransactionsIds { + response: Option>, + request_id: InboundRequestId, + }, + TxPoolFullTransactions { + response: Option>>, + request_id: InboundRequestId, + }, } impl Debug for TaskRequest { @@ -134,6 +160,12 @@ impl Debug for TaskRequest { TaskRequest::GetTransactions { .. } => { write!(f, "TaskRequest::GetTransactions") } + TaskRequest::TxPoolGetAllTxIds { .. } => { + write!(f, "TaskRequest::TxPoolGetAllTxIds") + } + TaskRequest::TxPoolGetFullTransactions { .. } => { + write!(f, "TaskRequest::TxPoolGetFullTransactions") + } TaskRequest::RespondWithGossipsubMessageReport(_) => { write!(f, "TaskRequest::RespondWithGossipsubMessageReport") } @@ -149,6 +181,12 @@ impl Debug for TaskRequest { TaskRequest::DatabaseHeaderLookUp { .. } => { write!(f, "TaskRequest::DatabaseHeaderLookUp") } + TaskRequest::TxPoolAllTransactionsIds { .. } => { + write!(f, "TaskRequest::TxPoolAllTransactionsIds") + } + TaskRequest::TxPoolFullTransactions { .. 
} => { + write!(f, "TaskRequest::TxPoolFullTransactions") + } } } } @@ -289,6 +327,8 @@ pub trait Broadcast: Send { ) -> anyhow::Result<()>; fn tx_broadcast(&self, transaction: TransactionGossipData) -> anyhow::Result<()>; + + fn new_tx_subscription_broadcast(&self, peer_id: FuelPeerId) -> anyhow::Result<()>; } impl Broadcast for SharedState { @@ -313,22 +353,28 @@ impl Broadcast for SharedState { self.tx_broadcast.send(transaction)?; Ok(()) } + + fn new_tx_subscription_broadcast(&self, peer_id: FuelPeerId) -> anyhow::Result<()> { + self.new_tx_subscription_broadcast.send(peer_id)?; + Ok(()) + } } /// Uninitialized task for the p2p that can be upgraded later into [`Task`]. -pub struct UninitializedTask { +pub struct UninitializedTask { chain_id: ChainId, view_provider: V, next_block_height: BoxStream, /// Receive internal Task Requests request_receiver: mpsc::Receiver, broadcast: B, + tx_pool: T, config: Config, } /// Orchestrates various p2p-related events between the inner `P2pService` /// and the top level `NetworkService`. -pub struct Task { +pub struct Task { chain_id: ChainId, response_timeout: Duration, p2p_service: P, @@ -337,9 +383,11 @@ pub struct Task { /// Receive internal Task Requests request_receiver: mpsc::Receiver, request_sender: mpsc::Sender, - database_processor: HeavyTaskProcessor, + heavy_task_processor: HeavyTaskProcessor, broadcast: B, + tx_pool: T, max_headers_per_request: usize, + max_txs_per_request: usize, // milliseconds wait time between peer heartbeat reputation checks heartbeat_check_interval: Duration, heartbeat_max_avg_interval: Duration, @@ -354,43 +402,31 @@ pub struct HeartbeatPeerReputationConfig { low_heartbeat_frequency_penalty: AppScore, } -impl UninitializedTask { +impl UninitializedTask { pub fn new( chain_id: ChainId, config: Config, + shared_state: SharedState, + request_receiver: Receiver, view_provider: V, block_importer: B, + tx_pool: T, ) -> Self { - let (request_sender, request_receiver) = mpsc::channel(1024 * 10); - let (tx_broadcast, _) = broadcast::channel(1024 * 10); - let (block_height_broadcast, _) = broadcast::channel(1024 * 10); - - let (reserved_peers_broadcast, _) = broadcast::channel::( - config - .reserved_nodes - .len() - .saturating_mul(2) - .saturating_add(1), - ); let next_block_height = block_importer.next_block_height(); Self { chain_id, view_provider, + tx_pool, next_block_height, request_receiver, - broadcast: SharedState { - request_sender, - tx_broadcast, - reserved_peers_broadcast, - block_height_broadcast, - }, + broadcast: shared_state, config, } } } -impl Task { +impl Task { fn peer_heartbeat_reputation_checks(&self) -> anyhow::Result<()> { for (peer_id, peer_info) in self.p2p_service.get_all_peer_info() { if peer_info.heartbeat_data.duration_since_last_heartbeat() @@ -433,15 +469,16 @@ impl Task { } } -impl Task +impl Task where P: TaskP2PService + 'static, V: AtomicView + 'static, V::LatestView: P2pDb, + T: TxPool + 'static, { - fn update_metrics(&self, update_fn: T) + fn update_metrics(&self, update_fn: U) where - T: FnOnce(), + U: FnOnce(), { self.p2p_service.update_metrics(update_fn) } @@ -458,16 +495,23 @@ where RequestMessage::SealedHeaders(range) => { self.handle_sealed_headers_request(range, request_id) } + RequestMessage::TxPoolAllTransactionsIds => { + self.handle_all_transactions_ids_request(request_id) + } + RequestMessage::TxPoolFullTransactions(tx_ids) => { + self.handle_full_transactions_request(tx_ids, request_id) + } } } - fn handle_request( + fn handle_db_request( &mut self, range: Range, 
request_id: InboundRequestId, response_sender: ResponseSenderFn, db_lookup: DbLookUpFn, task_request: TaskRequestFn, + max_len: usize, ) -> anyhow::Result<()> where DbLookUpFn: @@ -479,10 +523,6 @@ where let instant = Instant::now(); let timeout = self.response_timeout; let response_channel = self.request_sender.clone(); - // For now, we only process requests that are smaller than the max_blocks_per_request - // If there are other types of data we send over p2p req/res protocol, then this needs - // to be generalized - let max_len = self.max_headers_per_request; let range_len = range.len(); self.update_metrics(|| set_blocks_requested(range_len)); @@ -502,7 +542,7 @@ where } let view = self.view_provider.latest_view()?; - let result = self.database_processor.spawn(move || { + let result = self.heavy_task_processor.spawn(move || { if instant.elapsed() > timeout { tracing::warn!("Request timed out"); return; @@ -529,7 +569,7 @@ where range: Range, request_id: InboundRequestId, ) -> anyhow::Result<()> { - self.handle_request( + self.handle_db_request( range, request_id, ResponseMessage::Transactions, @@ -538,6 +578,7 @@ where response, request_id, }, + self.max_headers_per_request, ) } @@ -546,7 +587,7 @@ where range: Range, request_id: InboundRequestId, ) -> anyhow::Result<()> { - self.handle_request( + self.handle_db_request( range, request_id, ResponseMessage::SealedHeaders, @@ -555,6 +596,88 @@ where response, request_id, }, + self.max_headers_per_request, + ) + } + + fn handle_txpool_request( + &mut self, + request_id: InboundRequestId, + txpool_function: TxPoolFn, + response_sender: ResponseSenderFn, + task_request: TaskRequestFn, + ) -> anyhow::Result<()> + where + ResponseSenderFn: Fn(Option) -> ResponseMessage + Send + 'static, + TaskRequestFn: Fn(Option, InboundRequestId) -> TaskRequest + Send + 'static, + TxPoolFn: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let instant = Instant::now(); + let timeout = self.response_timeout; + let response_channel = self.request_sender.clone(); + let result = self.heavy_task_processor.spawn(move || { + if instant.elapsed() > timeout { + tracing::warn!("Request timed out"); + return; + } + + let response = txpool_function(); + + // TODO: Return helpful error message to requester. https://github.com/FuelLabs/fuel-core/issues/1311 + let _ = response_channel + .try_send(task_request(Some(response), request_id)) + .trace_err("Failed to send response to the request channel"); + }); + + if result.is_err() { + let _ = self + .p2p_service + .send_response_msg(request_id, response_sender(None)); + } + + Ok(()) + } + + fn handle_all_transactions_ids_request( + &mut self, + request_id: InboundRequestId, + ) -> anyhow::Result<()> { + let tx_pool = self.tx_pool.clone(); + let max_txs = self.max_txs_per_request; + self.handle_txpool_request( + request_id, + move || tx_pool.get_tx_ids(max_txs), + ResponseMessage::TxPoolAllTransactionsIds, + |response, request_id| TaskRequest::TxPoolAllTransactionsIds { + response, + request_id, + }, + ) + } + + fn handle_full_transactions_request( + &mut self, + tx_ids: Vec, + request_id: InboundRequestId, + ) -> anyhow::Result<()> { + // TODO: Return helpful error message to requester. 
https://github.com/FuelLabs/fuel-core/issues/1311 + if tx_ids.len() > self.max_txs_per_request { + self.p2p_service.send_response_msg( + request_id, + ResponseMessage::TxPoolFullTransactions(None), + )?; + return Ok(()); + } + let tx_pool = self.tx_pool.clone(); + self.handle_txpool_request( + request_id, + move || tx_pool.get_full_txs(tx_ids), + ResponseMessage::TxPoolFullTransactions, + |response, request_id| TaskRequest::TxPoolFullTransactions { + response, + request_id, + }, ) } } @@ -565,15 +688,16 @@ fn convert_peer_id(peer_id: &PeerId) -> anyhow::Result { } #[async_trait::async_trait] -impl RunnableService for UninitializedTask +impl RunnableService for UninitializedTask where V: AtomicView + 'static, V::LatestView: P2pDb, + T: TxPool + 'static, { const NAME: &'static str = "P2P"; type SharedData = SharedState; - type Task = Task; + type Task = Task; type TaskParams = (); fn shared_data(&self) -> Self::SharedData { @@ -591,6 +715,7 @@ where next_block_height, request_receiver, broadcast, + tx_pool, config, } = self; @@ -600,6 +725,7 @@ where let Config { max_block_size, max_headers_per_request, + max_txs_per_request, heartbeat_check_interval, heartbeat_max_avg_interval, heartbeat_max_time_since_last, @@ -627,7 +753,7 @@ where "The heartbeat check interval should be small enough to do frequently", ); let number_of_threads = 2; - let database_processor = HeavyTaskProcessor::new(number_of_threads, 1024 * 10)?; + let heavy_task_processor = HeavyTaskProcessor::new(number_of_threads, 1024 * 10)?; let request_sender = broadcast.request_sender.clone(); let task = Task { @@ -639,8 +765,10 @@ where request_sender, next_block_height, broadcast, - database_processor, + tx_pool, + heavy_task_processor, max_headers_per_request, + max_txs_per_request, heartbeat_check_interval, heartbeat_max_avg_interval, heartbeat_max_time_since_last, @@ -653,12 +781,13 @@ where // TODO: Add tests https://github.com/FuelLabs/fuel-core/issues/1275 #[async_trait::async_trait] -impl RunnableTask for Task +impl RunnableTask for Task where P: TaskP2PService + 'static, V: AtomicView + 'static, V::LatestView: P2pDb, B: Broadcast + 'static, + T: TxPool + 'static, { async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result { tracing::debug!("P2P task is running"); @@ -706,6 +835,16 @@ where let request_msg = RequestMessage::Transactions(block_height_range); self.p2p_service.send_request_msg(Some(from_peer), request_msg, channel).expect("We always a peer here, so send has a target"); } + Some(TaskRequest::TxPoolGetAllTxIds { from_peer, channel }) => { + let channel = ResponseSender::TxPoolAllTransactionsIds(channel); + let request_msg = RequestMessage::TxPoolAllTransactionsIds; + self.p2p_service.send_request_msg(Some(from_peer), request_msg, channel).expect("We always have a peer here, so send has a target"); + } + Some(TaskRequest::TxPoolGetFullTransactions { tx_ids, from_peer, channel }) => { + let channel = ResponseSender::TxPoolFullTransactions(channel); + let request_msg = RequestMessage::TxPoolFullTransactions(tx_ids); + self.p2p_service.send_request_msg(Some(from_peer), request_msg, channel).expect("We always have a peer here, so send has a target"); + } Some(TaskRequest::RespondWithGossipsubMessageReport((message, acceptance))) => { // report_message(&mut self.p2p_service, message, acceptance); self.p2p_service.report_message(message, acceptance)?; @@ -726,6 +865,12 @@ where Some(TaskRequest::DatabaseHeaderLookUp { response, request_id }) => { let _ = self.p2p_service.send_response_msg(request_id, 
ResponseMessage::SealedHeaders(response)); } + Some(TaskRequest::TxPoolAllTransactionsIds { response, request_id }) => { + let _ = self.p2p_service.send_response_msg(request_id, ResponseMessage::TxPoolAllTransactionsIds(response)); + } + Some(TaskRequest::TxPoolFullTransactions { response, request_id }) => { + let _ = self.p2p_service.send_response_msg(request_id, ResponseMessage::TxPoolFullTransactions(response)); + } None => { tracing::error!("The P2P `Task` should be holder of the `Sender`"); should_continue = false; @@ -757,6 +902,11 @@ where Some(FuelP2PEvent::InboundRequestMessage { request_message, request_id }) => { self.process_request(request_message, request_id)? }, + Some(FuelP2PEvent::NewSubscription { peer_id, tag }) => { + if tag == GossipTopicTag::NewTx { + let _ = self.broadcast.new_tx_subscription_broadcast(FuelPeerId::from(peer_id.to_bytes())); + } + }, _ => (), } }, @@ -791,6 +941,8 @@ where #[derive(Clone)] pub struct SharedState { + /// Sender of p2p with peer gossip subscription (vec represent the peer_id) + new_tx_subscription_broadcast: broadcast::Sender, /// Sender of p2p transaction used for subscribing. tx_broadcast: broadcast::Sender, /// Sender of reserved peers connection updates. @@ -799,6 +951,8 @@ pub struct SharedState { request_sender: mpsc::Sender, /// Sender of p2p blopck height data block_height_broadcast: broadcast::Sender, + /// Max txs per request + max_txs_per_request: usize, } impl SharedState { @@ -842,11 +996,11 @@ impl SharedState { pub async fn get_transactions_from_peer( &self, - peer_id: Vec, + peer_id: FuelPeerId, range: Range, ) -> anyhow::Result>> { let (sender, receiver) = oneshot::channel(); - let from_peer = PeerId::from_bytes(&peer_id).expect("Valid PeerId"); + let from_peer = PeerId::from_bytes(peer_id.as_ref()).expect("Valid PeerId"); let request = TaskRequest::GetTransactions { block_height_range: range, @@ -858,7 +1012,7 @@ impl SharedState { let (response_from_peer, response) = receiver.await.map_err(|e| anyhow!("{e}"))?; assert_eq!( - peer_id, + peer_id.as_ref(), response_from_peer.to_bytes(), "Bug: response from non-requested peer" ); @@ -866,6 +1020,77 @@ impl SharedState { response.map_err(|e| anyhow!("Invalid response from peer {e:?}")) } + pub async fn get_all_transactions_ids_from_peer( + &self, + peer_id: FuelPeerId, + ) -> anyhow::Result> { + let (sender, receiver) = oneshot::channel(); + let from_peer = PeerId::from_bytes(peer_id.as_ref()).expect("Valid PeerId"); + let request = TaskRequest::TxPoolGetAllTxIds { + from_peer, + channel: sender, + }; + self.request_sender.try_send(request)?; + + let (response_from_peer, response) = + receiver.await.map_err(|e| anyhow!("{e}"))?; + + debug_assert_eq!( + peer_id.as_ref(), + response_from_peer.to_bytes(), + "Bug: response from non-requested peer" + ); + + let Some(txs) = + response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))? 
+ else { + return Ok(vec![]); + }; + if txs.len() > self.max_txs_per_request { + return Err(anyhow!("Too many transactions requested: {}", txs.len())); + } + Ok(txs) + } + + pub async fn get_full_transactions_from_peer( + &self, + peer_id: FuelPeerId, + tx_ids: Vec, + ) -> anyhow::Result>> { + let (sender, receiver) = oneshot::channel(); + let from_peer = PeerId::from_bytes(peer_id.as_ref()).expect("Valid PeerId"); + let request = TaskRequest::TxPoolGetFullTransactions { + tx_ids, + from_peer, + channel: sender, + }; + self.request_sender.try_send(request)?; + + let (response_from_peer, response) = + receiver.await.map_err(|e| anyhow!("{e}"))?; + debug_assert_eq!( + peer_id.as_ref(), + response_from_peer.to_bytes(), + "Bug: response from non-requested peer" + ); + + let Some(txs) = + response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))? + else { + return Ok(vec![]); + }; + if txs.len() > self.max_txs_per_request { + return Err(anyhow!("Too many transactions requested: {}", txs.len())); + } + txs.into_iter() + .map(|tx| { + tx.map(Transaction::try_from) + .transpose() + .map_err(|err| anyhow::anyhow!(err)) + }) + .collect() + } + pub fn broadcast_transaction( &self, transaction: Arc, @@ -885,6 +1110,10 @@ impl SharedState { receiver.await.map_err(|e| anyhow!("{}", e)) } + pub fn subscribe_new_peers(&self) -> broadcast::Receiver { + self.new_tx_subscription_broadcast.subscribe() + } + pub fn subscribe_tx(&self) -> broadcast::Receiver { self.tx_broadcast.subscribe() } @@ -926,19 +1155,59 @@ impl SharedState { } } -pub fn new_service( +pub fn build_shared_state( + config: Config, +) -> (SharedState, Receiver) { + let (request_sender, request_receiver) = mpsc::channel(CHANNEL_SIZE); + let (tx_broadcast, _) = broadcast::channel(CHANNEL_SIZE); + let (new_tx_subscription_broadcast, _) = broadcast::channel(CHANNEL_SIZE); + let (block_height_broadcast, _) = broadcast::channel(CHANNEL_SIZE); + + let (reserved_peers_broadcast, _) = broadcast::channel::( + config + .reserved_nodes + .len() + .saturating_mul(2) + .saturating_add(1), + ); + + ( + SharedState { + request_sender, + new_tx_subscription_broadcast, + tx_broadcast, + reserved_peers_broadcast, + block_height_broadcast, + max_txs_per_request: config.max_txs_per_request, + }, + request_receiver, + ) +} + +pub fn new_service( chain_id: ChainId, p2p_config: Config, + shared_state: SharedState, + request_receiver: Receiver, view_provider: V, block_importer: B, -) -> Service + tx_pool: T, +) -> Service where V: AtomicView + 'static, V::LatestView: P2pDb, B: BlockHeightImporter, + T: TxPool, { - let task = - UninitializedTask::new(chain_id, p2p_config, view_provider, block_importer); + let task = UninitializedTask::new( + chain_id, + p2p_config, + shared_state, + request_receiver, + view_provider, + block_importer, + tx_pool, + ); Service::new(task) } @@ -1036,11 +1305,35 @@ pub mod tests { } } + #[derive(Clone, Debug)] + struct FakeTxPool; + + impl TxPool for FakeTxPool { + fn get_tx_ids(&self, _max_txs: usize) -> Vec { + vec![] + } + + fn get_full_txs( + &self, + tx_ids: Vec, + ) -> Vec> { + tx_ids.iter().map(|_| None).collect() + } + } + #[tokio::test] async fn start_and_stop_awaits_works() { let p2p_config = Config::::default("start_stop_works"); - let service = - new_service(ChainId::default(), p2p_config, FakeDb, FakeBlockImporter); + let (shared_state, request_receiver) = build_shared_state(p2p_config.clone()); + let service = new_service( + ChainId::default(), + p2p_config, + shared_state, + request_receiver, + FakeDb, + 
FakeBlockImporter, + FakeTxPool, + ); // Node with p2p service started assert!(service.start_and_await().await.unwrap().started()); @@ -1182,6 +1475,13 @@ pub mod tests { ) -> anyhow::Result<()> { todo!() } + + fn new_tx_subscription_broadcast( + &self, + _peer_id: FuelPeerId, + ) -> anyhow::Result<()> { + todo!() + } } #[tokio::test] @@ -1235,11 +1535,13 @@ pub mod tests { p2p_service, view_provider: FakeDB, next_block_height: FakeBlockImporter.next_block_height(), + tx_pool: FakeTxPool, request_receiver, request_sender, - database_processor: HeavyTaskProcessor::new(1, 1).unwrap(), + heavy_task_processor: HeavyTaskProcessor::new(1, 1).unwrap(), broadcast, max_headers_per_request: 0, + max_txs_per_request: 100, heartbeat_check_interval: Duration::from_secs(0), heartbeat_max_avg_interval, heartbeat_max_time_since_last, @@ -1321,12 +1623,14 @@ pub mod tests { response_timeout: Default::default(), p2p_service, view_provider: FakeDB, + tx_pool: FakeTxPool, next_block_height: FakeBlockImporter.next_block_height(), request_receiver, request_sender, - database_processor: HeavyTaskProcessor::new(1, 1).unwrap(), + heavy_task_processor: HeavyTaskProcessor::new(1, 1).unwrap(), broadcast, max_headers_per_request: 0, + max_txs_per_request: 100, heartbeat_check_interval: Duration::from_secs(0), heartbeat_max_avg_interval, heartbeat_max_time_since_last, @@ -1379,13 +1683,15 @@ pub mod tests { chain_id: Default::default(), response_timeout: Default::default(), p2p_service, + tx_pool: FakeTxPool, view_provider: FakeDB, next_block_height, request_receiver, request_sender, - database_processor: HeavyTaskProcessor::new(1, 1).unwrap(), + heavy_task_processor: HeavyTaskProcessor::new(1, 1).unwrap(), broadcast, max_headers_per_request: 0, + max_txs_per_request: 100, heartbeat_check_interval: Duration::from_secs(0), heartbeat_max_avg_interval: Default::default(), heartbeat_max_time_since_last: Default::default(), diff --git a/crates/services/txpool/Cargo.toml b/crates/services/txpool/Cargo.toml index 22119a31268..073611afd07 100644 --- a/crates/services/txpool/Cargo.toml +++ b/crates/services/txpool/Cargo.toml @@ -18,9 +18,11 @@ fuel-core-metrics = { workspace = true } fuel-core-services = { workspace = true } fuel-core-storage = { workspace = true, features = ["std"] } fuel-core-types = { workspace = true, features = ["std"] } +futures = { workspace = true } mockall = { workspace = true, optional = true } num-rational = { workspace = true } parking_lot = { workspace = true } +rayon = { workspace = true } tokio = { workspace = true, default-features = false, features = ["sync"] } tokio-rayon = { workspace = true } tokio-stream = { workspace = true } diff --git a/crates/services/txpool/src/heavy_async_processing.rs b/crates/services/txpool/src/heavy_async_processing.rs new file mode 100644 index 00000000000..94fabad1fc7 --- /dev/null +++ b/crates/services/txpool/src/heavy_async_processing.rs @@ -0,0 +1,195 @@ +use std::{ + future::Future, + sync::Arc, +}; +use tokio::sync::Semaphore; + +pub struct HeavyAsyncProcessor { + rayon_thread_pool: rayon::ThreadPool, + semaphore: Arc, +} + +#[derive(Debug, PartialEq, Eq)] +pub struct OutOfCapacity; + +impl HeavyAsyncProcessor { + pub fn new( + number_of_threads: usize, + number_of_pending_tasks: usize, + ) -> anyhow::Result { + let rayon_thread_pool = rayon::ThreadPoolBuilder::new() + .num_threads(number_of_threads) + .build() + .map_err(|e| anyhow::anyhow!("Failed to create a rayon pool: {}", e))?; + let semaphore = Arc::new(Semaphore::new(number_of_pending_tasks)); + 
Ok(Self { + rayon_thread_pool, + semaphore, + }) + } + + pub fn spawn(&self, future: F) -> Result<(), OutOfCapacity> + where + F: Future + Send + 'static, + { + let permit = self.semaphore.clone().try_acquire_owned(); + + if let Ok(permit) = permit { + self.rayon_thread_pool.spawn_fifo(move || { + let _drop = permit; + futures::executor::block_on(future); + }); + Ok(()) + } else { + Err(OutOfCapacity) + } + } +} + +#[cfg(test)] +#[allow(clippy::bool_assert_comparison)] +mod tests { + use super::*; + use std::{ + thread::sleep, + time::Duration, + }; + use tokio::time::Instant; + + #[tokio::test] + async fn one_spawn_single_tasks_works() { + // Given + let number_of_pending_tasks = 1; + let heavy_task_processor = + HeavyAsyncProcessor::new(1, number_of_pending_tasks).unwrap(); + + // When + let (sender, receiver) = tokio::sync::oneshot::channel(); + let result = heavy_task_processor.spawn(async move { + sender.send(()).unwrap(); + }); + + // Then + assert_eq!(result, Ok(())); + let timeout = tokio::time::timeout(Duration::from_secs(1), receiver).await; + timeout + .expect("Shouldn't timeout") + .expect("Should receive a message"); + } + + #[tokio::test] + async fn second_spawn_fails_when_limit_is_one_and_first_in_progress() { + // Given + let number_of_pending_tasks = 1; + let heavy_task_processor = + HeavyAsyncProcessor::new(1, number_of_pending_tasks).unwrap(); + let first_spawn_result = heavy_task_processor.spawn(async move { + sleep(Duration::from_secs(1)); + }); + assert_eq!(first_spawn_result, Ok(())); + + // When + let second_spawn_result = heavy_task_processor.spawn(async move { + sleep(Duration::from_secs(1)); + }); + + // Then + assert_eq!(second_spawn_result, Err(OutOfCapacity)); + } + + #[tokio::test] + async fn second_spawn_works_when_first_is_finished() { + let number_of_pending_tasks = 1; + let heavy_task_processor = + HeavyAsyncProcessor::new(1, number_of_pending_tasks).unwrap(); + + // Given + let (sender, receiver) = tokio::sync::oneshot::channel(); + let first_spawn = heavy_task_processor.spawn(async move { + sleep(Duration::from_secs(1)); + sender.send(()).unwrap(); + }); + assert_eq!(first_spawn, Ok(())); + receiver.await.unwrap(); + + // When + let second_spawn = heavy_task_processor.spawn(async move { + sleep(Duration::from_secs(1)); + }); + + // Then + assert_eq!(second_spawn, Ok(())); + } + + #[tokio::test] + async fn can_spawn_10_tasks_when_limit_is_10() { + // Given + let number_of_pending_tasks = 10; + let heavy_task_processor = + HeavyAsyncProcessor::new(1, number_of_pending_tasks).unwrap(); + + for _ in 0..number_of_pending_tasks { + // When + let result = heavy_task_processor.spawn(async move { + sleep(Duration::from_secs(1)); + }); + + // Then + assert_eq!(result, Ok(())); + } + } + + #[tokio::test] + async fn executes_10_tasks_for_10_seconds_with_one_thread() { + // Given + let number_of_pending_tasks = 10; + let number_of_threads = 1; + let heavy_task_processor = + HeavyAsyncProcessor::new(number_of_threads, number_of_pending_tasks).unwrap(); + + // When + let (broadcast_sender, mut broadcast_receiver) = + tokio::sync::broadcast::channel(1024); + let instant = Instant::now(); + for _ in 0..number_of_pending_tasks { + let broadcast_sender = broadcast_sender.clone(); + let result = heavy_task_processor.spawn(async move { + sleep(Duration::from_secs(1)); + broadcast_sender.send(()).unwrap(); + }); + assert_eq!(result, Ok(())); + } + drop(broadcast_sender); + + // Then + while broadcast_receiver.recv().await.is_ok() {} + assert!(instant.elapsed() >= 
Duration::from_secs(10)); + } + + #[tokio::test] + async fn executes_10_tasks_for_2_seconds_with_10_thread() { + // Given + let number_of_pending_tasks = 10; + let number_of_threads = 10; + let heavy_task_processor = + HeavyAsyncProcessor::new(number_of_threads, number_of_pending_tasks).unwrap(); + + // When + let (broadcast_sender, mut broadcast_receiver) = + tokio::sync::broadcast::channel(1024); + let instant = Instant::now(); + for _ in 0..number_of_pending_tasks { + let broadcast_sender = broadcast_sender.clone(); + let result = heavy_task_processor.spawn(async move { + sleep(Duration::from_secs(1)); + broadcast_sender.send(()).unwrap(); + }); + assert_eq!(result, Ok(())); + } + drop(broadcast_sender); + + // Then + while broadcast_receiver.recv().await.is_ok() {} + assert!(instant.elapsed() <= Duration::from_secs(2)); + } +} diff --git a/crates/services/txpool/src/lib.rs b/crates/services/txpool/src/lib.rs index 072bc639292..b2521f1caa4 100644 --- a/crates/services/txpool/src/lib.rs +++ b/crates/services/txpool/src/lib.rs @@ -18,6 +18,7 @@ use std::{ pub mod config; mod containers; pub mod error; +pub mod heavy_async_processing; pub mod ports; pub mod service; mod transaction_selector; diff --git a/crates/services/txpool/src/ports.rs b/crates/services/txpool/src/ports.rs index 359e789474e..4a247ff21db 100644 --- a/crates/services/txpool/src/ports.rs +++ b/crates/services/txpool/src/ports.rs @@ -14,6 +14,7 @@ use fuel_core_types::{ Bytes32, ConsensusParameters, Transaction, + TxId, UtxoId, }, fuel_types::{ @@ -28,6 +29,7 @@ use fuel_core_types::{ GossipsubMessageAcceptance, GossipsubMessageInfo, NetworkData, + PeerId, }, }, }; @@ -36,12 +38,17 @@ use std::{ sync::Arc, }; +#[async_trait::async_trait] pub trait PeerToPeer: Send + Sync { type GossipedTransaction: NetworkData; // Gossip broadcast a transaction inserted via API. fn broadcast_transaction(&self, transaction: Arc) -> anyhow::Result<()>; + /// Creates a stream that is filled with the peer_id when they subscribe to + /// our transactions gossip. + fn subscribe_new_peers(&self) -> BoxStream; + /// Creates a stream of next transactions gossiped from the network. 
fn gossiped_transaction_events(&self) -> BoxStream; @@ -51,6 +58,16 @@ pub trait PeerToPeer: Send + Sync { message_info: GossipsubMessageInfo, validity: GossipsubMessageAcceptance, ) -> anyhow::Result<()>; + + // Asks the network to gather all tx ids of a specific peer + async fn request_tx_ids(&self, peer_id: PeerId) -> anyhow::Result>; + + // Asks the network to gather specific transactions from a specific peer + async fn request_txs( + &self, + peer_id: PeerId, + tx_ids: Vec, + ) -> anyhow::Result>>; } pub trait BlockImporter: Send + Sync { diff --git a/crates/services/txpool/src/service.rs b/crates/services/txpool/src/service.rs index 8166b5b98a1..82dc2e833cc 100644 --- a/crates/services/txpool/src/service.rs +++ b/crates/services/txpool/src/service.rs @@ -35,6 +35,7 @@ use fuel_core_types::{ GossipData, GossipsubMessageAcceptance, GossipsubMessageInfo, + PeerId, TransactionGossipData, }, txpool::{ @@ -48,6 +49,7 @@ use fuel_core_types::{ use update_sender::UpdateSender; use crate::{ + heavy_async_processing::HeavyAsyncProcessor, ports::{ BlockImporter, ConsensusParametersProvider, @@ -167,6 +169,7 @@ impl Cl pub struct Task { gossiped_tx_stream: BoxStream, + new_tx_gossip_subscription: BoxStream, committed_block_stream: BoxStream, tx_pool_shared_state: SharedState< P2P, @@ -176,6 +179,7 @@ pub struct Task, + heavy_async_processor: HeavyAsyncProcessor, ttl_timer: tokio::time::Interval, } @@ -184,13 +188,13 @@ impl where - P2P: PeerToPeer, - ViewProvider: AtomicView, + P2P: PeerToPeer + 'static, + ViewProvider: AtomicView + 'static, View: TxPoolDb, - WasmChecker: WasmCheckerConstraint + Send + Sync, - GasPriceProvider: GasPriceProviderConstraint + Send + Sync, - ConsensusProvider: ConsensusParametersProvider + Send + Sync, - MP: MemoryPool + Send + Sync, + WasmChecker: WasmCheckerConstraint + Send + Sync + 'static, + GasPriceProvider: GasPriceProviderConstraint + Send + Sync + 'static, + ConsensusProvider: ConsensusParametersProvider + Send + Sync + 'static, + MP: MemoryPool + Send + Sync + 'static, { const NAME: &'static str = "TxPool"; @@ -225,13 +229,13 @@ impl where - P2P: PeerToPeer, - ViewProvider: AtomicView, + P2P: PeerToPeer + 'static, + ViewProvider: AtomicView + 'static, View: TxPoolDb, - WasmChecker: WasmCheckerConstraint + Send + Sync, - GasPriceProvider: GasPriceProviderConstraint + Send + Sync, - ConsensusProvider: ConsensusParametersProvider + Send + Sync, - MP: MemoryPool + Send + Sync, + WasmChecker: WasmCheckerConstraint + Send + Sync + 'static, + GasPriceProvider: GasPriceProviderConstraint + Send + Sync + 'static, + ConsensusProvider: ConsensusParametersProvider + Send + Sync + 'static, + MP: MemoryPool + Send + Sync + 'static, { async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result { let should_continue; @@ -271,6 +275,19 @@ where } } + new_peer_subscribed = self.new_tx_gossip_subscription.next() => { + // If we are out of capacity, we will skip this event. 
+ if let Some(peer_id) = new_peer_subscribed { + let _ = self.heavy_async_processor.spawn({ + let shared_state = self.tx_pool_shared_state.clone(); + async move { + shared_state.new_peer_subscribed(peer_id).await; + } + }); + } + should_continue = true; + } + new_transaction = self.gossiped_tx_stream.next() => { if let Some(GossipData { data: Some(tx), message_id, peer_id }) = new_transaction { let current_height = *self.tx_pool_shared_state.current_height.lock(); @@ -412,6 +429,16 @@ impl TxStatusMessage::Status(status), ) } + + pub fn get_tx_ids(&self, max_txs: usize) -> Vec { + self.txpool + .lock() + .txs() + .iter() + .take(max_txs) + .map(|tx| TxId::new(**tx.0)) + .collect() + } } impl @@ -425,6 +452,42 @@ where ConsensusProvider: ConsensusParametersProvider, MP: MemoryPool + Send + Sync, { + async fn new_peer_subscribed(&self, peer_id: PeerId) { + // Gathering txs + let peer_tx_ids = self + .p2p + .request_tx_ids(peer_id.clone()) + .await + .inspect_err(|e| { + tracing::error!("Failed to gather tx ids from peer {}: {}", &peer_id, e); + }) + .unwrap_or_default(); + if peer_tx_ids.is_empty() { + return; + } + let tx_ids_to_ask = self.txpool.lock().filter_existing_tx_ids(peer_tx_ids); + if tx_ids_to_ask.is_empty() { + return; + } + let txs: Vec> = self + .p2p + .request_txs(peer_id.clone(), tx_ids_to_ask) + .await + .inspect_err(|e| { + tracing::error!("Failed to gather tx ids from peer {}: {}", &peer_id, e); + }) + .unwrap_or_default() + .into_iter() + .flatten() + .map(Arc::new) + .collect(); + if txs.is_empty() { + return; + } + // Verifying them + self.insert(txs).await; + } + #[tracing::instrument(name = "insert_submitted_txn", skip_all)] pub async fn insert( &self, @@ -556,6 +619,7 @@ where { let p2p = Arc::new(p2p); let gossiped_tx_stream = p2p.gossiped_transaction_events(); + let new_tx_gossip_subscription = p2p.subscribe_new_peers(); let committed_block_stream = importer.block_events(); let mut ttl_timer = tokio::time::interval(config.transaction_ttl); ttl_timer.set_missed_tick_behavior(MissedTickBehavior::Skip); @@ -567,6 +631,7 @@ where ))); let task = Task { gossiped_tx_stream, + new_tx_gossip_subscription, committed_block_stream, tx_pool_shared_state: SharedState { tx_status_sender: TxStatusChange::new( @@ -585,6 +650,7 @@ where gas_price_provider: Arc::new(gas_price_provider), memory_pool: Arc::new(memory_pool), }, + heavy_async_processor: HeavyAsyncProcessor::new(2, 10).unwrap(), ttl_timer, }; diff --git a/crates/services/txpool/src/service/test_helpers.rs b/crates/services/txpool/src/service/test_helpers.rs index 5847dac9a50..a7b24aa847c 100644 --- a/crates/services/txpool/src/service/test_helpers.rs +++ b/crates/services/txpool/src/service/test_helpers.rs @@ -128,6 +128,7 @@ impl TestContext { mockall::mock! { pub P2P {} + #[async_trait::async_trait] impl PeerToPeer for P2P { type GossipedTransaction = GossipedTransaction; @@ -140,6 +141,16 @@ mockall::mock! 
{ message_info: GossipsubMessageInfo, validity: GossipsubMessageAcceptance, ) -> anyhow::Result<()>; + + fn subscribe_new_peers(&self) -> BoxStream; + + async fn request_tx_ids(&self, peer_id: PeerId) -> anyhow::Result>; + + async fn request_txs( + &self, + peer_id: PeerId, + tx_ids: Vec, + ) -> anyhow::Result>>; } } @@ -259,6 +270,8 @@ impl TestContextBuilder { .returning(move |_, _| Ok(())); p2p.expect_broadcast_transaction() .returning(move |_| Ok(())); + p2p.expect_subscribe_new_peers() + .returning(|| Box::pin(fuel_core_services::stream::pending())); let importer = self .importer diff --git a/crates/services/txpool/src/service/tests_p2p.rs b/crates/services/txpool/src/service/tests_p2p.rs index 71270feec1c..3cfbb1b663c 100644 --- a/crates/services/txpool/src/service/tests_p2p.rs +++ b/crates/services/txpool/src/service/tests_p2p.rs @@ -11,17 +11,158 @@ use fuel_core_storage::rand::{ prelude::StdRng, SeedableRng, }; -use fuel_core_types::fuel_tx::{ - field::Inputs, - AssetId, - Transaction, - TransactionBuilder, - UniqueIdentifier, +use fuel_core_types::{ + fuel_tx::{ + field::Inputs, + AssetId, + Transaction, + TransactionBuilder, + UniqueIdentifier, + }, + fuel_types::ChainId, }; use std::{ ops::Deref, time::Duration, }; +use tokio::sync::Notify; + +#[tokio::test] +async fn test_new_subscription_p2p() { + let mut ctx_builder = TestContextBuilder::new(); + let tx1 = ctx_builder.setup_script_tx(10); + let tx2 = ctx_builder.setup_script_tx(100); + + // Given + let wait_notification = Arc::new(Notify::new()); + let notifier = wait_notification.clone(); + let mut p2p = MockP2P::new(); + p2p.expect_gossiped_transaction_events() + .returning(|| Box::pin(fuel_core_services::stream::pending())); + p2p.expect_subscribe_new_peers().returning(|| { + // Return a boxstream that yield one element + Box::pin(fuel_core_services::stream::unfold(0, |state| async move { + if state == 0 { + Some((PeerId::from(vec![1, 2]), state + 1)) + } else { + None + } + })) + }); + let tx1_clone = tx1.clone(); + let tx2_clone = tx2.clone(); + p2p.expect_request_tx_ids().return_once(move |peer_id| { + assert_eq!(peer_id, PeerId::from(vec![1, 2])); + Ok(vec![ + tx1_clone.id(&ChainId::default()), + tx2_clone.id(&ChainId::default()), + ]) + }); + let tx1_clone = tx1.clone(); + let tx2_clone = tx2.clone(); + p2p.expect_request_txs() + .return_once(move |peer_id, tx_ids| { + assert_eq!(peer_id, PeerId::from(vec![1, 2])); + assert_eq!( + tx_ids, + vec![ + tx1_clone.id(&ChainId::default()), + tx2_clone.id(&ChainId::default()) + ] + ); + notifier.notify_one(); + Ok(vec![Some(tx1_clone), Some(tx2_clone)]) + }); + ctx_builder.with_p2p(p2p); + let ctx = ctx_builder.build(); + let service = ctx.service(); + + service.start_and_await().await.unwrap(); + + wait_notification.notified().await; + tokio::time::sleep(Duration::from_millis(1000)).await; + + // When + let out = service.shared.find(vec![ + tx1.id(&Default::default()), + tx2.id(&Default::default()), + ]); + + // Then + assert_eq!(out.len(), 2, "Should be len 2:{out:?}"); + for (tx_pool, tx_expected) in out.into_iter().zip(&[tx1, tx2]) { + assert!(tx_pool.is_some(), "Tx should be some:{tx_pool:?}"); + assert_eq!( + tx_pool.unwrap().id(), + tx_expected.id(&Default::default()), + "Found tx id didn't match" + ); + } +} + +#[tokio::test] +async fn test_new_subscription_p2p_ask_subset_of_transactions() { + let mut ctx_builder = TestContextBuilder::new(); + let tx1 = ctx_builder.setup_script_tx(10); + let tx2 = ctx_builder.setup_script_tx(100); + + // Given + let wait_notification 
= Arc::new(Notify::new()); + let notifier = wait_notification.clone(); + let mut p2p = MockP2P::new(); + p2p.expect_gossiped_transaction_events() + .returning(|| Box::pin(fuel_core_services::stream::pending())); + p2p.expect_subscribe_new_peers().returning(|| { + // Return a boxstream that yield one element + Box::pin(fuel_core_services::stream::unfold(0, |state| async move { + tokio::time::sleep(Duration::from_millis(1000)).await; + if state == 0 { + Some((PeerId::from(vec![1, 2]), state + 1)) + } else { + None + } + })) + }); + let tx1_clone = tx1.clone(); + let tx2_clone = tx2.clone(); + p2p.expect_request_tx_ids().returning(move |peer_id| { + assert_eq!(peer_id, PeerId::from(vec![1, 2])); + Ok(vec![ + tx1_clone.id(&ChainId::default()), + tx2_clone.id(&ChainId::default()), + ]) + }); + let tx2_clone = tx2.clone(); + p2p.expect_request_txs() + .return_once(move |peer_id, tx_ids| { + assert_eq!(peer_id, PeerId::from(vec![1, 2])); + assert_eq!(tx_ids, vec![tx2_clone.id(&ChainId::default())]); + notifier.notify_one(); + Ok(vec![Some(tx2_clone)]) + }); + ctx_builder.with_p2p(p2p); + let ctx = ctx_builder.build(); + let service = ctx.service(); + + service.start_and_await().await.unwrap(); + service.shared.insert(vec![Arc::new(tx1.clone())]).await; + + wait_notification.notified().await; + tokio::time::sleep(Duration::from_millis(1000)).await; + let out = service.shared.find(vec![ + tx1.id(&Default::default()), + tx2.id(&Default::default()), + ]); + assert_eq!(out.len(), 2, "Should be len 2:{out:?}"); + for (tx_pool, tx_expected) in out.into_iter().zip(&[tx1, tx2]) { + assert!(tx_pool.is_some(), "Tx should be some:{tx_pool:?}"); + assert_eq!( + tx_pool.unwrap().id(), + tx_expected.id(&Default::default()), + "Found tx id didn't match" + ); + } +} #[tokio::test] async fn can_insert_from_p2p() { diff --git a/crates/services/txpool/src/txpool.rs b/crates/services/txpool/src/txpool.rs index 35cc41f9e8e..f0cb2000dd4 100644 --- a/crates/services/txpool/src/txpool.rs +++ b/crates/services/txpool/src/txpool.rs @@ -322,6 +322,13 @@ impl TxPool { Ok(()) } + + pub fn filter_existing_tx_ids(&self, tx_ids: Vec) -> Vec { + tx_ids + .into_iter() + .filter(|tx_id| !self.by_hash.contains_key(tx_id)) + .collect() + } } impl TxPool diff --git a/crates/types/src/services/p2p.rs b/crates/types/src/services/p2p.rs index 8cd98385c0d..e425132522a 100644 --- a/crates/types/src/services/p2p.rs +++ b/crates/types/src/services/p2p.rs @@ -1,5 +1,11 @@ //! Contains types related to P2P data +#[cfg(feature = "serde")] +use serde::{ + Deserialize, + Serialize, +}; + use crate::{ fuel_tx::Transaction, fuel_types::BlockHeight, @@ -15,6 +21,11 @@ use std::{ time::SystemTime, }; +use super::txpool::ArcPoolTx; + +#[cfg(feature = "serde")] +use super::txpool::PoolTransaction; + /// Contains types and logic for Peer Reputation pub mod peer_reputation; @@ -190,3 +201,67 @@ pub struct HeartbeatData { /// The instant representing when the latest heartbeat was received. 
pub last_heartbeat: SystemTime, } + +/// Type that represents the networkable transaction pool +/// It serializes from an Arc pool transaction and deserializes to a transaction +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum NetworkableTransactionPool { + /// A transaction pool transaction + PoolTransaction(ArcPoolTx), + /// A transaction + Transaction(Transaction), +} + +#[cfg(feature = "serde")] +/// Serialize only the pool transaction variant +impl Serialize for NetworkableTransactionPool { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + match self { + NetworkableTransactionPool::PoolTransaction(tx) => match (*tx).as_ref() { + PoolTransaction::Script(script, _) => { + script.transaction().serialize(serializer) + } + PoolTransaction::Create(create, _) => { + create.transaction().serialize(serializer) + } + PoolTransaction::Blob(blob, _) => { + blob.transaction().serialize(serializer) + } + PoolTransaction::Upgrade(upgrade, _) => { + upgrade.transaction().serialize(serializer) + } + PoolTransaction::Upload(upload, _) => { + upload.transaction().serialize(serializer) + } + }, + NetworkableTransactionPool::Transaction(tx) => tx.serialize(serializer), + } + } +} + +#[cfg(feature = "serde")] +/// Deserialize to a transaction variant +impl<'de> Deserialize<'de> for NetworkableTransactionPool { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + Ok(NetworkableTransactionPool::Transaction( + Transaction::deserialize(deserializer)?, + )) + } +} + +impl TryFrom for Transaction { + type Error = &'static str; + + fn try_from(value: NetworkableTransactionPool) -> Result { + match value { + NetworkableTransactionPool::Transaction(tx) => Ok(tx), + _ => Err("Cannot convert to transaction"), + } + } +} diff --git a/tests/tests/tx_gossip.rs b/tests/tests/tx_gossip.rs index 3aa3cb08ba5..8397d4068bd 100644 --- a/tests/tests/tx_gossip.rs +++ b/tests/tests/tx_gossip.rs @@ -199,7 +199,7 @@ async fn test_tx_gossiping_invalid_txs( .submit(valid_transaction.clone()) .await .expect("Transaction is valid"); - let block = tokio::time::timeout(Duration::from_secs(5), authority_blocks.next()) + let block = tokio::time::timeout(Duration::from_secs(8), authority_blocks.next()) .await? .unwrap(); Ok(block)
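Put together, the changes above implement one end-to-end flow: when a peer subscribes to the transaction gossip topic, the node asks that peer for all of its tx ids (`TxPoolAllTransactionsIds`), filters out the ids it already holds, fetches the missing transactions (`TxPoolFullTransactions`), and inserts them into its own pool. The sketch below is a minimal illustration of that flow, not the actual fuel-core code: `P2p` and `Pool` are simplified stand-ins for the ports added in this PR, `PeerId`/`TxId`/`Transaction` are placeholder types, and it assumes Rust 1.75+ (`async fn` in traits) plus the `anyhow` crate.

// Illustrative sketch of the peer tx-pool sync flow introduced in this PR.
use std::sync::Arc;

type PeerId = Vec<u8>;
type TxId = [u8; 32];
#[derive(Clone, Debug)]
struct Transaction;

/// Stand-in for the `PeerToPeer` port extended in this PR.
trait P2p {
    /// `RequestMessage::TxPoolAllTransactionsIds`: ask a peer for every tx id in its pool.
    async fn request_tx_ids(&self, peer_id: PeerId) -> anyhow::Result<Vec<TxId>>;
    /// `RequestMessage::TxPoolFullTransactions`: ask a peer for the full transactions behind `tx_ids`.
    async fn request_txs(
        &self,
        peer_id: PeerId,
        tx_ids: Vec<TxId>,
    ) -> anyhow::Result<Vec<Option<Transaction>>>;
}

/// Stand-in for the pool side (`filter_existing_tx_ids` / `insert`).
trait Pool {
    /// Keep only the ids this node does not already hold.
    fn filter_existing_tx_ids(&self, tx_ids: Vec<TxId>) -> Vec<TxId>;
    /// Verify and insert the gathered transactions.
    fn insert(&self, txs: Vec<Arc<Transaction>>);
}

/// Rough shape of `SharedState::new_peer_subscribed`, spawned once per
/// `NewSubscription` event on the tx gossip topic.
async fn on_new_peer_subscribed(p2p: &impl P2p, pool: &impl Pool, peer_id: PeerId) {
    // 1. Ask the peer which transactions it currently holds.
    let peer_tx_ids = match p2p.request_tx_ids(peer_id.clone()).await {
        Ok(ids) if !ids.is_empty() => ids,
        Ok(_) => return,
        Err(e) => {
            eprintln!("failed to gather tx ids: {e}");
            return;
        }
    };
    // 2. Drop the ids that are already in the local pool.
    let missing = pool.filter_existing_tx_ids(peer_tx_ids);
    if missing.is_empty() {
        return;
    }
    // 3. Fetch the missing transactions and insert whatever the peer returned.
    match p2p.request_txs(peer_id, missing).await {
        Ok(txs) => {
            let txs: Vec<_> = txs.into_iter().flatten().map(Arc::new).collect();
            if !txs.is_empty() {
                pool.insert(txs);
            }
        }
        Err(e) => eprintln!("failed to gather full txs: {e}"),
    }
}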