Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix] MultiSyncModeSelector.Changed event propagation being blocked by SyncFeed #6529

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Nethermind/Nethermind.Core/MeasuredProgress.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public void SetMeasuringPoint()
}
}

public bool HasStarted => UtcStartTime.HasValue;

public bool HasEnded => UtcEndTime.HasValue;

public void MarkEnd()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,20 @@

using System.Collections.Generic;
using System.Linq;
using System.Threading;
using FluentAssertions;
using Nethermind.Blockchain;
using Nethermind.Blockchain.Receipts;
using Nethermind.Blockchain.Synchronization;
using Nethermind.Core;
using Nethermind.Db;
using Nethermind.Int256;
using Nethermind.Logging;
using Nethermind.Specs;
using Nethermind.Synchronization.FastBlocks;
using Nethermind.Synchronization.ParallelSync;
using Nethermind.Synchronization.Peers;
using Nethermind.Synchronization.Reporting;
using NSubstitute;
using NUnit.Framework;

Expand Down Expand Up @@ -640,5 +647,77 @@ public void Switch_correctly_from_full_sync_to_state_nodes_catch_up()

selector.Current.Should().Be(SyncMode.StateNodes);
}

[Test]
public void Changed_event_no_longer_gets_blocked_when_invoking_delegates()
{
ISyncProgressResolver syncProgressResolver = Substitute.For<ISyncProgressResolver>();
syncProgressResolver.FindBestHeader().Returns(Scenario.ChainHead.Number);
syncProgressResolver.FindBestFullBlock().Returns(Scenario.ChainHead.Number);
syncProgressResolver.FindBestFullState().Returns(Scenario.ChainHead.Number - MultiSyncModeSelector.FastSyncLag);
syncProgressResolver.FindBestProcessedBlock().Returns(0);
syncProgressResolver.IsFastBlocksFinished().Returns(FastBlocksState.FinishedReceipts);
syncProgressResolver.ChainDifficulty.Returns(UInt256.Zero);

List<ISyncPeer> syncPeers = new();

BlockHeader header = Scenario.ChainHead;
ISyncPeer syncPeer = Substitute.For<ISyncPeer>();
syncPeer.HeadHash.Returns(header.Hash);
syncPeer.HeadNumber.Returns(header.Number);
syncPeer.TotalDifficulty.Returns(header.TotalDifficulty ?? 0);
syncPeer.IsInitialized.Returns(true);
syncPeer.ClientId.Returns("nethermind");

syncPeers.Add(syncPeer);
ISyncPeerPool syncPeerPool = Substitute.For<ISyncPeerPool>();
IEnumerable<PeerInfo> peerInfos = syncPeers.Select(p => new PeerInfo(p)).ToArray();
syncPeerPool.InitializedPeers.Returns(peerInfos);
syncPeerPool.AllPeers.Returns(peerInfos);

ISyncConfig syncConfig = new SyncConfig
{
FastSyncCatchUpHeightDelta = 2,
FastSync = true,
FastBlocks = true
};

TotalDifficultyBetterPeerStrategy bestPeerStrategy = new(LimboLogs.Instance);
MultiSyncModeSelector selector = new(syncProgressResolver, syncPeerPool, syncConfig, No.BeaconSync, bestPeerStrategy, LimboLogs.Instance);
selector.Stop();
syncProgressResolver.FindBestProcessedBlock().Returns(Scenario.ChainHead.Number);
selector.Update();
selector.Current.Should().Be(SyncMode.Full);

CancellationTokenSource waitTokenSource = new();
ReceiptsSyncFeed receiptsSyncFeed = Substitute.ForPartsOf<ReceiptsSyncFeed>(MainnetSpecProvider.Instance, Substitute.For<IBlockTree>(), Substitute.For<IReceiptStorage>(), syncPeerPool, syncConfig, Substitute.For<ISyncReport>(), Substitute.For<IDb>(), LimboLogs.Instance);
receiptsSyncFeed.When(rsf => rsf.InitializeFeed()).DoNotCallBase();
receiptsSyncFeed.When(rsf => rsf.InitializeFeed()).Do(e =>
{
waitTokenSource.Token.WaitHandle.WaitOne(1000);
if (!waitTokenSource.IsCancellationRequested)
Assert.Fail();
});
selector.Changed += (sender, args) =>
{
receiptsSyncFeed?.SyncModeSelectorOnChanged(SyncMode.FastReceipts);
};
selector.Changed += SecondDelegate;

for (uint i = 0; i < syncConfig.FastSyncCatchUpHeightDelta + 1; i++)
{
long number = header.Number + i;
syncPeer.HeadNumber.Returns(number);
syncPeer.TotalDifficulty.Returns(header.TotalDifficulty!.Value + i);
syncProgressResolver.FindBestHeader().Returns(number);
syncProgressResolver.FindBestFullBlock().Returns(number);
selector.Update();
}
void SecondDelegate(object? sender, SyncModeChangedEventArgs e)
{
waitTokenSource.Cancel();
}
}

}
}
56 changes: 55 additions & 1 deletion src/Nethermind/Nethermind.Synchronization.Test/SyncReportTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Threading.Tasks;
using Nethermind.Blockchain.Synchronization;
using Nethermind.Core.Timers;
using Nethermind.Logging;
Expand Down Expand Up @@ -97,6 +96,9 @@ public void Ancient_bodies_and_receipts_are_reported_correctly(bool setBarriers)
}

SyncReport syncReport = new(pool, Substitute.For<INodeStatsManager>(), syncConfig, Substitute.For<IPivot>(), logManager, timerFactory);
syncReport.FastBlocksHeaders.Reset(0);
syncReport.FastBlocksBodies.Reset(0);
syncReport.FastBlocksReceipts.Reset(0);
syncReport.SyncModeSelectorOnChanged(null, new SyncModeChangedEventArgs(SyncMode.None, SyncMode.FastHeaders | SyncMode.FastBodies | SyncMode.FastReceipts));
timer.Elapsed += Raise.Event();

Expand All @@ -113,5 +115,57 @@ public void Ancient_bodies_and_receipts_are_reported_correctly(bool setBarriers)
iLogger.Received(1).Info("Old Receipts 0 / 100 ( 0.00 %) | queue 0 | current 0 Blk/s | total 0 Blk/s");
}
}

[TestCase(false)]
[TestCase(true)]
public void Ancient_bodies_and_receipts_are_not_reported_until_feed_finishes_Initialization(bool setBarriers)
{
CultureInfo.CurrentCulture = CultureInfo.InvariantCulture;
ISyncPeerPool pool = Substitute.For<ISyncPeerPool>();
pool.InitializedPeersCount.Returns(1);
ITimerFactory timerFactory = Substitute.For<ITimerFactory>();
ITimer timer = Substitute.For<ITimer>();
timerFactory.CreateTimer(Arg.Any<TimeSpan>()).Returns(timer);
ILogManager logManager = Substitute.For<ILogManager>();
InterfaceLogger iLogger = Substitute.For<InterfaceLogger>();
iLogger.IsInfo.Returns(true);
iLogger.IsError.Returns(true);
ILogger logger = new(iLogger);
logManager.GetClassLogger().Returns(logger);

Queue<SyncMode> syncModes = new();
syncModes.Enqueue(SyncMode.FastHeaders);
syncModes.Enqueue(SyncMode.FastBodies);
syncModes.Enqueue(SyncMode.FastReceipts);

SyncConfig syncConfig = new()
{
FastBlocks = true,
FastSync = true,
PivotNumber = "100",
};
if (setBarriers)
{
syncConfig.AncientBodiesBarrier = 30;
syncConfig.AncientReceiptsBarrier = 35;
}

SyncReport syncReport = new(pool, Substitute.For<INodeStatsManager>(), syncConfig, Substitute.For<IPivot>(), logManager, timerFactory);
syncReport.SyncModeSelectorOnChanged(null, new SyncModeChangedEventArgs(SyncMode.None, SyncMode.FastHeaders | SyncMode.FastBodies | SyncMode.FastReceipts));
timer.Elapsed += Raise.Event();

if (setBarriers)
{
iLogger.DidNotReceive().Info("Old Headers 0 / 100 ( 0.00 %) | queue 0 | current 0 Blk/s | total 0 Blk/s");
iLogger.DidNotReceive().Info("Old Bodies 0 / 70 ( 0.00 %) | queue 0 | current 0 Blk/s | total 0 Blk/s");
iLogger.DidNotReceive().Info("Old Receipts 0 / 65 ( 0.00 %) | queue 0 | current 0 Blk/s | total 0 Blk/s");
}
else
{
iLogger.DidNotReceive().Info("Old Headers 0 / 100 ( 0.00 %) | queue 0 | current 0 Blk/s | total 0 Blk/s");
iLogger.DidNotReceive().Info("Old Bodies 0 / 100 ( 0.00 %) | queue 0 | current 0 Blk/s | total 0 Blk/s");
iLogger.DidNotReceive().Info("Old Receipts 0 / 100 ( 0.00 %) | queue 0 | current 0 Blk/s | total 0 Blk/s");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,9 @@
using Nethermind.Blockchain.Synchronization;
using Nethermind.Consensus.Validators;
using Nethermind.Core;
using Nethermind.Core.Crypto;
using Nethermind.Core.Extensions;
using Nethermind.Core.Specs;
using Nethermind.Db;
using Nethermind.Logging;
using Nethermind.State.Proofs;
using Nethermind.Stats.Model;
using Nethermind.Synchronization.ParallelSync;
using Nethermind.Synchronization.Peers;
Expand Down Expand Up @@ -86,6 +83,8 @@ public override void InitializeFeed()
InitializeMetadataDb();
}
base.InitializeFeed();
_syncReport.FastBlocksBodies.Reset(0);
_syncReport.BodiesInQueue.Reset(0);
}

private void ResetSyncStatusList()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ public override void InitializeFeed()
}

base.InitializeFeed();
HeadersSyncProgressReport.Reset(0);
HeadersSyncQueueReport.Reset(0);
}

protected virtual void ResetPivot()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public override void InitializeFeed()
InitializeMetadataDb();
}
base.InitializeFeed();
_syncReport.FastBlocksReceipts.Reset(0);
_syncReport.ReceiptsInQueue.Reset(0);
}

private void ResetSyncStatusList()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Threading.Tasks;

namespace Nethermind.Synchronization.ParallelSync
{
Expand All @@ -27,8 +28,11 @@ public override void SyncModeSelectorOnChanged(SyncMode current)
if (_disposed) return;
if (ShouldBeActive(current))
{
InitializeFeed();
Activate();
Task.Run(() =>
{
InitializeFeed();
Activate();
});
}

if (ShouldBeDormant(current))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,19 +293,19 @@ private void WriteFullSyncReport()

private void WriteFastBlocksReport(SyncMode currentSyncMode)
{
if ((currentSyncMode & SyncMode.FastHeaders) == SyncMode.FastHeaders)
if ((currentSyncMode & SyncMode.FastHeaders) == SyncMode.FastHeaders && FastBlocksHeaders.HasStarted)
{
_logger.Info($"Old Headers {Pad(FastBlocksHeaders.CurrentValue, _blockPaddingLength)} / {_paddedPivot} ({FastBlocksHeaders.CurrentValue / (float)(_fastBlocksPivotNumber + 1),8:P2}) | queue {Pad(HeadersInQueue.CurrentValue, SpeedPaddingLength)} | current {Pad(FastBlocksHeaders.CurrentPerSecond, SpeedPaddingLength)} Blk/s | total {Pad(FastBlocksHeaders.TotalPerSecond, SpeedPaddingLength)} Blk/s");
FastBlocksHeaders.SetMeasuringPoint();
}

if ((currentSyncMode & SyncMode.FastBodies) == SyncMode.FastBodies)
if ((currentSyncMode & SyncMode.FastBodies) == SyncMode.FastBodies && FastBlocksBodies.HasStarted)
{
_logger.Info($"Old Bodies {Pad(FastBlocksBodies.CurrentValue, _blockPaddingLength)} / {_paddedAmountOfOldBodiesToDownload} ({FastBlocksBodies.CurrentValue / (float)(_amountOfBodiesToDownload + 1),8:P2}) | queue {Pad(BodiesInQueue.CurrentValue, SpeedPaddingLength)} | current {Pad(FastBlocksBodies.CurrentPerSecond, SpeedPaddingLength)} Blk/s | total {Pad(FastBlocksBodies.TotalPerSecond, SpeedPaddingLength)} Blk/s");
FastBlocksBodies.SetMeasuringPoint();
}

if ((currentSyncMode & SyncMode.FastReceipts) == SyncMode.FastReceipts)
if ((currentSyncMode & SyncMode.FastReceipts) == SyncMode.FastReceipts && FastBlocksReceipts.HasStarted)
{
_logger.Info($"Old Receipts {Pad(FastBlocksReceipts.CurrentValue, _blockPaddingLength)} / {_paddedAmountOfOldReceiptsToDownload} ({FastBlocksReceipts.CurrentValue / (float)(_amountOfReceiptsToDownload + 1),8:P2}) | queue {Pad(ReceiptsInQueue.CurrentValue, SpeedPaddingLength)} | current {Pad(FastBlocksReceipts.CurrentPerSecond, SpeedPaddingLength)} Blk/s | total {Pad(FastBlocksReceipts.TotalPerSecond, SpeedPaddingLength)} Blk/s");
FastBlocksReceipts.SetMeasuringPoint();
Expand Down