Skip to content

Commit

Permalink
feat(network): PropagateTransactionsTo implementation (#4772)
Browse files Browse the repository at this point in the history
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
  • Loading branch information
SleepingShell and mattsse authored Sep 30, 2023
1 parent 01b264a commit 05198e9
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 15 deletions.
5 changes: 5 additions & 0 deletions crates/net/network/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ impl<T: Hash + Eq> LruCache<T> {
{
self.inner.contains(value)
}

/// Returns an iterator over all cached entries
pub fn iter(&self) -> impl Iterator<Item = &T> + '_ {
self.inner.iter()
}
}

impl<T> Extend<T> for LruCache<T>
Expand Down
106 changes: 91 additions & 15 deletions crates/net/network/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use reth_transaction_pool::{
PropagatedTransactions, TransactionPool, ValidPoolTransaction,
};
use std::{
collections::{hash_map::Entry, HashMap},
collections::{hash_map::Entry, HashMap, HashSet},
num::NonZeroUsize,
pin::Pin,
sync::Arc,
Expand Down Expand Up @@ -91,8 +91,10 @@ impl TransactionsHandle {
}

/// Request the active peer IDs from the [`TransactionsManager`].
pub fn get_active_peers(&self) {
self.send(TransactionsCommand::GetActivePeers)
pub async fn get_active_peers(&self) -> Result<HashSet<PeerId>, RecvError> {
let (tx, rx) = oneshot::channel();
self.send(TransactionsCommand::GetActivePeers(tx));
rx.await
}

/// Manually propagate full transactions to a specific peer.
Expand All @@ -101,13 +103,22 @@ impl TransactionsHandle {
}

/// Request the transaction hashes known by specific peers.
pub fn get_transaction_hashes(&self, peers: Vec<PeerId>) {
self.send(TransactionsCommand::GetTransactionHashes(peers))
pub async fn get_transaction_hashes(
&self,
peers: Vec<PeerId>,
) -> Result<HashMap<PeerId, HashSet<TxHash>>, RecvError> {
let (tx, rx) = oneshot::channel();
self.send(TransactionsCommand::GetTransactionHashes { peers, tx });
rx.await
}

/// Request the transaction hashes known by a specific peer.
pub fn get_peer_transaction_hashes(&self, peer: PeerId) {
self.send(TransactionsCommand::GetPeerTransactionHashes(peer))
pub async fn get_peer_transaction_hashes(
&self,
peer: PeerId,
) -> Result<HashSet<TxHash>, RecvError> {
let res = self.get_transaction_hashes(vec![peer]).await?;
Ok(res.into_values().next().unwrap_or_default())
}
}

Expand Down Expand Up @@ -346,6 +357,53 @@ where
propagated
}

/// Propagate the full transactions to a specific peer
///
/// Returns the propagated transactions
fn propagate_full_transactions_to_peer(
&mut self,
txs: Vec<TxHash>,
peer_id: PeerId,
) -> Option<PropagatedTransactions> {
let peer = self.peers.get_mut(&peer_id)?;
let mut propagated = PropagatedTransactions::default();
trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");

// filter all transactions unknown to the peer
let mut full_transactions = FullTransactionsBuilder::default();

let to_propagate = self
.pool
.get_all(txs)
.into_iter()
.filter(|tx| !tx.transaction.is_eip4844())
.map(PropagateTransaction::new);

// Iterate through the transactions to propagate and fill the hashes and full transaction
for tx in to_propagate {
if peer.transactions.insert(tx.hash()) {
full_transactions.push(&tx);
}
}

if full_transactions.transactions.is_empty() {
// nothing to propagate
return None
}

let new_full_transactions = full_transactions.build();
for tx in new_full_transactions.iter() {
propagated.0.entry(tx.hash()).or_default().push(PropagateKind::Full(peer_id));
}
// send full transactions
self.network.send_transactions(peer_id, new_full_transactions);

// Update propagated transactions metrics
self.metrics.propagated_transactions.increment(propagated.0.len() as u64);

Some(propagated)
}

/// Propagate the transaction hashes to the given peer
///
/// Note: This will only send the hashes for transactions that exist in the pool.
Expand Down Expand Up @@ -493,10 +551,27 @@ where
TransactionsCommand::PropagateHashesTo(hashes, peer) => {
self.propagate_hashes_to(hashes, peer)
}
TransactionsCommand::GetActivePeers => todo!(),
TransactionsCommand::PropagateTransactionsTo(_txs, _peer) => todo!(),
TransactionsCommand::GetTransactionHashes(_peers) => todo!(),
TransactionsCommand::GetPeerTransactionHashes(_peer) => todo!(),
TransactionsCommand::GetActivePeers(tx) => {
let peers = self.peers.keys().copied().collect::<HashSet<_>>();
tx.send(peers).ok();
}
TransactionsCommand::PropagateTransactionsTo(_txs, _peer) => {
if let Some(propagated) = self.propagate_full_transactions_to_peer(_txs, _peer) {
self.pool.on_propagated(propagated);
}
}
TransactionsCommand::GetTransactionHashes { peers, tx } => {
let mut res = HashMap::with_capacity(peers.len());
for peer_id in peers {
let hashes = self
.peers
.get(&peer_id)
.map(|peer| peer.transactions.iter().copied().collect::<HashSet<_>>())
.unwrap_or_default();
res.insert(peer_id, hashes);
}
tx.send(res).ok();
}
}
}

Expand Down Expand Up @@ -929,13 +1004,14 @@ enum TransactionsCommand {
/// Propagate transaction hashes to a specific peer.
PropagateHashesTo(Vec<B256>, PeerId),
/// Request the list of active peer IDs from the [`TransactionsManager`].
GetActivePeers,
GetActivePeers(oneshot::Sender<HashSet<PeerId>>),
/// Propagate a collection of full transactions to a specific peer.
PropagateTransactionsTo(Vec<TxHash>, PeerId),
/// Request transaction hashes known by specific peers from the [`TransactionsManager`].
GetTransactionHashes(Vec<PeerId>),
/// Request transaction hashes known by a specific peer from the [`TransactionsManager`].
GetPeerTransactionHashes(PeerId),
GetTransactionHashes {
peers: Vec<PeerId>,
tx: oneshot::Sender<HashMap<PeerId, HashSet<TxHash>>>,
},
}

/// All events related to transactions emitted by the network.
Expand Down

0 comments on commit 05198e9

Please sign in to comment.