Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broadcast best persistent txs instead of random #3768

Merged
merged 27 commits into from
Feb 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
950c024
broadcast specified % of best txs in persistent broadcast instead of …
marcindsobczak Jan 25, 2022
b9065a3
broadcast only if persistent broadcast enabled
marcindsobczak Jan 25, 2022
8d13892
sort and broadcast only tx with lowest nonce of every address
marcindsobczak Jan 26, 2022
b74bb48
ensure there are no stale txs in broadcaster
marcindsobczak Jan 26, 2022
e47b7bd
add xml description
marcindsobczak Jan 27, 2022
aecccb4
fix concurrency issue
marcindsobczak Jan 27, 2022
11799f9
test for removing stale txs from persistent broadcast
marcindsobczak Jan 27, 2022
ca5c4a3
cosmetic
marcindsobczak Jan 27, 2022
9a8be4a
limit max amount of picked txs to actual number of them
marcindsobczak Jan 27, 2022
99ab9c3
simplified broadcaster
marcindsobczak Jan 27, 2022
da671bd
add test for picking best txs from persistent txs
marcindsobczak Jan 27, 2022
ef31c7c
add more test cases
marcindsobczak Jan 27, 2022
77b30d3
cosmetic, add comment
marcindsobczak Jan 27, 2022
3e192bb
Merge remote-tracking branch 'origin/master' into refactor/persistent…
marcindsobczak Jan 27, 2022
3e2fbbe
method names changes
marcindsobczak Jan 31, 2022
4674cfa
refactor getting stale txs algorithm to be thread safe
marcindsobczak Jan 31, 2022
535d95a
one more test
marcindsobczak Jan 31, 2022
106a049
cosmetic
marcindsobczak Jan 31, 2022
2581e41
make test more accurate
marcindsobczak Jan 31, 2022
824bafa
cosmetic
marcindsobczak Jan 31, 2022
9e59a33
drop linq
marcindsobczak Jan 31, 2022
f90e257
small refactor
LukaszRozmej Jan 31, 2022
b85812b
small refactoring
LukaszRozmej Feb 1, 2022
6e413b6
broadcast from persistent txs only when MaxFeePerGas >= CurrentBaseFee
marcindsobczak Feb 1, 2022
7f99c62
add tests
marcindsobczak Feb 1, 2022
30fcfeb
fix test
marcindsobczak Feb 1, 2022
39a1c69
break when all txs picked
marcindsobczak Feb 1, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 213 additions & 0 deletions src/Nethermind/Nethermind.TxPool.Test/TxBroadcasterTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
// Copyright (c) 2022 Demerzel Solutions Limited
// This file is part of the Nethermind library.
//
// The Nethermind library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Nethermind library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Nethermind. If not, see <http://www.gnu.org/licenses/>.
//

using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using FluentAssertions;
using Nethermind.Blockchain;
using Nethermind.Blockchain.Comparers;
using Nethermind.Core;
using Nethermind.Core.Extensions;
using Nethermind.Core.Specs;
using Nethermind.Core.Test.Builders;
using Nethermind.Core.Timers;
using Nethermind.Crypto;
using Nethermind.Logging;
using Nethermind.Specs;
using NSubstitute;
using NUnit.Framework;

[assembly: InternalsVisibleTo("Nethermind.Blockchain.Test")]

namespace Nethermind.TxPool.Test;

[TestFixture]
public class TxBroadcasterTests
{
private ILogManager _logManager;
private ISpecProvider _specProvider;
private IBlockTree _blockTree;
private IComparer<Transaction> _comparer;
private TxBroadcaster _broadcaster;
private EthereumEcdsa _ethereumEcdsa;
private TxPoolConfig _txPoolConfig;
private IChainHeadInfoProvider _headInfo;

[SetUp]
public void Setup()
{
_logManager = LimboLogs.Instance;
_specProvider = RopstenSpecProvider.Instance;
_ethereumEcdsa = new EthereumEcdsa(_specProvider.ChainId, _logManager);
_blockTree = Substitute.For<IBlockTree>();
_comparer = new TransactionComparerProvider(_specProvider, _blockTree).GetDefaultComparer();
_txPoolConfig = new TxPoolConfig();
_headInfo = Substitute.For<IChainHeadInfoProvider>();
}

[TestCase(0)]
[TestCase(1)]
[TestCase(2)]
[TestCase(99)]
[TestCase(100)]
[TestCase(101)]
[TestCase(1000)]
[TestCase(-10)]
public void should_pick_best_persistent_txs_to_broadcast(int threshold)
{
_txPoolConfig = new TxPoolConfig() { PeerNotificationThreshold = threshold };
_broadcaster = new TxBroadcaster(_comparer, TimerFactory.Default, _txPoolConfig, _headInfo, _logManager);
_headInfo.CurrentBaseFee.Returns(0.GWei());

int addedTxsCount = TestItem.PrivateKeys.Length;
Transaction[] transactions = new Transaction[addedTxsCount];

for (int i = 0; i < addedTxsCount; i++)
{
transactions[i] = Build.A.Transaction
.WithGasPrice(i.GWei())
.SignedAndResolved(_ethereumEcdsa, TestItem.PrivateKeys[i])
.TestObject;

_broadcaster.StartBroadcast(transactions[i]);
}

_broadcaster.GetSnapshot().Length.Should().Be(addedTxsCount);

ITxPoolPeer txPoolPeer = Substitute.For<ITxPoolPeer>();
List<Transaction> pickedTxs = _broadcaster.GetTxsToSend(txPoolPeer, ArraySegment<Transaction>.Empty).ToList();

int expectedCount = threshold <= 0 ? 0 : Math.Min(addedTxsCount * threshold / 100 + 1, addedTxsCount);
pickedTxs.Count.Should().Be(expectedCount);

List<Transaction> expectedTxs = new();

for (int i = 1; i <= expectedCount; i++)
{
expectedTxs.Add(transactions[addedTxsCount - i]);
}

expectedTxs.Should().BeEquivalentTo(pickedTxs);
}

[TestCase(0)]
[TestCase(1)]
[TestCase(2)]
[TestCase(99)]
[TestCase(100)]
[TestCase(101)]
[TestCase(1000)]
[TestCase(-10)]
public void should_not_pick_txs_with_GasPrice_lower_than_CurrentBaseFee(int threshold)
{
_txPoolConfig = new TxPoolConfig() { PeerNotificationThreshold = threshold };
_broadcaster = new TxBroadcaster(_comparer, TimerFactory.Default, _txPoolConfig, _headInfo, _logManager);

const int currentBaseFeeInGwei = 250;
_headInfo.CurrentBaseFee.Returns(currentBaseFeeInGwei.GWei());
Block headBlock = Build.A.Block
.WithNumber(RopstenSpecProvider.LondonBlockNumber)
.WithBaseFeePerGas(currentBaseFeeInGwei.GWei())
.TestObject;
_blockTree.Head.Returns(headBlock);

int addedTxsCount = TestItem.PrivateKeys.Length;
Transaction[] transactions = new Transaction[addedTxsCount];

for (int i = 0; i < addedTxsCount; i++)
{
transactions[i] = Build.A.Transaction
.WithGasPrice(i.GWei())
.SignedAndResolved(_ethereumEcdsa, TestItem.PrivateKeys[i])
.TestObject;

_broadcaster.StartBroadcast(transactions[i]);
}

_broadcaster.GetSnapshot().Length.Should().Be(addedTxsCount);

ITxPoolPeer txPoolPeer = Substitute.For<ITxPoolPeer>();
List<Transaction> pickedTxs = _broadcaster.GetTxsToSend(txPoolPeer, ArraySegment<Transaction>.Empty).ToList();

int expectedCount = threshold <= 0 ? 0 : Math.Min(addedTxsCount * threshold / 100 + 1, addedTxsCount - currentBaseFeeInGwei);
pickedTxs.Count.Should().Be(expectedCount);

List<Transaction> expectedTxs = new();

for (int i = 1; i <= expectedCount; i++)
{
expectedTxs.Add(transactions[addedTxsCount - i]);
}

expectedTxs.Should().BeEquivalentTo(pickedTxs);
}

[TestCase(0)]
[TestCase(1)]
[TestCase(2)]
[TestCase(99)]
[TestCase(100)]
[TestCase(101)]
[TestCase(1000)]
[TestCase(-10)]
public void should_not_pick_1559_txs_with_MaxFeePerGas_lower_than_CurrentBaseFee(int threshold)
{
_txPoolConfig = new TxPoolConfig() { PeerNotificationThreshold = threshold };
_broadcaster = new TxBroadcaster(_comparer, TimerFactory.Default, _txPoolConfig, _headInfo, _logManager);

const int currentBaseFeeInGwei = 250;
_headInfo.CurrentBaseFee.Returns(currentBaseFeeInGwei.GWei());
Block headBlock = Build.A.Block
.WithNumber(RopstenSpecProvider.LondonBlockNumber)
.WithBaseFeePerGas(currentBaseFeeInGwei.GWei())
.TestObject;
_blockTree.Head.Returns(headBlock);

int addedTxsCount = TestItem.PrivateKeys.Length;
Transaction[] transactions = new Transaction[addedTxsCount];

for (int i = 0; i < addedTxsCount; i++)
{
transactions[i] = Build.A.Transaction
.WithType(TxType.EIP1559)
.WithMaxFeePerGas(i.GWei())
.SignedAndResolved(_ethereumEcdsa, TestItem.PrivateKeys[i])
.TestObject;

_broadcaster.StartBroadcast(transactions[i]);
}

_broadcaster.GetSnapshot().Length.Should().Be(addedTxsCount);

ITxPoolPeer txPoolPeer = Substitute.For<ITxPoolPeer>();
List<Transaction> pickedTxs = _broadcaster.GetTxsToSend(txPoolPeer, ArraySegment<Transaction>.Empty).ToList();

int expectedCount = threshold <= 0 ? 0 : Math.Min(addedTxsCount * threshold / 100 + 1, addedTxsCount - currentBaseFeeInGwei);
pickedTxs.Count.Should().Be(expectedCount);

List<Transaction> expectedTxs = new();

for (int i = 1; i <= expectedCount; i++)
{
expectedTxs.Add(transactions[addedTxsCount - i]);
}

expectedTxs.Should().BeEquivalentTo(pickedTxs, o => o.Excluding(transaction => transaction.MaxFeePerGas));
}
}
77 changes: 77 additions & 0 deletions src/Nethermind/Nethermind.TxPool.Test/TxPoolTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,83 @@ public void should_not_broadcast_own_transactions_that_faded_out_and_came_back()
Assert.AreEqual(0, _txPool.GetOwnPendingTransactions().Length);
}

[TestCase(1, 0)]
[TestCase(2, 0)]
[TestCase(2, 1)]
[TestCase(10, 0)]
[TestCase(10, 1)]
[TestCase(10, 5)]
[TestCase(10, 8)]
[TestCase(10, 9)]
public void should_remove_stale_txs_from_persistent_transactions(int numberOfTxs, int nonceIncludedInBlock)
{
_txPool = CreatePool();

Transaction[] transactions = new Transaction[numberOfTxs];
EnsureSenderBalance(TestItem.AddressA, UInt256.MaxValue);

for (int i = 0; i < numberOfTxs; i++)
{
transactions[i] = Build.A.Transaction
.WithNonce((UInt256)i)
.WithGasLimit(GasCostOf.Transaction)
.WithGasPrice(10.GWei())
.SignedAndResolved(_ethereumEcdsa, TestItem.PrivateKeyA)
.TestObject;
_txPool.SubmitTx(transactions[i], TxHandlingOptions.PersistentBroadcast);
}
_txPool.GetOwnPendingTransactions().Length.Should().Be(numberOfTxs);

Block block = Build.A.Block.WithTransactions(transactions[nonceIncludedInBlock]).TestObject;
BlockReplacementEventArgs blockReplacementEventArgs = new(block, null);

ManualResetEvent manualResetEvent = new(false);
_txPool.RemoveTransaction(Arg.Do<Keccak>(t => manualResetEvent.Set()));
_blockTree.BlockAddedToMain += Raise.EventWith(new object(), blockReplacementEventArgs);
manualResetEvent.WaitOne(TimeSpan.FromMilliseconds(200));

// transactions[nonceIncludedInBlock] was included in the block and should be removed, as well as all lower nonces.
_txPool.GetOwnPendingTransactions().Length.Should().Be(numberOfTxs - nonceIncludedInBlock - 1);
}

[Test]
public void broadcaster_should_work_well_when_there_are_no_txs_in_persistent_txs_from_sender_of_tx_included_in_block()
{
_txPool = CreatePool();

Transaction transactionA = Build.A.Transaction
.WithNonce(0)
.WithGasLimit(GasCostOf.Transaction)
.WithGasPrice(10.GWei())
.SignedAndResolved(_ethereumEcdsa, TestItem.PrivateKeyA)
.TestObject;
EnsureSenderBalance(transactionA);
_txPool.SubmitTx(transactionA, TxHandlingOptions.None);

Transaction transactionB = Build.A.Transaction
.WithNonce(0)
.WithGasLimit(GasCostOf.Transaction)
.WithGasPrice(10.GWei())
.SignedAndResolved(_ethereumEcdsa, TestItem.PrivateKeyB)
.TestObject;
EnsureSenderBalance(transactionB);
_txPool.SubmitTx(transactionB, TxHandlingOptions.PersistentBroadcast);

_txPool.GetPendingTransactions().Length.Should().Be(2);
_txPool.GetOwnPendingTransactions().Length.Should().Be(1);

Block block = Build.A.Block.WithTransactions(transactionA).TestObject;
BlockReplacementEventArgs blockReplacementEventArgs = new(block, null);

ManualResetEvent manualResetEvent = new(false);
_txPool.RemoveTransaction(Arg.Do<Keccak>(t => manualResetEvent.Set()));
_blockTree.BlockAddedToMain += Raise.EventWith(new object(), blockReplacementEventArgs);
manualResetEvent.WaitOne(TimeSpan.FromMilliseconds(200));

_txPool.GetPendingTransactions().Length.Should().Be(1);
_txPool.GetOwnPendingTransactions().Length.Should().Be(1);
}

[Test]
public async Task should_remove_transactions_concurrently()
{
Expand Down
45 changes: 45 additions & 0 deletions src/Nethermind/Nethermind.TxPool/Collections/SortedPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,21 @@ public bool TryTakeFirst(out TValue first)
return TryRemove(GetKey(sortedValues.Min), out first);
}

/// <summary>
/// Returns best element of each bucket in supplied comparer order.
/// </summary>
[MethodImpl(MethodImplOptions.Synchronized)]
public IEnumerable<TValue> GetFirsts()
{
SortedSet<TValue> sortedValues = new(_sortedComparer);
foreach (KeyValuePair<TGroupKey, SortedSet<TValue>> bucket in _buckets)
{
sortedValues.Add(bucket.Value.Max!);
}

return sortedValues;
}

/// <summary>
/// Gets last element in supplied comparer order.
/// </summary>
Expand Down Expand Up @@ -210,6 +225,36 @@ private bool TryRemove(TKey key, bool evicted, out TValue value, out ICollection
[MethodImpl(MethodImplOptions.Synchronized)]
public bool TryRemove(TKey key) => TryRemove(key, out _, out _);

/// <summary>
/// Tries to get elements matching predicated criteria, iterating through SortedSet with break on first mismatch.
/// </summary>
/// <param name="groupKey">Given GroupKey, which elements are checked.</param>
/// <param name="where">Predicated criteria.</param>
/// <returns>Elements matching predicated criteria.</returns>
[MethodImpl(MethodImplOptions.Synchronized)]
public IEnumerable<TValue> TakeWhile(TGroupKey groupKey, Predicate<TValue> where)
{
if (_buckets.TryGetValue(groupKey, out SortedSet<TValue>? bucket))
{
using SortedSet<TValue>.Enumerator enumerator = bucket!.GetEnumerator();
List<TValue>? list = null;

while (enumerator.MoveNext())
{
if (!where(enumerator.Current))
{
break;
}

list ??= new List<TValue>();
list.Add(enumerator.Current);
}

return list ?? Enumerable.Empty<TValue>();
}
return Enumerable.Empty<TValue>();
}

/// <summary>
/// Tries to get element.
/// </summary>
Expand Down
Loading