Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broadcast best persistent txs instead of random #3768

Merged
merged 27 commits into from
Feb 11, 2022
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
950c024
broadcast specified % of best txs in persistent broadcast instead of …
marcindsobczak Jan 25, 2022
b9065a3
broadcast only if persistent broadcast enabled
marcindsobczak Jan 25, 2022
8d13892
sort and broadcast only tx with lowest nonce of every address
marcindsobczak Jan 26, 2022
b74bb48
ensure there are no stale txs in broadcaster
marcindsobczak Jan 26, 2022
e47b7bd
add xml description
marcindsobczak Jan 27, 2022
aecccb4
fix concurrency issue
marcindsobczak Jan 27, 2022
11799f9
test for removing stale txs from persistent broadcast
marcindsobczak Jan 27, 2022
ca5c4a3
cosmetic
marcindsobczak Jan 27, 2022
9a8be4a
limit max amount of picked txs to actual number of them
marcindsobczak Jan 27, 2022
99ab9c3
simplified broadcaster
marcindsobczak Jan 27, 2022
da671bd
add test for picking best txs from persistent txs
marcindsobczak Jan 27, 2022
ef31c7c
add more test cases
marcindsobczak Jan 27, 2022
77b30d3
cosmetic, add comment
marcindsobczak Jan 27, 2022
3e192bb
Merge remote-tracking branch 'origin/master' into refactor/persistent…
marcindsobczak Jan 27, 2022
3e2fbbe
method names changes
marcindsobczak Jan 31, 2022
4674cfa
refactor getting stale txs algorithm to be thread safe
marcindsobczak Jan 31, 2022
535d95a
one more test
marcindsobczak Jan 31, 2022
106a049
cosmetic
marcindsobczak Jan 31, 2022
2581e41
make test more accurate
marcindsobczak Jan 31, 2022
824bafa
cosmetic
marcindsobczak Jan 31, 2022
9e59a33
drop linq
marcindsobczak Jan 31, 2022
f90e257
small refactor
LukaszRozmej Jan 31, 2022
b85812b
small refactoring
LukaszRozmej Feb 1, 2022
6e413b6
broadcast from persistent txs only when MaxFeePerGas >= CurrentBaseFee
marcindsobczak Feb 1, 2022
7f99c62
add tests
marcindsobczak Feb 1, 2022
30fcfeb
fix test
marcindsobczak Feb 1, 2022
39a1c69
break when all txs picked
marcindsobczak Feb 1, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions src/Nethermind/Nethermind.TxPool.Test/TxBroadcasterTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright (c) 2022 Demerzel Solutions Limited
// This file is part of the Nethermind library.
//
// The Nethermind library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Nethermind library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Nethermind. If not, see <http://www.gnu.org/licenses/>.
//

using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using FluentAssertions;
using Nethermind.Blockchain;
using Nethermind.Blockchain.Comparers;
using Nethermind.Core;
using Nethermind.Core.Extensions;
using Nethermind.Core.Specs;
using Nethermind.Core.Test.Builders;
using Nethermind.Core.Timers;
using Nethermind.Crypto;
using Nethermind.Logging;
using Nethermind.Specs;
using NSubstitute;
using NUnit.Framework;

[assembly: InternalsVisibleTo("Nethermind.Blockchain.Test")]

namespace Nethermind.TxPool.Test;

[TestFixture]
public class TxBroadcasterTests
{
private ILogManager _logManager;
private ISpecProvider _specProvider;
private IBlockTree _blockTree;
private IComparer<Transaction> _comparer;
private TxBroadcaster _broadcaster;
private EthereumEcdsa _ethereumEcdsa;
private TxPoolConfig _txPoolConfig;

[SetUp]
public void Setup()
{
_logManager = LimboLogs.Instance;
_specProvider = RopstenSpecProvider.Instance;
_ethereumEcdsa = new EthereumEcdsa(_specProvider.ChainId, _logManager);
_blockTree = Substitute.For<IBlockTree>();
_comparer = new TransactionComparerProvider(_specProvider, _blockTree).GetDefaultComparer();
_txPoolConfig = new TxPoolConfig();
}

[TestCase(0)]
[TestCase(1)]
[TestCase(2)]
[TestCase(99)]
[TestCase(100)]
[TestCase(101)]
[TestCase(1000)]
[TestCase(-10)]
public void should_pick_best_persistent_txs_to_broadcast(int threshold)
{
_txPoolConfig = new TxPoolConfig() { PeerNotificationThreshold = threshold };
_broadcaster = new TxBroadcaster(_comparer, TimerFactory.Default, _txPoolConfig, _logManager);

int addedTxsCount = TestItem.PrivateKeys.Length;
Transaction[] transactions = new Transaction[addedTxsCount];

for (int i = 0; i < addedTxsCount; i++)
{
transactions[i] = Build.A.Transaction
.WithSenderAddress(TestItem.PrivateKeys[i].Address)
.WithGasPrice(i.GWei())
.SignedAndResolved(_ethereumEcdsa, TestItem.PrivateKeys[i])
.TestObject;

_broadcaster.StartBroadcast(transactions[i]);
}

_broadcaster.GetSnapshot().Length.Should().Be(addedTxsCount);

ITxPoolPeer txPoolPeer = Substitute.For<ITxPoolPeer>();
List<Transaction> pickedTxs = _broadcaster.GetTxsToSend(txPoolPeer, ArraySegment<Transaction>.Empty).ToList();

int expectedCount = threshold <= 0 ? 0 : Math.Min(addedTxsCount * threshold / 100 + 1, addedTxsCount);
pickedTxs.Count.Should().Be(expectedCount);

List<Transaction> expectedTxs = new();

for (int i = 1; i <= expectedCount; i++)
{
expectedTxs.Add(transactions[addedTxsCount - i]);
}

expectedTxs.Should().BeEquivalentTo(pickedTxs);
}
}
77 changes: 77 additions & 0 deletions src/Nethermind/Nethermind.TxPool.Test/TxPoolTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,83 @@ public void should_not_broadcast_own_transactions_that_faded_out_and_came_back()
Assert.AreEqual(0, _txPool.GetOwnPendingTransactions().Length);
}

[TestCase(1, 0)]
[TestCase(2, 0)]
[TestCase(2, 1)]
[TestCase(10, 0)]
[TestCase(10, 1)]
[TestCase(10, 5)]
[TestCase(10, 8)]
[TestCase(10, 9)]
public void should_remove_stale_txs_from_persistent_transactions(int numberOfTxs, int nonceIncludedInBlock)
{
_txPool = CreatePool();

Transaction[] transactions = new Transaction[numberOfTxs];
EnsureSenderBalance(TestItem.AddressA, UInt256.MaxValue);

for (int i = 0; i < numberOfTxs; i++)
{
transactions[i] = Build.A.Transaction
.WithNonce((UInt256)i)
.WithGasLimit(GasCostOf.Transaction)
.WithGasPrice(10.GWei())
.SignedAndResolved(_ethereumEcdsa, TestItem.PrivateKeyA)
.TestObject;
_txPool.SubmitTx(transactions[i], TxHandlingOptions.PersistentBroadcast);
}
_txPool.GetOwnPendingTransactions().Length.Should().Be(numberOfTxs);

Block block = Build.A.Block.WithTransactions(transactions[nonceIncludedInBlock]).TestObject;
BlockReplacementEventArgs blockReplacementEventArgs = new(block, null);

ManualResetEvent manualResetEvent = new(false);
_txPool.RemoveTransaction(Arg.Do<Keccak>(t => manualResetEvent.Set()));
_blockTree.BlockAddedToMain += Raise.EventWith(new object(), blockReplacementEventArgs);
manualResetEvent.WaitOne(TimeSpan.FromMilliseconds(200));

// transactions[nonceIncludedInBlock] was included in the block and should be removed, as well as all lower nonces.
_txPool.GetOwnPendingTransactions().Length.Should().Be(numberOfTxs - nonceIncludedInBlock - 1);
}

[Test]
public void broadcaster_should_work_well_when_there_are_no_txs_in_persistent_txs_from_sender_of_tx_included_in_block()
{
_txPool = CreatePool();

Transaction transactionA = Build.A.Transaction
.WithNonce(0)
.WithGasLimit(GasCostOf.Transaction)
.WithGasPrice(10.GWei())
.SignedAndResolved(_ethereumEcdsa, TestItem.PrivateKeyA)
.TestObject;
EnsureSenderBalance(transactionA);
_txPool.SubmitTx(transactionA, TxHandlingOptions.None);

Transaction transactionB = Build.A.Transaction
.WithNonce(0)
.WithGasLimit(GasCostOf.Transaction)
.WithGasPrice(10.GWei())
.SignedAndResolved(_ethereumEcdsa, TestItem.PrivateKeyB)
.TestObject;
EnsureSenderBalance(transactionB);
_txPool.SubmitTx(transactionB, TxHandlingOptions.PersistentBroadcast);

_txPool.GetPendingTransactions().Length.Should().Be(2);
_txPool.GetOwnPendingTransactions().Length.Should().Be(1);

Block block = Build.A.Block.WithTransactions(transactionA).TestObject;
BlockReplacementEventArgs blockReplacementEventArgs = new(block, null);

ManualResetEvent manualResetEvent = new(false);
_txPool.RemoveTransaction(Arg.Do<Keccak>(t => manualResetEvent.Set()));
_blockTree.BlockAddedToMain += Raise.EventWith(new object(), blockReplacementEventArgs);
manualResetEvent.WaitOne(TimeSpan.FromMilliseconds(200));

_txPool.GetPendingTransactions().Length.Should().Be(1);
_txPool.GetOwnPendingTransactions().Length.Should().Be(1);
}

[Test]
public async Task should_remove_transactions_concurrently()
{
Expand Down
45 changes: 45 additions & 0 deletions src/Nethermind/Nethermind.TxPool/Collections/SortedPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,21 @@ public bool TryTakeFirst(out TValue first)
return TryRemove(GetKey(sortedValues.Min), out first);
}

/// <summary>
/// Returns specified number of elements from the start of supplied comparer order.
/// </summary>
[MethodImpl(MethodImplOptions.Synchronized)]
public IEnumerable<TValue> GetFirsts(int numberOfValues)
{
SortedSet<TValue> sortedValues = new(_sortedComparer);
foreach (KeyValuePair<TGroupKey, SortedSet<TValue>> bucket in _buckets)
{
sortedValues.Add(bucket.Value.Max!);
}

return sortedValues.Take(numberOfValues);
}

/// <summary>
/// Gets last element in supplied comparer order.
/// </summary>
Expand Down Expand Up @@ -210,6 +225,36 @@ private bool TryRemove(TKey key, bool evicted, out TValue value, out ICollection
[MethodImpl(MethodImplOptions.Synchronized)]
public bool TryRemove(TKey key) => TryRemove(key, out _, out _);

/// <summary>
/// Tries to get elements matching predicated criteria, iterating through SortedSet with break on first mismatch.
/// </summary>
/// <param name="groupKey">Given GroupKey, which elements are checked.</param>
/// <param name="where">Predicated criteria.</param>
/// <returns>Elements matching predicated criteria.</returns>
[MethodImpl(MethodImplOptions.Synchronized)]
public IEnumerable<TValue> TakeWhile(TGroupKey groupKey, Predicate<TValue> where)
{
if (_buckets.TryGetValue(groupKey, out SortedSet<TValue>? bucket))
{
using SortedSet<TValue>.Enumerator enumerator = bucket!.GetEnumerator();
List<TValue>? list = null;

while (enumerator.MoveNext())
{
if (!where(enumerator.Current))
{
break;
}

list ??= new List<TValue>();
list.Add(enumerator.Current);
}

return list ?? Enumerable.Empty<TValue>();
}
return Enumerable.Empty<TValue>();
}

/// <summary>
/// Tries to get element.
/// </summary>
Expand Down
37 changes: 27 additions & 10 deletions src/Nethermind/Nethermind.TxPool/TxBroadcaster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
using Nethermind.Core;
using Nethermind.Core.Crypto;
using Nethermind.Core.Timers;
using Nethermind.Int256;
using Nethermind.Logging;
using Nethermind.TxPool.Collections;

Expand Down Expand Up @@ -111,11 +112,22 @@ public void StopBroadcast(Keccak txHash)
{
if (_persistentTxs.Count != 0)
{
bool wasIncluded = _persistentTxs.TryRemove(txHash, out Transaction _);
if (wasIncluded)
bool hasBeenRemoved = _persistentTxs.TryRemove(txHash, out Transaction _);
if (hasBeenRemoved)
{
if (_logger.IsTrace) _logger.Trace(
$"Transaction {txHash} removed from broadcaster after block inclusion");
$"Transaction {txHash} removed from broadcaster");
}
}
}

public void EnsureStopBroadcastUpToNonce(Address address, UInt256 nonce)
{
if (_persistentTxs.Count != 0)
{
foreach (Transaction tx in _persistentTxs.TakeWhile(address, t => t.Nonce <= nonce))
{
StopBroadcast(tx.Hash!);
}
}
}
Expand All @@ -124,15 +136,13 @@ private void TimerOnElapsed(object sender, EventArgs args)
{
void NotifyPeers()
{
Transaction[] persistentTxs = _persistentTxs.GetSnapshot();

_txsToSend = Interlocked.Exchange(ref _accumulatedTemporaryTxs, _txsToSend);

if (_logger.IsDebug) _logger.Debug($"Broadcasting transactions to all peers");

foreach ((_, ITxPoolPeer peer) in _peers)
{
Notify(peer, GetTxsToSend(peer, persistentTxs, _txsToSend));
Notify(peer, GetTxsToSend(peer, _txsToSend));
}

_txsToSend.Clear();
Expand All @@ -142,13 +152,20 @@ void NotifyPeers()
_timer.Enabled = true;
}

private IEnumerable<Transaction> GetTxsToSend(ITxPoolPeer peer, IReadOnlyList<Transaction> persistentTxs, IEnumerable<Transaction> txsToSend)
internal IEnumerable<Transaction> GetTxsToSend(ITxPoolPeer peer, IEnumerable<Transaction> txsToSend)
{
for (int i = 0; i < persistentTxs.Count; i++)
if (_txPoolConfig.PeerNotificationThreshold > 0)
{
if (_txPoolConfig.PeerNotificationThreshold >= Random.Value.Next(1, 100))
// PeerNotificationThreshold is a declared in config percent of transactions in persistent broadcast,
// which will be sent when timer elapse. numberOfPersistentTxsToBroadcast is equal to
// PeerNotificationThreshold multiplicated by number of transactions in persistent broadcast, rounded up.
int numberOfPersistentTxsToBroadcast =
Math.Min(_txPoolConfig.PeerNotificationThreshold * _persistentTxs.Count / 100 + 1,
_persistentTxs.Count);

foreach (Transaction tx in _persistentTxs.GetFirsts(numberOfPersistentTxsToBroadcast))
{
yield return persistentTxs[i];
yield return tx;
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/Nethermind/Nethermind.TxPool/TxPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ private void RemoveProcessedTransactions(IReadOnlyList<Transaction> blockTransac
{
eip1559Txs++;
}

_broadcaster.EnsureStopBroadcastUpToNonce(blockTransactions[i].SenderAddress!, blockTransactions[i].Nonce);
LukaszRozmej marked this conversation as resolved.
Show resolved Hide resolved
}

if (transactionsInBlock != 0)
Expand Down