Skip to content

Commit

Permalink
TxPool autorecover (#6789)
Browse files Browse the repository at this point in the history
  • Loading branch information
MarekM25 authored Feb 28, 2024
1 parent 263f66c commit 84ff7e8
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 62 deletions.
3 changes: 1 addition & 2 deletions src/Nethermind/Nethermind.Runner.Test/ConfigFilesTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,9 @@ public void Metrics_disabled_by_default(string configWildcard)
Test<IMetricsConfig, string>(configWildcard, c => c.PushGatewayUrl, "");
}

[TestCase("^mainnet ^spaceneth ^volta", 50)]
[TestCase("^spaceneth ^volta", 50)]
[TestCase("spaceneth", 4)]
[TestCase("volta", 25)]
[TestCase("mainnet", 50)]
public void Network_defaults_are_correct(string configWildcard, int activePeers = 50)
{
Test<INetworkConfig, int>(configWildcard, c => c.DiscoveryPort, 30303);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,19 @@ public int GetHashCode(WithFinalizer obj)
}
}

private class ShrinkableDistinctPool : WithFinalizerDistinctPool
{
public ShrinkableDistinctPool(int capacity, IComparer<WithFinalizer> comparer, IEqualityComparer<WithFinalizer> distinctComparer, ILogManager logManager)
: base(capacity, comparer, distinctComparer, logManager)
{
}

public void Shrink(int capacity)
{
EnsureCapacity(capacity);
}
}

private class WithFinalizerDistinctPool : DistinctValueSortedPool<int, WithFinalizer, int>
{
public WithFinalizerDistinctPool(int capacity, IComparer<WithFinalizer> comparer, IEqualityComparer<WithFinalizer> distinctComparer, ILogManager logManager)
Expand All @@ -155,23 +168,23 @@ public WithFinalizerDistinctPool(int capacity, IComparer<WithFinalizer> comparer
protected override int GetKey(WithFinalizer value) => value.Index;
}

[Test]
public void Capacity_is_never_exceeded()
IComparer<WithFinalizer> _comparer = Comparer<WithFinalizer>.Create((t1, t2) =>
{
IComparer<WithFinalizer> comparer = Comparer<WithFinalizer>.Create((t1, t2) =>
{
int t1Oddity = t1.Index % 2;
int t2Oddity = t2.Index % 2;
int t1Oddity = t1.Index % 2;
int t2Oddity = t2.Index % 2;
if (t1Oddity.CompareTo(t2Oddity) != 0)
{
return t1Oddity.CompareTo(t2Oddity);
}
if (t1Oddity.CompareTo(t2Oddity) != 0)
{
return t1Oddity.CompareTo(t2Oddity);
}
return t1.Index.CompareTo(t2.Index);
});
return t1.Index.CompareTo(t2.Index);
});

var pool = new WithFinalizerDistinctPool(Capacity, comparer, new WithFinalizerComparer(), LimboLogs.Instance);
[Test]
public void Capacity_is_never_exceeded()
{
var pool = new WithFinalizerDistinctPool(Capacity, _comparer, new WithFinalizerComparer(), LimboLogs.Instance);

int capacityMultiplier = 10;
int expectedAllCount = Capacity * capacityMultiplier;
Expand All @@ -192,23 +205,39 @@ public void Capacity_is_never_exceeded()
pool.Count.Should().Be(Capacity);
}

[Test]
public void Capacity_is_never_exceeded_when_there_are_duplicates()
[TestCase(0, 16)]
[TestCase(1, 15)]
[TestCase(2, 14)]
[TestCase(6, 10)]
[TestCase(10, 6)]
[TestCase(11, 6)]
[TestCase(13, 6)]
public void Capacity_can_shrink_to_given_value(int shrinkValue, int expectedCapacity)
{
Comparer<WithFinalizer> comparer = Comparer<WithFinalizer>.Create((t1, t2) =>
var pool = new ShrinkableDistinctPool(Capacity, _comparer, new WithFinalizerComparer(), LimboLogs.Instance);

int capacityMultiplier = 10;
int expectedAllCount = Capacity * capacityMultiplier;

WithFinalizer newOne;
for (int i = 0; i < expectedAllCount; i++)
{
int t1Oddity = t1.Index % 2;
int t2Oddity = t2.Index % 2;
newOne = new WithFinalizer();
pool.TryInsert(newOne.Index, newOne);
}

if (t1Oddity.CompareTo(t2Oddity) != 0)
{
return t1Oddity.CompareTo(t2Oddity);
}
newOne = null;

CollectAndFinalize();

return t1.Index.CompareTo(t2.Index);
});
pool.Shrink(Capacity - shrinkValue);
pool.Count.Should().Be(expectedCapacity);
}

var pool = new WithFinalizerDistinctPool(Capacity, comparer, new WithFinalizerComparer(), LimboLogs.Instance);
[Test]
public void Capacity_is_never_exceeded_when_there_are_duplicates()
{
var pool = new WithFinalizerDistinctPool(Capacity, _comparer, new WithFinalizerComparer(), LimboLogs.Instance);

int capacityMultiplier = 10;

Expand All @@ -232,20 +261,7 @@ public async Task Capacity_is_never_exceeded_with_multiple_threads()
_finalizedCount.Should().Be(0);
_allCount.Should().Be(0);

IComparer<WithFinalizer> comparer = Comparer<WithFinalizer>.Create((t1, t2) =>
{
int t1Oddity = t1.Index % 2;
int t2Oddity = t2.Index % 2;
if (t1Oddity.CompareTo(t2Oddity) != 0)
{
return t1Oddity.CompareTo(t2Oddity);
}
return t1.Index.CompareTo(t2.Index);
});

var pool = new WithFinalizerDistinctPool(Capacity, comparer, new WithFinalizerComparer(), LimboLogs.Instance);
var pool = new WithFinalizerDistinctPool(Capacity, _comparer, new WithFinalizerComparer(), LimboLogs.Instance);

void KeepGoing(int iterations)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,7 @@ public BlobTxDistinctSortedPool(int capacity, IComparer<Transaction> comparer, I
protected override IComparer<Transaction> GetReplacementComparer(IComparer<Transaction> comparer)
=> comparer.GetBlobReplacementComparer();

public override void VerifyCapacity()
{
if (_logger.IsWarn && Count > _poolCapacity)
_logger.Warn($"Blob pool exceeds the config size {Count}/{_poolCapacity}");
}
protected override string ShortPoolName => "BlobPool";

/// <summary>
/// For tests only - to test sorting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ protected DistinctValueSortedPool(
IComparer<TValue> comparer,
IEqualityComparer<TValue> distinctComparer,
ILogManager logManager)
: base(capacity, comparer)
: base(capacity, comparer, logManager)
{
// ReSharper disable once VirtualMemberCallInConstructor
_comparer = GetReplacementComparer(comparer ?? throw new ArgumentNullException(nameof(comparer)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,4 @@ protected override bool Remove(ValueHash256 hash, Transaction tx)
_blobTxStorage.Delete(hash, tx.Timestamp);
return base.Remove(hash, tx);
}

public override void VerifyCapacity()
{
base.VerifyCapacity();

if (_logger.IsDebug && Count == _poolCapacity)
_logger.Debug($"Blob persistent storage has reached max size of {_poolCapacity}, blob txs can be evicted now");
}
}
49 changes: 48 additions & 1 deletion src/Nethermind/Nethermind.TxPool/Collections/SortedPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Runtime.CompilerServices;
using Nethermind.Core.Collections;
using Nethermind.Core.Threading;
using Nethermind.Logging;

namespace Nethermind.TxPool.Collections
{
Expand All @@ -25,6 +26,7 @@ public abstract partial class SortedPool<TKey, TValue, TGroupKey>
protected McsPriorityLock Lock { get; } = new();

private readonly int _capacity;
private readonly ILogger _logger;

// comparer for a bucket
private readonly IComparer<TValue> _groupComparer;
Expand All @@ -48,7 +50,7 @@ public abstract partial class SortedPool<TKey, TValue, TGroupKey>
/// </summary>
/// <param name="capacity">Max capacity, after surpassing it elements will be removed based on last by <see cref="comparer"/>.</param>
/// <param name="comparer">Comparer to sort items.</param>
protected SortedPool(int capacity, IComparer<TValue> comparer)
protected SortedPool(int capacity, IComparer<TValue> comparer, ILogManager logManager)
{
_capacity = capacity;
// ReSharper disable VirtualMemberCallInConstructor
Expand All @@ -57,6 +59,7 @@ protected SortedPool(int capacity, IComparer<TValue> comparer)
_cacheMap = new Dictionary<TKey, TValue>(); // do not initialize it at the full capacity
_buckets = new Dictionary<TGroupKey, EnhancedSortedSet<TValue>>();
_worstSortedValues = new DictionarySortedSet<TValue, TKey>(_sortedComparer);
_logger = logManager?.GetClassLogger() ?? throw new ArgumentNullException(nameof(logManager));
}

/// <summary>
Expand Down Expand Up @@ -312,6 +315,8 @@ public virtual bool TryInsert(TKey key, TValue value, out TValue? removed)
{
if (!RemoveLast(out removed) || _cacheMap.Count > _capacity)
{
if (_cacheMap.Count > _capacity && _logger.IsWarn)
_logger.Warn($"Capacity exceeded or failed to remove the last item from the pool, the current state is {Count}/{_capacity}. {GetInfoAboutWorstValues}");
UpdateWorstValue();
RemoveLast(out removed);
}
Expand Down Expand Up @@ -453,6 +458,46 @@ public bool TryGetBucketsWorstValue(TGroupKey groupKey, out TValue? item)
return false;
}

protected void EnsureCapacity(int? expectedCapacity = null)
{
expectedCapacity ??= _capacity; // expectedCapacity is added for testing purpose. null should be used in production code
if (Count <= expectedCapacity)
return;

if (_logger.IsWarn)
_logger.Warn($"{ShortPoolName} exceeds the config size {Count}/{expectedCapacity}. Trying to repair the pool");

// Trying to auto-recover TxPool. If this code is executed, it means that something is wrong with our TxPool logic.
// However, auto-recover mitigates bad consequences of such bugs.
int maxIterations = 10; // We don't want to add an infinite loop, so we can break after a few iterations.
int iterations = 0;
while (Count > expectedCapacity)
{
++iterations;
if (RemoveLast(out TValue? removed))
{
if (_logger.IsInfo)
_logger.Info($"Removed the last item {removed} from the pool, the current state is {Count}/{expectedCapacity}. {GetInfoAboutWorstValues}");
}
else
{
if (_logger.IsWarn)
_logger.Warn($"Failed to remove the last item from the pool, the current state is {Count}/{expectedCapacity}. {GetInfoAboutWorstValues}");
UpdateWorstValue();
}

if (iterations >= maxIterations) break;
}
}

private string GetInfoAboutWorstValues()
{
TKey? key = _worstValue.GetValueOrDefault().Value;
var isWorstValueInPool = _cacheMap.TryGetValue(key, out TValue? value) && value != null;
return
$"Worst value {_worstValue}, items in worstSortedValues {_worstSortedValues.Count}, GetVakye: {_worstValue.GetValueOrDefault()} current max in worst values {_worstSortedValues.Max}, IsWorstValueInPool {isWorstValueInPool}";
}

public void UpdatePool(Func<TGroupKey, IReadOnlySortedSet<TValue>, IEnumerable<(TValue Tx, Action<TValue>? Change)>> changingElements)
{
using var lockRelease = Lock.Acquire();
Expand Down Expand Up @@ -485,5 +530,7 @@ protected virtual void UpdateGroup(TGroupKey groupKey, EnhancedSortedSet<TValue>
change?.Invoke(value);
}
}

protected virtual string ShortPoolName => "SortedPool";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void UpdatePool(IAccountStateProvider accounts, Func<Address, AccountStru
{
using var lockRelease = Lock.Acquire();

VerifyCapacity();
EnsureCapacity();
foreach ((AddressAsKey address, EnhancedSortedSet<Transaction> bucket) in _buckets)
{
Debug.Assert(bucket.Count > 0);
Expand Down Expand Up @@ -133,10 +133,6 @@ public void UpdateGroup(Address groupKey, AccountStruct groupValue, Func<Address
}
}

public virtual void VerifyCapacity()
{
if (_logger.IsWarn && Count > _poolCapacity)
_logger.Warn($"TxPool exceeds the config size {Count}/{_poolCapacity}");
}
protected override string ShortPoolName => "TxPool";
}
}

0 comments on commit 84ff7e8

Please sign in to comment.