Skip to content

Commit

Permalink
Fixes to bad peers handling
Browse files Browse the repository at this point in the history
  • Loading branch information
dceleda committed May 4, 2022
1 parent 09b080c commit c78b3f5
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public override void HandleMessage(ZeroPacket message)

private void LogRequest(IByteBuffer buffer)
{
Logger.Info($"SNAP - GetNodeData{Bytes.ToHexString(buffer.Array)}");
Logger.Info($"SNAP - GetNodeData:{Bytes.ToHexString(buffer.Array)}");
}

private void Handle(AccountRangeMessage msg, long size)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Nethermind.Logging;
using Nethermind.Network.P2P.Subprotocols.Eth.V62;
using Nethermind.Network.P2P.Subprotocols.Eth.V66;
using Nethermind.Network.P2P.Subprotocols.Snap;
using Nethermind.Synchronization.ParallelSync;
using Nethermind.Synchronization.Peers;
using Nethermind.Synchronization.SnapSync;
using NSubstitute;
using NUnit.Framework;

namespace Nethermind.Synchronization.Test.SnapSync
{
[TestFixture]
internal class AnalyzeResponsePerPeerTests
{
[Test]
public void Test01()
{
PeerInfo peer1 = new(null);
PeerInfo peer2 = new(null);

ISyncModeSelector selector = Substitute.For<ISyncModeSelector>();
ISnapProvider snapProvider = Substitute.For<ISnapProvider>();

SnapSyncFeed feed = new(selector, snapProvider, null, LimboLogs.Instance);

feed.AnalyzeResponsePerPeer(AddRangeResult.OK, peer1);
feed.AnalyzeResponsePerPeer(AddRangeResult.OK, peer1);
feed.AnalyzeResponsePerPeer(AddRangeResult.OK, peer2);
feed.AnalyzeResponsePerPeer(AddRangeResult.OK, peer1);
feed.AnalyzeResponsePerPeer(AddRangeResult.ExpiredRootHash, peer1);
feed.AnalyzeResponsePerPeer(AddRangeResult.OK, peer2);
feed.AnalyzeResponsePerPeer(AddRangeResult.DifferentRootHash, peer1);
feed.AnalyzeResponsePerPeer(AddRangeResult.OK, peer2);
feed.AnalyzeResponsePerPeer(AddRangeResult.ExpiredRootHash, peer1);
feed.AnalyzeResponsePerPeer(AddRangeResult.OK, peer2);
feed.AnalyzeResponsePerPeer(AddRangeResult.DifferentRootHash, peer1);
feed.AnalyzeResponsePerPeer(AddRangeResult.OK, peer2);
feed.AnalyzeResponsePerPeer(AddRangeResult.DifferentRootHash, peer1);
feed.AnalyzeResponsePerPeer(AddRangeResult.OK, peer2);

var result = feed.AnalyzeResponsePerPeer(AddRangeResult.ExpiredRootHash, peer1);

Assert.AreEqual(SyncResponseHandlingResult.LesserQuality, result);

feed.AnalyzeResponsePerPeer(AddRangeResult.OK, peer2);
result = feed.AnalyzeResponsePerPeer(AddRangeResult.DifferentRootHash, peer1);
Assert.AreEqual(SyncResponseHandlingResult.LesserQuality, result);
}

[Test]
public void Test02()
{
PeerInfo peer1 = new(null);
PeerInfo peer2 = new(null);

ISyncModeSelector selector = Substitute.For<ISyncModeSelector>();
ISnapProvider snapProvider = Substitute.For<ISnapProvider>();

SnapSyncFeed feed = new(selector, snapProvider, null, LimboLogs.Instance);

feed.AnalyzeResponsePerPeer(AddRangeResult.OK, peer1);
feed.AnalyzeResponsePerPeer(AddRangeResult.OK, peer1);
feed.AnalyzeResponsePerPeer(AddRangeResult.OK, peer2);
feed.AnalyzeResponsePerPeer(AddRangeResult.OK, peer1);
feed.AnalyzeResponsePerPeer(AddRangeResult.ExpiredRootHash, peer1);
feed.AnalyzeResponsePerPeer(AddRangeResult.OK, peer2);
feed.AnalyzeResponsePerPeer(AddRangeResult.DifferentRootHash, peer1);
feed.AnalyzeResponsePerPeer(AddRangeResult.OK, peer2);
feed.AnalyzeResponsePerPeer(AddRangeResult.ExpiredRootHash, peer1);
feed.AnalyzeResponsePerPeer(AddRangeResult.OK, peer2);
feed.AnalyzeResponsePerPeer(AddRangeResult.DifferentRootHash, peer1);
feed.AnalyzeResponsePerPeer(AddRangeResult.OK, peer2);
feed.AnalyzeResponsePerPeer(AddRangeResult.DifferentRootHash, peer1);
feed.AnalyzeResponsePerPeer(AddRangeResult.OK, peer2);

var result = feed.AnalyzeResponsePerPeer(AddRangeResult.ExpiredRootHash, peer1);

Assert.AreEqual(SyncResponseHandlingResult.LesserQuality, result);

feed.AnalyzeResponsePerPeer(AddRangeResult.OK, peer1);
result = feed.AnalyzeResponsePerPeer(AddRangeResult.DifferentRootHash, peer1);
Assert.AreEqual(SyncResponseHandlingResult.OK, result);
}

[Test]
public void Test03()
{
PeerInfo peer1 = new(null);
PeerInfo peer2 = new(null);

ISyncModeSelector selector = Substitute.For<ISyncModeSelector>();
ISnapProvider snapProvider = Substitute.For<ISnapProvider>();

SnapSyncFeed feed = new(selector, snapProvider, null, LimboLogs.Instance);

feed.AnalyzeResponsePerPeer(AddRangeResult.OK, peer2);
feed.AnalyzeResponsePerPeer(AddRangeResult.OK, peer1);
feed.AnalyzeResponsePerPeer(AddRangeResult.ExpiredRootHash, peer1);
feed.AnalyzeResponsePerPeer(AddRangeResult.ExpiredRootHash, peer2);
feed.AnalyzeResponsePerPeer(AddRangeResult.ExpiredRootHash, peer1);
feed.AnalyzeResponsePerPeer(AddRangeResult.DifferentRootHash, peer2);
feed.AnalyzeResponsePerPeer(AddRangeResult.ExpiredRootHash, peer1);
feed.AnalyzeResponsePerPeer(AddRangeResult.ExpiredRootHash, peer1);
feed.AnalyzeResponsePerPeer(AddRangeResult.ExpiredRootHash, peer1);
var result = feed.AnalyzeResponsePerPeer(AddRangeResult.DifferentRootHash, peer1);
Assert.AreEqual(SyncResponseHandlingResult.OK, result);

snapProvider.Received(1).UpdatePivot();
}

[Test]
public void Test04()
{
PeerInfo peer1 = new(null);

ISyncModeSelector selector = Substitute.For<ISyncModeSelector>();
ISnapProvider snapProvider = Substitute.For<ISnapProvider>();

SnapSyncFeed feed = new(selector, snapProvider, null, LimboLogs.Instance);

for (int i = 0; i < 200; i++)
{
feed.AnalyzeResponsePerPeer(AddRangeResult.OK, peer1);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public Pivot(IBlockTree blockTree, ILogManager logManager)

public BlockHeader GetPivotHeader()
{
if(_bestHeader is null || _blockTree.BestSuggestedHeader?.Number - _bestHeader.Number >= Constants.MaxDistanceFromHead - 20)
if(_bestHeader is null || _blockTree.BestSuggestedHeader?.Number - _bestHeader.Number >= Constants.MaxDistanceFromHead - 35)
{
LogPivotChanged($"distance from HEAD:{Diff}");
_bestHeader = _blockTree.BestSuggestedHeader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public class ProgressTracker
private readonly IDb _db;

public Keccak NextAccountPath { get; set; } = Keccak.Zero;
//public Keccak NextAccountPath { get; set; } = new("0xffe0000000000000000000000000000000000000000000000000000000000000");
private ConcurrentQueue<StorageRange> NextSlotRange { get; set; } = new();
private ConcurrentQueue<PathWithAccount> StoragesToRetrieve { get; set; } = new();
private ConcurrentQueue<Keccak> CodesToRetrieve { get; set; } = new();
Expand All @@ -48,6 +47,8 @@ public ProgressTracker(IBlockTree blockTree, IDb db, ILogManager logManager)

_pivot = new Pivot(blockTree, logManager);

_logger.Info($"SNAP - batch sizes - storage:{STORAGE_BATCH_SIZE}, codes:{CODES_BATCH_SIZE}");

//TODO: maybe better to move to a init methot instead of the constructor
GetSyncProgress();
}
Expand Down
80 changes: 49 additions & 31 deletions src/Nethermind/Nethermind.Synchronization/SnapSync/SnapSyncFeed.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,16 @@
using Nethermind.Synchronization.Peers;
using Nethermind.Core.Crypto;
using System.Collections.Concurrent;
using System.Runtime.InteropServices;

namespace Nethermind.Synchronization.SnapSync
{
public class SnapSyncFeed : SyncFeed<SnapSyncBatch?>, IDisposable
{
private object _syncLock = new object();

private const int AllowedInvalidResponses = 5;
private ConcurrentQueue<(PeerInfo peer, AddRangeResult result)> _resultLog = new();
private LinkedList<(PeerInfo peer, AddRangeResult result)> _resultLog = new();

private const SnapSyncBatch EmptyBatch = null;

Expand All @@ -49,9 +52,9 @@ public class SnapSyncFeed : SyncFeed<SnapSyncBatch?>, IDisposable

public SnapSyncFeed(ISyncModeSelector syncModeSelector, ISnapProvider snapProvider, IBlockTree blockTree, ILogManager logManager)
{
_syncModeSelector = syncModeSelector ?? throw new ArgumentNullException(nameof(syncModeSelector));
_snapProvider = snapProvider ?? throw new ArgumentNullException(nameof(snapProvider)); ;
_logger = logManager.GetClassLogger() ?? throw new ArgumentNullException(nameof(logManager));
_syncModeSelector = syncModeSelector;
_snapProvider = snapProvider;
_logger = logManager.GetClassLogger();

_syncModeSelector.Changed += SyncModeSelectorOnChanged;
}
Expand Down Expand Up @@ -129,15 +132,15 @@ public override SyncResponseHandlingResult HandleResponse(SnapSyncBatch? batch,
}
else
{
_logger.Warn($"SNAP - timeout? {peer.SyncPeer.Id}");
_logger.Warn($"SNAP - timeout? {peer}");
return SyncResponseHandlingResult.LesserQuality;
}
}

return AnalyzeResponsePerPeer(result, peer);
}

private SyncResponseHandlingResult AnalyzeResponsePerPeer(AddRangeResult result, PeerInfo peer)
public SyncResponseHandlingResult AnalyzeResponsePerPeer(AddRangeResult result, PeerInfo peer)
{
if(peer == null)
{
Expand All @@ -147,52 +150,67 @@ private SyncResponseHandlingResult AnalyzeResponsePerPeer(AddRangeResult result,
int maxSize = 10 * AllowedInvalidResponses;
while (_resultLog.Count > maxSize)
{
_resultLog.TryDequeue(out _);
lock (_syncLock)
{
if (_resultLog.Count > 0)
{
_resultLog.RemoveLast();
}
}
}

_resultLog.Enqueue((peer, result));
lock (_syncLock)
{
_resultLog.AddFirst((peer, result));
}

if (result == AddRangeResult.OK)
{
_resultLog.Enqueue((peer, AddRangeResult.OK));
return SyncResponseHandlingResult.OK;
}
else
{
int allLastSuccess = 0;
int allLastFailures = 0;
int peerLastFailures = 0;
foreach (var item in _resultLog)

lock(_syncLock)
{
if(item.result == AddRangeResult.OK)
foreach (var item in _resultLog)
{
allLastSuccess++;

if (item.peer == peer)
if (item.result == AddRangeResult.OK)
{
break;
}
}
else
{
allLastFailures++;
allLastSuccess++;

if (item.peer == peer)
if (item.peer == peer)
{
break;
}
}
else
{
peerLastFailures++;
allLastFailures++;

if(peerLastFailures > AllowedInvalidResponses)
if (item.peer == peer)
{
if(allLastFailures == peerLastFailures)
{
_logger.Warn($"SNAP - peer to be punished:{peer.SyncPeer.Id}");
return SyncResponseHandlingResult.LesserQuality;
}
peerLastFailures++;

if(allLastSuccess == 0 && allLastFailures > peerLastFailures)
if (peerLastFailures > AllowedInvalidResponses)
{
_snapProvider.UpdatePivot();
_resultLog.Clear();
if (allLastFailures == peerLastFailures)
{
_logger.Warn($"SNAP - peer to be punished:{peer}");
return SyncResponseHandlingResult.LesserQuality;
}

if (allLastSuccess == 0 && allLastFailures > peerLastFailures)
{
_snapProvider.UpdatePivot();

_resultLog.Clear();

break;
}
}
}
}
Expand Down

0 comments on commit c78b3f5

Please sign in to comment.