Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor persistent broadcast #3901

Merged
merged 11 commits into from
Mar 28, 2022
39 changes: 27 additions & 12 deletions src/Nethermind/Nethermind.TxPool.Test/TxBroadcasterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,12 @@ public void Setup()
_headInfo = Substitute.For<IChainHeadInfoProvider>();
}

[TestCase(0)]
[TestCase(1)]
[TestCase(2)]
[TestCase(99)]
[TestCase(100)]
[TestCase(101)]
[TestCase(1000)]
[TestCase(-10)]
public void should_pick_best_persistent_txs_to_broadcast(int threshold)
{
_txPoolConfig = new TxPoolConfig() { PeerNotificationThreshold = threshold };
Expand All @@ -86,16 +88,27 @@ public void should_pick_best_persistent_txs_to_broadcast(int threshold)

_broadcaster.GetSnapshot().Length.Should().Be(addedTxsCount);

List<Transaction> pickedTxs = _broadcaster.GetPersistentTxsToSend().ToList();
_broadcaster.GetPersistentTxsToSend(out IList<Transaction> pickedTxs);

int expectedCount = threshold <= 0 ? 0 : addedTxsCount;
int expectedCount = Math.Min(addedTxsCount * threshold / 100 + 1, addedTxsCount);
pickedTxs.Count.Should().Be(expectedCount);

List<Transaction> expectedTxs = new();

for (int i = 1; i <= expectedCount; i++)
{
expectedTxs.Add(transactions[addedTxsCount - i]);
}

expectedTxs.Should().BeEquivalentTo(pickedTxs);
}

[TestCase(0)]
[TestCase(1)]
[TestCase(2)]
[TestCase(99)]
[TestCase(100)]
[TestCase(101)]
[TestCase(1000)]
[TestCase(-10)]
public void should_not_pick_txs_with_GasPrice_lower_than_CurrentBaseFee(int threshold)
{
_txPoolConfig = new TxPoolConfig() { PeerNotificationThreshold = threshold };
Expand Down Expand Up @@ -124,9 +137,9 @@ public void should_not_pick_txs_with_GasPrice_lower_than_CurrentBaseFee(int thre

_broadcaster.GetSnapshot().Length.Should().Be(addedTxsCount);

List<Transaction> pickedTxs = _broadcaster.GetPersistentTxsToSend().ToList();
_broadcaster.GetPersistentTxsToSend(out IList<Transaction> pickedTxs);

int expectedCount = threshold <= 0 ? 0 : addedTxsCount - currentBaseFeeInGwei;
int expectedCount = Math.Min(addedTxsCount * threshold / 100 + 1, addedTxsCount - currentBaseFeeInGwei);
pickedTxs.Count.Should().Be(expectedCount);

List<Transaction> expectedTxs = new();
Expand All @@ -139,10 +152,12 @@ public void should_not_pick_txs_with_GasPrice_lower_than_CurrentBaseFee(int thre
expectedTxs.Should().BeEquivalentTo(pickedTxs);
}

[TestCase(0)]
[TestCase(1)]
[TestCase(2)]
[TestCase(99)]
[TestCase(100)]
[TestCase(101)]
[TestCase(1000)]
[TestCase(-10)]
public void should_not_pick_1559_txs_with_MaxFeePerGas_lower_than_CurrentBaseFee(int threshold)
{
_txPoolConfig = new TxPoolConfig() { PeerNotificationThreshold = threshold };
Expand Down Expand Up @@ -172,9 +187,9 @@ public void should_not_pick_1559_txs_with_MaxFeePerGas_lower_than_CurrentBaseFee

_broadcaster.GetSnapshot().Length.Should().Be(addedTxsCount);

List<Transaction> pickedTxs = _broadcaster.GetPersistentTxsToSend().ToList();
_broadcaster.GetPersistentTxsToSend(out IList<Transaction> pickedTxs);

int expectedCount = threshold <= 0 ? 0 : addedTxsCount - currentBaseFeeInGwei;
int expectedCount = Math.Min(addedTxsCount * threshold / 100 + 1, addedTxsCount - currentBaseFeeInGwei);
pickedTxs.Count.Should().Be(expectedCount);

List<Transaction> expectedTxs = new();
Expand Down
15 changes: 15 additions & 0 deletions src/Nethermind/Nethermind.TxPool/Collections/SortedPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,21 @@ public bool TryTakeFirst(out TValue first)
return TryRemove(GetKey(sortedValues.Min), out first);
}

/// <summary>
/// Returns best element of each bucket in supplied comparer order.
/// </summary>
[MethodImpl(MethodImplOptions.Synchronized)]
public IEnumerable<TValue> GetFirsts()
{
SortedSet<TValue> sortedValues = new(_sortedComparer);
foreach (KeyValuePair<TGroupKey, SortedSet<TValue>> bucket in _buckets)
{
sortedValues.Add(bucket.Value.Max!);
marcindsobczak marked this conversation as resolved.
Show resolved Hide resolved
}

return sortedValues;
}

/// <summary>
/// Gets last element in supplied comparer order.
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.TxPool/ITxPoolConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace Nethermind.TxPool
{
public interface ITxPoolConfig : IConfig
{
[ConfigItem(DefaultValue = "1", Description = "Defines average percent of tx hashes from persistent broadcast send to peer together with hashes of last added txs.")]
[ConfigItem(DefaultValue = "5", Description = "Defines average percent of tx hashes from persistent broadcast send to peer together with hashes of last added txs.")]
int PeerNotificationThreshold { get; set; }

[ConfigItem(DefaultValue = "2048", Description = "Max number of transactions held in mempool (more transactions in mempool mean more memory used")]
Expand Down
52 changes: 40 additions & 12 deletions src/Nethermind/Nethermind.TxPool/TxBroadcaster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,26 +130,54 @@ public void BroadcastOnce(ITxPoolPeer peer, Transaction[] txs)

public void BroadcastPersistentTxs()
{
if (_logger.IsDebug) _logger.Debug($"Broadcasting persistent transactions to all peers");
if (_txPoolConfig.PeerNotificationThreshold > 0)
{
GetPersistentTxsToSend(out IList<Transaction> persistentTxsToSend);

foreach ((_, ITxPoolPeer peer) in _peers)
if (persistentTxsToSend.Count > 0)
{
if (_logger.IsDebug) _logger.Debug($"Broadcasting {persistentTxsToSend.Count} persistent transactions to all peers.");

foreach ((_, ITxPoolPeer peer) in _peers)
{
Notify(peer, persistentTxsToSend, true);
}
}
else
{
if (_logger.IsDebug) _logger.Debug($"There are currently no transactions able to broadcast.");
}
}
else
{
Notify(peer, GetPersistentTxsToSend(), true);
if (_logger.IsDebug) _logger.Debug($"PeerNotificationThreshold is not a positive value: {_txPoolConfig.PeerNotificationThreshold}. Skipping broadcasting persistent transactions.");
}
}

internal IEnumerable<Transaction> GetPersistentTxsToSend()
internal void GetPersistentTxsToSend(out IList<Transaction> persistentTxsToSend)
marcindsobczak marked this conversation as resolved.
Show resolved Hide resolved
{
if (_txPoolConfig.PeerNotificationThreshold <= 0)
{
yield break;
}

foreach (Transaction tx in _persistentTxs.GetSnapshot())
// PeerNotificationThreshold is a declared in config max percent of transactions in persistent broadcast,
// which will be sent after processing of every block. numberOfPersistentTxsToBroadcast is equal to
// PeerNotificationThreshold multiplication by number of transactions in persistent broadcast, rounded up.
int numberOfPersistentTxsToBroadcast =
Math.Min(_txPoolConfig.PeerNotificationThreshold * _persistentTxs.Count / 100 + 1,
_persistentTxs.Count);

persistentTxsToSend = new List<Transaction>(numberOfPersistentTxsToBroadcast);

foreach (Transaction tx in _persistentTxs.GetFirsts())
{
if (tx.MaxFeePerGas >= _headInfo.CurrentBaseFee)
if (numberOfPersistentTxsToBroadcast > 0)
{
yield return tx;
if (tx.MaxFeePerGas >= _headInfo.CurrentBaseFee)
{
numberOfPersistentTxsToBroadcast--;
persistentTxsToSend.Add(tx);
}
}
else
{
break;
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.TxPool/TxPoolConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace Nethermind.TxPool
{
public class TxPoolConfig : ITxPoolConfig
{
public int PeerNotificationThreshold { get; set; } = 1;
public int PeerNotificationThreshold { get; set; } = 5;
public int Size { get; set; } = 2048;
public int HashCacheSize { get; set; } = 512 * 1024;
public long? GasLimit { get; set; } = null;
Expand Down