Skip to content

Commit

Permalink
Merge branch 'master' into feature/fuel-core-0.35.0-benchamrks
Browse files Browse the repository at this point in the history
  • Loading branch information
xgreenx authored Sep 19, 2024
2 parents b95a57d + 5963a4e commit 91b1d9d
Show file tree
Hide file tree
Showing 24 changed files with 1,175 additions and 99 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

### Added
- [2131](https://github.com/FuelLabs/fuel-core/pull/2131): Add flow in TxPool in order to ask to newly connected peers to share their transaction pool

### Changed

#### Breaking
Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions bin/fuel-core/src/cli/run/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ pub struct P2PArgs {
#[clap(long = "max-headers-per-request", default_value = "100", env)]
pub max_headers_per_request: usize,

/// Max number of txs in a single txs request response
#[clap(long = "max-txs-per-request", default_value = "10000", env)]
pub max_txs_per_request: usize,

/// Addresses of the bootstrap nodes
/// They should contain PeerId within their `Multiaddr`
#[clap(long = "bootstrap-nodes", value_delimiter = ',', env)]
Expand Down Expand Up @@ -304,6 +308,7 @@ impl P2PArgs {
tcp_port: self.peering_port,
max_block_size: self.max_block_size,
max_headers_per_request: self.max_headers_per_request,
max_txs_per_request: self.max_txs_per_request,
bootstrap_nodes: self.bootstrap_nodes,
reserved_nodes: self.reserved_nodes,
reserved_nodes_only_mode: self.reserved_nodes_only_mode,
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-core/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ use crate::state::{
};
#[cfg(feature = "rocksdb")]
use std::path::Path;
use fuel_core_gas_price_service::fuel_gas_price_updater::fuel_core_storage_adapter::storage::{ GasPriceMetadata};
use fuel_core_gas_price_service::fuel_gas_price_updater::fuel_core_storage_adapter::storage::GasPriceMetadata;
use crate::database::database_description::gas_price::GasPriceDatabase;

// Storages implementation
Expand Down
36 changes: 27 additions & 9 deletions crates/fuel-core/src/p2p_test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ use fuel_core_p2p::{
codecs::postcard::PostcardCodec,
network_service::FuelP2PService,
p2p_service::FuelP2PEvent,
request_response::messages::{
RequestMessage,
ResponseMessage,
},
service::to_message_acceptance,
};
use fuel_core_poa::{
Expand Down Expand Up @@ -153,16 +157,30 @@ impl Bootstrap {
}
event = bootstrap.next_event() => {
// The bootstrap node only forwards data without validating it.
if let Some(FuelP2PEvent::GossipsubMessage {
peer_id,
message_id,
..
}) = event {
bootstrap.report_message_validation_result(
&message_id,
match event {
Some(FuelP2PEvent::GossipsubMessage {
peer_id,
to_message_acceptance(&GossipsubMessageAcceptance::Accept)
)
message_id,
..
}) => {
bootstrap.report_message_validation_result(
&message_id,
peer_id,
to_message_acceptance(&GossipsubMessageAcceptance::Accept)
)
},
Some(FuelP2PEvent::InboundRequestMessage {
request_id,
request_message
}) => {
if request_message == RequestMessage::TxPoolAllTransactionsIds {
let _ = bootstrap.send_response_msg(
request_id,
ResponseMessage::TxPoolAllTransactionsIds(Some(vec![])),
);
}
}
_ => {}
}
}
}
Expand Down
30 changes: 28 additions & 2 deletions crates/fuel-core/src/service/adapters/p2p.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,26 @@
use super::BlockImporterAdapter;
use super::{
BlockImporterAdapter,
TxPoolAdapter,
};
use crate::database::OnChainIterableKeyValueView;
use fuel_core_p2p::ports::{
BlockHeightImporter,
P2pDb,
TxPool,
};
use fuel_core_services::stream::BoxStream;
use fuel_core_storage::Result as StorageResult;
use fuel_core_txpool::types::TxId;
use fuel_core_types::{
blockchain::{
consensus::Genesis,
SealedBlockHeader,
},
fuel_types::BlockHeight,
services::p2p::Transactions,
services::p2p::{
NetworkableTransactionPool,
Transactions,
},
};
use std::ops::Range;

Expand Down Expand Up @@ -49,3 +57,21 @@ impl BlockHeightImporter for BlockImporterAdapter {
)
}
}

impl TxPool for TxPoolAdapter {
fn get_tx_ids(&self, max_txs: usize) -> Vec<TxId> {
self.service.get_tx_ids(max_txs)
}

fn get_full_txs(&self, tx_ids: Vec<TxId>) -> Vec<Option<NetworkableTransactionPool>> {
self.service
.find(tx_ids)
.into_iter()
.map(|tx_info| {
tx_info.map(|tx| {
NetworkableTransactionPool::PoolTransaction(tx.tx().clone())
})
})
.collect()
}
}
4 changes: 1 addition & 3 deletions crates/fuel-core/src/service/adapters/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@ impl PeerToPeerPort for P2PAdapter {
data: range,
} = range;
if let Some(service) = &self.service {
service
.get_transactions_from_peer(peer_id.into(), range)
.await
service.get_transactions_from_peer(peer_id, range).await
} else {
Err(anyhow::anyhow!("No P2P service available"))
}
Expand Down
57 changes: 57 additions & 0 deletions crates/fuel-core/src/service/adapters/txpool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use fuel_core_txpool::{
GasPriceProvider,
MemoryPool,
},
types::TxId,
Result as TxPoolResult,
};
use fuel_core_types::{
Expand All @@ -52,6 +53,7 @@ use fuel_core_types::{
p2p::{
GossipsubMessageAcceptance,
GossipsubMessageInfo,
PeerId,
TransactionGossipData,
},
},
Expand All @@ -65,6 +67,7 @@ impl BlockImporter for BlockImporterAdapter {
}

#[cfg(feature = "p2p")]
#[async_trait::async_trait]
impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter {
type GossipedTransaction = TransactionGossipData;

Expand All @@ -91,6 +94,21 @@ impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter {
}
}

fn subscribe_new_peers(&self) -> BoxStream<PeerId> {
use tokio_stream::{
wrappers::BroadcastStream,
StreamExt,
};
if let Some(service) = &self.service {
Box::pin(
BroadcastStream::new(service.subscribe_new_peers())
.filter_map(|result| result.ok()),
)
} else {
Box::pin(fuel_core_services::stream::pending())
}
}

fn notify_gossip_transaction_validity(
&self,
message_info: GossipsubMessageInfo,
Expand All @@ -102,9 +120,32 @@ impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter {
Ok(())
}
}

async fn request_tx_ids(&self, peer_id: PeerId) -> anyhow::Result<Vec<TxId>> {
if let Some(service) = &self.service {
service.get_all_transactions_ids_from_peer(peer_id).await
} else {
Ok(vec![])
}
}

async fn request_txs(
&self,
peer_id: PeerId,
tx_ids: Vec<TxId>,
) -> anyhow::Result<Vec<Option<Transaction>>> {
if let Some(service) = &self.service {
service
.get_full_transactions_from_peer(peer_id, tx_ids)
.await
} else {
Ok(vec![])
}
}
}

#[cfg(not(feature = "p2p"))]
#[async_trait::async_trait]
impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter {
type GossipedTransaction = TransactionGossipData;

Expand All @@ -126,6 +167,22 @@ impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter {
) -> anyhow::Result<()> {
Ok(())
}

fn subscribe_new_peers(&self) -> BoxStream<PeerId> {
Box::pin(fuel_core_services::stream::pending())
}

async fn request_tx_ids(&self, _peer_id: PeerId) -> anyhow::Result<Vec<TxId>> {
Ok(vec![])
}

async fn request_txs(
&self,
_peer_id: PeerId,
_tx_ids: Vec<TxId>,
) -> anyhow::Result<Vec<Option<Transaction>>> {
Ok(vec![])
}
}

impl fuel_core_txpool::ports::TxPoolDb for OnChainIterableKeyValueView {
Expand Down
31 changes: 21 additions & 10 deletions crates/fuel-core/src/service/sub_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub type PoAService = fuel_core_poa::Service<
SystemTime,
>;
#[cfg(feature = "p2p")]
pub type P2PService = fuel_core_p2p::service::Service<Database>;
pub type P2PService = fuel_core_p2p::service::Service<Database, TxPoolAdapter>;
pub type TxPoolSharedState = fuel_core_txpool::service::SharedState<
P2PAdapter,
Database,
Expand Down Expand Up @@ -169,14 +169,10 @@ pub fn init_sub_services(
};

#[cfg(feature = "p2p")]
let mut network = config.p2p.clone().map(|p2p_config| {
fuel_core_p2p::service::new_service(
chain_id,
p2p_config,
database.on_chain().clone(),
importer_adapter.clone(),
)
});
let p2p_externals = config
.p2p
.clone()
.map(fuel_core_p2p::service::build_shared_state);

#[cfg(feature = "p2p")]
let p2p_adapter = {
Expand All @@ -192,7 +188,7 @@ pub fn init_sub_services(
invalid_transactions: -100.,
};
P2PAdapter::new(
network.as_ref().map(|network| network.shared.clone()),
p2p_externals.as_ref().map(|ext| ext.0.clone()),
peer_report_config,
)
};
Expand Down Expand Up @@ -229,6 +225,21 @@ pub fn init_sub_services(
);
let tx_pool_adapter = TxPoolAdapter::new(txpool.shared.clone());

#[cfg(feature = "p2p")]
let mut network = config.p2p.clone().zip(p2p_externals).map(
|(p2p_config, (shared_state, request_receiver))| {
fuel_core_p2p::service::new_service(
chain_id,
p2p_config,
shared_state,
request_receiver,
database.on_chain().clone(),
importer_adapter.clone(),
tx_pool_adapter.clone(),
)
},
);

let block_producer = fuel_core_producer::Producer {
config: config.block_producer.clone(),
view_provider: database.on_chain().clone(),
Expand Down
8 changes: 8 additions & 0 deletions crates/services/p2p/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ pub const MAX_RESPONSE_SIZE: usize = 18 * 1024 * 1024;
/// Maximum number of blocks per request.
pub const MAX_HEADERS_PER_REQUEST: usize = 100;

/// Maximum number of transactions ids asked per request.
pub const MAX_TXS_PER_REQUEST: usize = 10000;

#[derive(Clone, Debug)]
pub struct Config<State = Initialized> {
/// The keypair used for handshake during communication with other p2p nodes.
Expand All @@ -73,6 +76,9 @@ pub struct Config<State = Initialized> {
pub max_block_size: usize,
pub max_headers_per_request: usize,

// Maximum of txs id asked in a single request
pub max_txs_per_request: usize,

// `DiscoveryBehaviour` related fields
pub bootstrap_nodes: Vec<Multiaddr>,
pub enable_mdns: bool,
Expand Down Expand Up @@ -151,6 +157,7 @@ impl Config<NotInitialized> {
tcp_port: self.tcp_port,
max_block_size: self.max_block_size,
max_headers_per_request: self.max_headers_per_request,
max_txs_per_request: self.max_txs_per_request,
bootstrap_nodes: self.bootstrap_nodes,
enable_mdns: self.enable_mdns,
max_peers_connected: self.max_peers_connected,
Expand Down Expand Up @@ -200,6 +207,7 @@ impl Config<NotInitialized> {
tcp_port: 0,
max_block_size: MAX_RESPONSE_SIZE,
max_headers_per_request: MAX_HEADERS_PER_REQUEST,
max_txs_per_request: MAX_TXS_PER_REQUEST,
bootstrap_nodes: vec![],
enable_mdns: false,
max_peers_connected: 50,
Expand Down
Loading

0 comments on commit 91b1d9d

Please sign in to comment.