Skip to content

Commit

Permalink
Refactor persistent broadcast (#3847)
Browse files Browse the repository at this point in the history
* Revert "Do not cache local txs in PeerInfo (#3813)"

This reverts commit 50b4cc4.

* refactor

* fixes

* cosmetic

* cosmetic
  • Loading branch information
marcindsobczak authored Mar 1, 2022
1 parent e1e0293 commit 84e7028
Show file tree
Hide file tree
Showing 16 changed files with 66 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void should_send_up_to_MaxCount_hashes_in_one_NewPooledTransactionHashesM
txs[i] = Build.A.Transaction.SignedAndResolved().TestObject;
}

_handler.SendNewTransactions(txs);
_handler.SendNewTransactions(txs, false);

_session.Received(1).DeliverMessage(Arg.Is<NewPooledTransactionHashesMessage>(m => m.Hashes.Count == txCount));
}
Expand All @@ -127,7 +127,7 @@ public void should_send_more_than_MaxCount_hashes_in_more_than_one_NewPooledTran
txs[i] = Build.A.Transaction.SignedAndResolved().TestObject;
}

_handler.SendNewTransactions(txs);
_handler.SendNewTransactions(txs, false);

_session.Received(messagesCount).DeliverMessage(Arg.Is<NewPooledTransactionHashesMessage>(m => m.Hashes.Count == NewPooledTransactionHashesMessage.MaxCount || m.Hashes.Count == nonFullMsgTxsCount));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DotNetty.Common.Utilities;
Expand Down Expand Up @@ -224,10 +223,7 @@ public void SendNewTransaction(Transaction tx)
SendMessage(new[]{tx});
}

public void SendNewTransactions(IEnumerable<(Transaction Tx, bool IsPersistent)> txs) =>
SendNewTransactions(txs.Select(t => t.Tx));

public virtual void SendNewTransactions(IEnumerable<Transaction> txs)
public virtual void SendNewTransactions(IEnumerable<Transaction> txs, bool sendFullTx = false)
{
const int maxCapacity = 256;
using ArrayPoolList<Transaction> txsToSend = new(maxCapacity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,14 @@ protected PooledTransactionsMessage FulfillPooledTransactionsRequest(
return new PooledTransactionsMessage(txs);
}

public override void SendNewTransactions(IEnumerable<Transaction> txs)
public override void SendNewTransactions(IEnumerable<Transaction> txs, bool sendFullTx)
{
if (sendFullTx)
{
base.SendNewTransactions(txs);
return;
}

using ArrayPoolList<Keccak> hashes = new(NewPooledTransactionHashesMessage.MaxCount);

foreach (Transaction tx in txs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ public void NotifyOfNewBlock(Block block, SendBlockPriority priority)

public PublicKey Id => Node.Id;

public void SendNewTransactions(IEnumerable<(Transaction Tx, bool IsPersistent)> txs)
public void SendNewTransactions(IEnumerable<Transaction> txs, bool sendFullTx)
{
throw new NotImplementedException();
}
Expand Down Expand Up @@ -1001,7 +1001,7 @@ public void NotifyOfNewBlock(Block block, SendBlockPriority priority)

public PublicKey Id => Node.Id;

public void SendNewTransactions(IEnumerable<(Transaction Tx, bool IsPersistent)> txs)
public void SendNewTransactions(IEnumerable<Transaction> txs, bool sendFullTx)
{
throw new NotImplementedException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ public void NotifyOfNewBlock(Block block, SendBlockPriority priority)

public PublicKey Id => Node.Id;

public void SendNewTransactions(IEnumerable<(Transaction Tx, bool IsPersistent)> txs)
public void SendNewTransactions(IEnumerable<Transaction> txs, bool sendFullTx)
{
throw new NotImplementedException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void NotifyOfNewBlock(Block block, SendBlockPriority priority)

public PublicKey Id => Node.Id;

public void SendNewTransactions(IEnumerable<(Transaction Tx, bool IsPersistent)> txs)
public void SendNewTransactions(IEnumerable<Transaction> txs, bool sendFullTx)
{
throw new NotImplementedException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public void HintNewBlock(Keccak blockHash, long number)

public PublicKey Id => Node.Id;

public void SendNewTransactions(IEnumerable<(Transaction Tx, bool IsPersistent)> txs) { }
public void SendNewTransactions(IEnumerable<Transaction> txs, bool sendFullTx) { }

public Task<TxReceipt[][]> GetReceipts(IReadOnlyList<Keccak> blockHash, CancellationToken token)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void NotifyOfNewBlock(Block block, SendBlockPriority priority)

public PublicKey Id => Node.Id;

public void SendNewTransactions(IEnumerable<(Transaction Tx, bool IsPersistent)> txs) { }
public void SendNewTransactions(IEnumerable<Transaction> txs, bool sendFullTx) { }

public Task<TxReceipt[][]> GetReceipts(IReadOnlyList<Keccak> blockHash, CancellationToken token)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public void NotifyOfNewBlock(Block block, SendBlockPriority priority)

public PublicKey Id => Node.Id;

public void SendNewTransactions(IEnumerable<(Transaction Tx, bool IsPersistent)> txs) { }
public void SendNewTransactions(IEnumerable<Transaction> txs, bool sendFullTx) { }

public Task<TxReceipt[][]> GetReceipts(IReadOnlyList<Keccak> blockHash, CancellationToken token)
{
Expand Down
36 changes: 6 additions & 30 deletions src/Nethermind/Nethermind.TxPool.Test/TxBroadcasterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,6 @@ public void Setup()

[TestCase(0)]
[TestCase(1)]
[TestCase(2)]
[TestCase(99)]
[TestCase(100)]
[TestCase(101)]
[TestCase(1000)]
[TestCase(-10)]
public void should_pick_best_persistent_txs_to_broadcast(int threshold)
Expand All @@ -90,28 +86,14 @@ public void should_pick_best_persistent_txs_to_broadcast(int threshold)

_broadcaster.GetSnapshot().Length.Should().Be(addedTxsCount);

ITxPoolPeer txPoolPeer = Substitute.For<ITxPoolPeer>();
List<Transaction> pickedTxs = _broadcaster.GetTxsToSend(txPoolPeer, Array.Empty<Transaction>()).Select(t => t.Tx).ToList();
List<Transaction> pickedTxs = _broadcaster.GetPersistentTxsToSend().ToList();

int expectedCount = threshold <= 0 ? 0 : Math.Min(addedTxsCount * threshold / 100 + 1, addedTxsCount);
int expectedCount = threshold <= 0 ? 0 : addedTxsCount;
pickedTxs.Count.Should().Be(expectedCount);

List<Transaction> expectedTxs = new();

for (int i = 1; i <= expectedCount; i++)
{
expectedTxs.Add(transactions[addedTxsCount - i]);
}

expectedTxs.Should().BeEquivalentTo(pickedTxs);
}

[TestCase(0)]
[TestCase(1)]
[TestCase(2)]
[TestCase(99)]
[TestCase(100)]
[TestCase(101)]
[TestCase(1000)]
[TestCase(-10)]
public void should_not_pick_txs_with_GasPrice_lower_than_CurrentBaseFee(int threshold)
Expand Down Expand Up @@ -142,10 +124,9 @@ public void should_not_pick_txs_with_GasPrice_lower_than_CurrentBaseFee(int thre

_broadcaster.GetSnapshot().Length.Should().Be(addedTxsCount);

ITxPoolPeer txPoolPeer = Substitute.For<ITxPoolPeer>();
List<Transaction> pickedTxs = _broadcaster.GetTxsToSend(txPoolPeer, Array.Empty<Transaction>()).Select(t => t.Tx).ToList();
List<Transaction> pickedTxs = _broadcaster.GetPersistentTxsToSend().ToList();

int expectedCount = threshold <= 0 ? 0 : Math.Min(addedTxsCount * threshold / 100 + 1, addedTxsCount - currentBaseFeeInGwei);
int expectedCount = threshold <= 0 ? 0 : addedTxsCount - currentBaseFeeInGwei;
pickedTxs.Count.Should().Be(expectedCount);

List<Transaction> expectedTxs = new();
Expand All @@ -160,10 +141,6 @@ public void should_not_pick_txs_with_GasPrice_lower_than_CurrentBaseFee(int thre

[TestCase(0)]
[TestCase(1)]
[TestCase(2)]
[TestCase(99)]
[TestCase(100)]
[TestCase(101)]
[TestCase(1000)]
[TestCase(-10)]
public void should_not_pick_1559_txs_with_MaxFeePerGas_lower_than_CurrentBaseFee(int threshold)
Expand Down Expand Up @@ -195,10 +172,9 @@ public void should_not_pick_1559_txs_with_MaxFeePerGas_lower_than_CurrentBaseFee

_broadcaster.GetSnapshot().Length.Should().Be(addedTxsCount);

ITxPoolPeer txPoolPeer = Substitute.For<ITxPoolPeer>();
List<Transaction> pickedTxs = _broadcaster.GetTxsToSend(txPoolPeer, Array.Empty<Transaction>()).Select(t => t.Tx).ToList();
List<Transaction> pickedTxs = _broadcaster.GetPersistentTxsToSend().ToList();

int expectedCount = threshold <= 0 ? 0 : Math.Min(addedTxsCount * threshold / 100 + 1, addedTxsCount - currentBaseFeeInGwei);
int expectedCount = threshold <= 0 ? 0 : addedTxsCount - currentBaseFeeInGwei;
pickedTxs.Count.Should().Be(expectedCount);

List<Transaction> expectedTxs = new();
Expand Down
4 changes: 2 additions & 2 deletions src/Nethermind/Nethermind.TxPool.Test/TxPoolTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1018,7 +1018,7 @@ public void should_notify_added_peer_of_own_tx()
ITxPoolPeer txPoolPeer = Substitute.For<ITxPoolPeer>();
txPoolPeer.Id.Returns(TestItem.PublicKeyA);
_txPool.AddPeer(txPoolPeer);
txPoolPeer.Received().SendNewTransactions(Arg.Any<IEnumerable<Transaction>>());
txPoolPeer.Received().SendNewTransactions(Arg.Any<IEnumerable<Transaction>>(), false);
}

[Test]
Expand All @@ -1030,7 +1030,7 @@ public async Task should_notify_peer_only_once()
_txPool.AddPeer(txPoolPeer);
Transaction tx = AddTransactionToPool();
await Task.Delay(500);
txPoolPeer.Received(1).SendNewTransactions(Arg.Any<IEnumerable<Transaction>>());
txPoolPeer.Received(1).SendNewTransactions(Arg.Any<IEnumerable<Transaction>>(), false);
}

[Test]
Expand Down
15 changes: 0 additions & 15 deletions src/Nethermind/Nethermind.TxPool/Collections/SortedPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -145,21 +145,6 @@ public bool TryTakeFirst(out TValue first)
return TryRemove(GetKey(sortedValues.Min), out first);
}

/// <summary>
/// Returns best element of each bucket in supplied comparer order.
/// </summary>
[MethodImpl(MethodImplOptions.Synchronized)]
public IEnumerable<TValue> GetFirsts()
{
SortedSet<TValue> sortedValues = new(_sortedComparer);
foreach (KeyValuePair<TGroupKey, SortedSet<TValue>> bucket in _buckets)
{
sortedValues.Add(bucket.Value.Max!);
}

return sortedValues;
}

/// <summary>
/// Gets last element in supplied comparer order.
/// </summary>
Expand Down
6 changes: 2 additions & 4 deletions src/Nethermind/Nethermind.TxPool/ITxPoolPeer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
//

using System.Collections.Generic;
using System.Linq;
using Nethermind.Core;
using Nethermind.Core.Crypto;

Expand All @@ -26,8 +25,7 @@ public interface ITxPoolPeer
{
public PublicKey Id { get; }
public string Enode => string.Empty;
void SendNewTransaction(Transaction tx) => SendNewTransactions(new[]{tx});
void SendNewTransactions(IEnumerable<(Transaction Tx, bool IsPersistent)> txs);
void SendNewTransactions(IEnumerable<Transaction> txs) => SendNewTransactions(txs.Select(t => (t, false)));
void SendNewTransaction(Transaction tx) => SendNewTransactions(new[]{tx}, true);
void SendNewTransactions(IEnumerable<Transaction> txs, bool sendFullTx);
}
}
12 changes: 6 additions & 6 deletions src/Nethermind/Nethermind.TxPool/PeerInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,16 @@ public void SendNewTransaction(Transaction tx)
Peer.SendNewTransaction(tx);
}

public void SendNewTransactions(IEnumerable<(Transaction Tx, bool IsPersistent)> txs)
public void SendNewTransactions(IEnumerable<Transaction> txs, bool sendFullTx)
{
Peer.SendNewTransactions(GetTxsToSendAndMarkAsNotified(txs));
Peer.SendNewTransactions(GetTxsToSendAndMarkAsNotified(txs, sendFullTx), sendFullTx);
}
private IEnumerable<Transaction> GetTxsToSendAndMarkAsNotified(IEnumerable<(Transaction Tx, bool IsPersistent)> txs)

private IEnumerable<Transaction> GetTxsToSendAndMarkAsNotified(IEnumerable<Transaction> txs, bool sendFullTx)
{
foreach ((Transaction tx, bool isPersistent) in txs)
foreach (Transaction tx in txs)
{
if (isPersistent || NotifiedTransactions.Set(tx.Hash))
if (sendFullTx || NotifiedTransactions.Set(tx.Hash))
{
yield return tx;
}
Expand Down
68 changes: 32 additions & 36 deletions src/Nethermind/Nethermind.TxPool/TxBroadcaster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Nethermind.Core;
using Nethermind.Core.Crypto;
Expand Down Expand Up @@ -121,7 +120,33 @@ private void BroadcastOnce(Transaction tx)

public void BroadcastOnce(ITxPoolPeer peer, Transaction[] txs)
{
Notify(peer, txs);
Notify(peer, txs, false);
}

public void BroadcastPersistentTxs()
{
if (_logger.IsDebug) _logger.Debug($"Broadcasting persistent transactions to all peers");

foreach ((_, ITxPoolPeer peer) in _peers)
{
Notify(peer, GetPersistentTxsToSend(), true);
}
}

internal IEnumerable<Transaction> GetPersistentTxsToSend()
{
if (_txPoolConfig.PeerNotificationThreshold <= 0)
{
yield break;
}

foreach (Transaction tx in _persistentTxs.GetSnapshot())
{
if (tx.MaxFeePerGas >= _headInfo.CurrentBaseFee)
{
yield return tx;
}
}
}

public void StopBroadcast(Keccak txHash)
Expand Down Expand Up @@ -158,7 +183,7 @@ void NotifyPeers()

foreach ((_, ITxPoolPeer peer) in _peers)
{
Notify(peer, GetTxsToSend(peer, _txsToSend));
Notify(peer, GetTxsToSend(peer, _txsToSend), false);
}

_txsToSend.Clear();
Expand All @@ -168,51 +193,22 @@ void NotifyPeers()
_timer.Enabled = true;
}

internal IEnumerable<(Transaction Tx, bool IsPersistent)> GetTxsToSend(ITxPoolPeer peer, IEnumerable<Transaction> txsToSend)
private IEnumerable<Transaction> GetTxsToSend(ITxPoolPeer peer, IEnumerable<Transaction> txsToSend)
{
if (_txPoolConfig.PeerNotificationThreshold > 0)
{
// PeerNotificationThreshold is a declared in config percent of transactions in persistent broadcast,
// which will be sent when timer elapse. numberOfPersistentTxsToBroadcast is equal to
// PeerNotificationThreshold multiplication by number of transactions in persistent broadcast, rounded up.
int numberOfPersistentTxsToBroadcast =
Math.Min(_txPoolConfig.PeerNotificationThreshold * _persistentTxs.Count / 100 + 1,
_persistentTxs.Count);

foreach (Transaction tx in _persistentTxs.GetFirsts())
{
if (numberOfPersistentTxsToBroadcast > 0)
{
if (tx.MaxFeePerGas >= _headInfo.CurrentBaseFee)
{
numberOfPersistentTxsToBroadcast--;
yield return (tx, true);
}
}
else
{
break;
}
}
}

foreach (Transaction tx in txsToSend)
{
if (tx.DeliveredBy is null || !tx.DeliveredBy.Equals(peer.Id))
{
yield return (tx, false);
yield return tx;
}
}
}

private void Notify(ITxPoolPeer peer, IEnumerable<Transaction> txs) =>
Notify(peer, txs.Select(t => (t, false)));

private void Notify(ITxPoolPeer peer, IEnumerable<(Transaction Tx, bool IsPersistent)> txs)
private void Notify(ITxPoolPeer peer, IEnumerable<Transaction> txs, bool sendFullTx)
{
try
{
peer.SendNewTransactions(txs);
peer.SendNewTransactions(txs, sendFullTx);
if (_logger.IsTrace) _logger.Trace($"Notified {peer} about transactions.");
}
catch (Exception e)
Expand Down
1 change: 1 addition & 0 deletions src/Nethermind/Nethermind.TxPool/TxPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ private void ProcessNewHeads()
ReAddReorganisedTransactions(args.PreviousBlock);
RemoveProcessedTransactions(args.Block.Transactions);
UpdateBuckets();
_broadcaster.BroadcastPersistentTxs();
Metrics.TransactionCount = _transactions.Count;
}
catch (Exception e)
Expand Down

0 comments on commit 84e7028

Please sign in to comment.