diff --git a/src/Nethermind/Nethermind.Core/MeasuredProgress.cs b/src/Nethermind/Nethermind.Core/MeasuredProgress.cs index 5b4f2531105..a493360a2a1 100644 --- a/src/Nethermind/Nethermind.Core/MeasuredProgress.cs +++ b/src/Nethermind/Nethermind.Core/MeasuredProgress.cs @@ -38,6 +38,8 @@ public void SetMeasuringPoint() } } + public bool HasStarted => UtcStartTime.HasValue; + public bool HasEnded => UtcEndTime.HasValue; public void MarkEnd() diff --git a/src/Nethermind/Nethermind.Synchronization.Test/ParallelSync/MultiSyncModeSelectorFastSyncTests.cs b/src/Nethermind/Nethermind.Synchronization.Test/ParallelSync/MultiSyncModeSelectorFastSyncTests.cs index 88c68959d84..e492ec6b0b2 100644 --- a/src/Nethermind/Nethermind.Synchronization.Test/ParallelSync/MultiSyncModeSelectorFastSyncTests.cs +++ b/src/Nethermind/Nethermind.Synchronization.Test/ParallelSync/MultiSyncModeSelectorFastSyncTests.cs @@ -5,12 +5,18 @@ using System.Linq; using System.Threading; using FluentAssertions; +using Nethermind.Blockchain; +using Nethermind.Blockchain.Receipts; using Nethermind.Blockchain.Synchronization; using Nethermind.Core; +using Nethermind.Db; using Nethermind.Int256; using Nethermind.Logging; +using Nethermind.Specs; +using Nethermind.Synchronization.FastBlocks; using Nethermind.Synchronization.ParallelSync; using Nethermind.Synchronization.Peers; +using Nethermind.Synchronization.Reporting; using NSubstitute; using NUnit.Framework; @@ -669,8 +675,12 @@ public void Changed_event_no_longer_gets_blocked_when_invoking_delegates() syncPeerPool.InitializedPeers.Returns(peerInfos); syncPeerPool.AllPeers.Returns(peerInfos); - ISyncConfig syncConfig = new SyncConfig() { FastSyncCatchUpHeightDelta = 2 }; - syncConfig.FastSync = true; + ISyncConfig syncConfig = new SyncConfig + { + FastSyncCatchUpHeightDelta = 2, + FastSync = true, + FastBlocks = true + }; TotalDifficultyBetterPeerStrategy bestPeerStrategy = new(LimboLogs.Instance); MultiSyncModeSelector selector = new(syncProgressResolver, syncPeerPool, syncConfig, No.BeaconSync, bestPeerStrategy, LimboLogs.Instance); @@ -679,9 +689,20 @@ public void Changed_event_no_longer_gets_blocked_when_invoking_delegates() selector.Update(); selector.Current.Should().Be(SyncMode.Full); - CancellationTokenSource waitTokenSource = new CancellationTokenSource(); - selector.Changed += BlockingMethod; - selector.Changed += TestMethod; + CancellationTokenSource waitTokenSource = new(); + ReceiptsSyncFeed receiptsSyncFeed = Substitute.ForPartsOf(MainnetSpecProvider.Instance, Substitute.For(), Substitute.For(), syncPeerPool, syncConfig, Substitute.For(), Substitute.For(), LimboLogs.Instance); + receiptsSyncFeed.When(rsf => rsf.InitializeFeed()).DoNotCallBase(); + receiptsSyncFeed.When(rsf => rsf.InitializeFeed()).Do(e => + { + waitTokenSource.Token.WaitHandle.WaitOne(1000); + if (!waitTokenSource.IsCancellationRequested) + Assert.Fail(); + }); + selector.Changed += (sender, args) => + { + receiptsSyncFeed?.SyncModeSelectorOnChanged(SyncMode.FastReceipts); + }; + selector.Changed += SecondDelegate; for (uint i = 0; i < syncConfig.FastSyncCatchUpHeightDelta + 1; i++) { @@ -692,14 +713,7 @@ public void Changed_event_no_longer_gets_blocked_when_invoking_delegates() syncProgressResolver.FindBestFullBlock().Returns(number); selector.Update(); } - - void BlockingMethod(object? sender, SyncModeChangedEventArgs e) - { - waitTokenSource.Token.WaitHandle.WaitOne(1000); - if (!waitTokenSource.IsCancellationRequested) - Assert.Fail(); - } - void TestMethod(object? sender, SyncModeChangedEventArgs e) + void SecondDelegate(object? sender, SyncModeChangedEventArgs e) { waitTokenSource.Cancel(); } diff --git a/src/Nethermind/Nethermind.Synchronization.Test/SyncReportTest.cs b/src/Nethermind/Nethermind.Synchronization.Test/SyncReportTest.cs index ee4c6b8c8cf..4fd91b754e0 100644 --- a/src/Nethermind/Nethermind.Synchronization.Test/SyncReportTest.cs +++ b/src/Nethermind/Nethermind.Synchronization.Test/SyncReportTest.cs @@ -4,7 +4,6 @@ using System; using System.Collections.Generic; using System.Globalization; -using System.Threading.Tasks; using Nethermind.Blockchain.Synchronization; using Nethermind.Core.Timers; using Nethermind.Logging; @@ -97,6 +96,9 @@ public void Ancient_bodies_and_receipts_are_reported_correctly(bool setBarriers) } SyncReport syncReport = new(pool, Substitute.For(), syncConfig, Substitute.For(), logManager, timerFactory); + syncReport.FastBlocksHeaders.Reset(0); + syncReport.FastBlocksBodies.Reset(0); + syncReport.FastBlocksReceipts.Reset(0); syncReport.SyncModeSelectorOnChanged(null, new SyncModeChangedEventArgs(SyncMode.None, SyncMode.FastHeaders | SyncMode.FastBodies | SyncMode.FastReceipts)); timer.Elapsed += Raise.Event(); @@ -113,5 +115,57 @@ public void Ancient_bodies_and_receipts_are_reported_correctly(bool setBarriers) iLogger.Received(1).Info("Old Receipts 0 / 100 ( 0.00 %) | queue 0 | current 0 Blk/s | total 0 Blk/s"); } } + + [TestCase(false)] + [TestCase(true)] + public void Ancient_bodies_and_receipts_are_not_reported_until_feed_finishes_Initialization(bool setBarriers) + { + CultureInfo.CurrentCulture = CultureInfo.InvariantCulture; + ISyncPeerPool pool = Substitute.For(); + pool.InitializedPeersCount.Returns(1); + ITimerFactory timerFactory = Substitute.For(); + ITimer timer = Substitute.For(); + timerFactory.CreateTimer(Arg.Any()).Returns(timer); + ILogManager logManager = Substitute.For(); + InterfaceLogger iLogger = Substitute.For(); + iLogger.IsInfo.Returns(true); + iLogger.IsError.Returns(true); + ILogger logger = new(iLogger); + logManager.GetClassLogger().Returns(logger); + + Queue syncModes = new(); + syncModes.Enqueue(SyncMode.FastHeaders); + syncModes.Enqueue(SyncMode.FastBodies); + syncModes.Enqueue(SyncMode.FastReceipts); + + SyncConfig syncConfig = new() + { + FastBlocks = true, + FastSync = true, + PivotNumber = "100", + }; + if (setBarriers) + { + syncConfig.AncientBodiesBarrier = 30; + syncConfig.AncientReceiptsBarrier = 35; + } + + SyncReport syncReport = new(pool, Substitute.For(), syncConfig, Substitute.For(), logManager, timerFactory); + syncReport.SyncModeSelectorOnChanged(null, new SyncModeChangedEventArgs(SyncMode.None, SyncMode.FastHeaders | SyncMode.FastBodies | SyncMode.FastReceipts)); + timer.Elapsed += Raise.Event(); + + if (setBarriers) + { + iLogger.DidNotReceive().Info("Old Headers 0 / 100 ( 0.00 %) | queue 0 | current 0 Blk/s | total 0 Blk/s"); + iLogger.DidNotReceive().Info("Old Bodies 0 / 70 ( 0.00 %) | queue 0 | current 0 Blk/s | total 0 Blk/s"); + iLogger.DidNotReceive().Info("Old Receipts 0 / 65 ( 0.00 %) | queue 0 | current 0 Blk/s | total 0 Blk/s"); + } + else + { + iLogger.DidNotReceive().Info("Old Headers 0 / 100 ( 0.00 %) | queue 0 | current 0 Blk/s | total 0 Blk/s"); + iLogger.DidNotReceive().Info("Old Bodies 0 / 100 ( 0.00 %) | queue 0 | current 0 Blk/s | total 0 Blk/s"); + iLogger.DidNotReceive().Info("Old Receipts 0 / 100 ( 0.00 %) | queue 0 | current 0 Blk/s | total 0 Blk/s"); + } + } } } diff --git a/src/Nethermind/Nethermind.Synchronization/FastBlocks/BodiesSyncFeed.cs b/src/Nethermind/Nethermind.Synchronization/FastBlocks/BodiesSyncFeed.cs index e9f992988c7..25dca89b8ed 100644 --- a/src/Nethermind/Nethermind.Synchronization/FastBlocks/BodiesSyncFeed.cs +++ b/src/Nethermind/Nethermind.Synchronization/FastBlocks/BodiesSyncFeed.cs @@ -8,12 +8,9 @@ using Nethermind.Blockchain.Synchronization; using Nethermind.Consensus.Validators; using Nethermind.Core; -using Nethermind.Core.Crypto; -using Nethermind.Core.Extensions; using Nethermind.Core.Specs; using Nethermind.Db; using Nethermind.Logging; -using Nethermind.State.Proofs; using Nethermind.Stats.Model; using Nethermind.Synchronization.ParallelSync; using Nethermind.Synchronization.Peers; @@ -86,6 +83,8 @@ public override void InitializeFeed() InitializeMetadataDb(); } base.InitializeFeed(); + _syncReport.FastBlocksBodies.Reset(0); + _syncReport.BodiesInQueue.Reset(0); } private void ResetSyncStatusList() @@ -275,7 +274,7 @@ private void LogPostProcessingBatchInfo(BodiesSyncBatch batch, int validResponse private void UpdateSyncReport() { - _syncReport.FastBlocksBodies.Update(_pivotNumber - _syncStatusList.LowestInsertWithoutGaps); + _syncReport.FastBlocksBodies.Update(_barrier - _syncStatusList.LowestInsertWithoutGaps); _syncReport.BodiesInQueue.Update(_syncStatusList.QueueSize); } diff --git a/src/Nethermind/Nethermind.Synchronization/FastBlocks/FastHeadersSyncFeed.cs b/src/Nethermind/Nethermind.Synchronization/FastBlocks/FastHeadersSyncFeed.cs index fc5bce0dfdc..4b27df92bd6 100644 --- a/src/Nethermind/Nethermind.Synchronization/FastBlocks/FastHeadersSyncFeed.cs +++ b/src/Nethermind/Nethermind.Synchronization/FastBlocks/FastHeadersSyncFeed.cs @@ -202,6 +202,8 @@ public override void InitializeFeed() } base.InitializeFeed(); + HeadersSyncProgressReport.Reset(0); + HeadersSyncQueueReport.Reset(0); } protected virtual void ResetPivot() diff --git a/src/Nethermind/Nethermind.Synchronization/FastBlocks/ReceiptsSyncFeed.cs b/src/Nethermind/Nethermind.Synchronization/FastBlocks/ReceiptsSyncFeed.cs index aab2343b1a9..55e956ee94d 100644 --- a/src/Nethermind/Nethermind.Synchronization/FastBlocks/ReceiptsSyncFeed.cs +++ b/src/Nethermind/Nethermind.Synchronization/FastBlocks/ReceiptsSyncFeed.cs @@ -86,6 +86,8 @@ public override void InitializeFeed() InitializeMetadataDb(); } base.InitializeFeed(); + _syncReport.FastBlocksReceipts.Reset(0); + _syncReport.ReceiptsInQueue.Reset(0); } private void ResetSyncStatusList() @@ -279,7 +281,7 @@ private int InsertReceipts(ReceiptsSyncBatch batch) AdjustRequestSize(batch, validResponsesCount); LogPostProcessingBatchInfo(batch, validResponsesCount); - _syncReport.FastBlocksReceipts.Update(_pivotNumber - _syncStatusList.LowestInsertWithoutGaps); + _syncReport.FastBlocksReceipts.Update(_barrier - _syncStatusList.LowestInsertWithoutGaps); _syncReport.ReceiptsInQueue.Update(_syncStatusList.QueueSize); return validResponsesCount; } diff --git a/src/Nethermind/Nethermind.Synchronization/FastSync/StateSyncFeed.cs b/src/Nethermind/Nethermind.Synchronization/FastSync/StateSyncFeed.cs index 875eb380899..e1529b31e45 100644 --- a/src/Nethermind/Nethermind.Synchronization/FastSync/StateSyncFeed.cs +++ b/src/Nethermind/Nethermind.Synchronization/FastSync/StateSyncFeed.cs @@ -76,8 +76,11 @@ public override void SyncModeSelectorOnChanged(SyncMode current) { if ((current & SyncMode.StateNodes) == SyncMode.StateNodes) { - _treeSync.ResetStateRootToBestSuggested(CurrentState); - Activate(); + Task.Run(() => + { + _treeSync.ResetStateRootToBestSuggested(CurrentState); + Activate(); + }); } } diff --git a/src/Nethermind/Nethermind.Synchronization/ParallelSync/ActivatedSyncFeed.cs b/src/Nethermind/Nethermind.Synchronization/ParallelSync/ActivatedSyncFeed.cs index 936a140f84e..6485e2c27c6 100644 --- a/src/Nethermind/Nethermind.Synchronization/ParallelSync/ActivatedSyncFeed.cs +++ b/src/Nethermind/Nethermind.Synchronization/ParallelSync/ActivatedSyncFeed.cs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: LGPL-3.0-only using System; +using System.Threading.Tasks; namespace Nethermind.Synchronization.ParallelSync { @@ -27,8 +28,11 @@ public override void SyncModeSelectorOnChanged(SyncMode current) if (_disposed) return; if (ShouldBeActive(current)) { - InitializeFeed(); - Activate(); + Task.Run(() => + { + InitializeFeed(); + Activate(); + }); } if (ShouldBeDormant(current)) diff --git a/src/Nethermind/Nethermind.Synchronization/ParallelSync/MultiSyncModeSelector.cs b/src/Nethermind/Nethermind.Synchronization/ParallelSync/MultiSyncModeSelector.cs index 6d14329a2c0..ab8ef0ef71c 100644 --- a/src/Nethermind/Nethermind.Synchronization/ParallelSync/MultiSyncModeSelector.cs +++ b/src/Nethermind/Nethermind.Synchronization/ParallelSync/MultiSyncModeSelector.cs @@ -278,20 +278,10 @@ private void UpdateSyncModes(SyncMode newModes, string? reason = null) // for example when switching to Full sync we need to ensure that we safely transition // DBS and processors if needed - ParallelInvoke(Preparing); - ParallelInvoke(Changing); + Preparing?.Invoke(this, args); + Changing?.Invoke(this, args); Current = newModes; - ParallelInvoke(Changed); - - void ParallelInvoke(EventHandler handler) - { - EventHandler handlerCopy = handler; - if (handlerCopy is not null) - { - Parallel.ForEach(handlerCopy.GetInvocationList(), - deleg => deleg.DynamicInvoke(this, args)); - } - } + Changed?.Invoke(this, args); } /// diff --git a/src/Nethermind/Nethermind.Synchronization/Reporting/SyncReport.cs b/src/Nethermind/Nethermind.Synchronization/Reporting/SyncReport.cs index 7c08123056a..d9b7c1cb010 100644 --- a/src/Nethermind/Nethermind.Synchronization/Reporting/SyncReport.cs +++ b/src/Nethermind/Nethermind.Synchronization/Reporting/SyncReport.cs @@ -293,19 +293,19 @@ private void WriteFullSyncReport() private void WriteFastBlocksReport(SyncMode currentSyncMode) { - if ((currentSyncMode & SyncMode.FastHeaders) == SyncMode.FastHeaders) + if ((currentSyncMode & SyncMode.FastHeaders) == SyncMode.FastHeaders && FastBlocksHeaders.HasStarted) { _logger.Info($"Old Headers {Pad(FastBlocksHeaders.CurrentValue, _blockPaddingLength)} / {_paddedPivot} ({FastBlocksHeaders.CurrentValue / (float)(_fastBlocksPivotNumber + 1),8:P2}) | queue {Pad(HeadersInQueue.CurrentValue, SpeedPaddingLength)} | current {Pad(FastBlocksHeaders.CurrentPerSecond, SpeedPaddingLength)} Blk/s | total {Pad(FastBlocksHeaders.TotalPerSecond, SpeedPaddingLength)} Blk/s"); FastBlocksHeaders.SetMeasuringPoint(); } - if ((currentSyncMode & SyncMode.FastBodies) == SyncMode.FastBodies) + if ((currentSyncMode & SyncMode.FastBodies) == SyncMode.FastBodies && FastBlocksBodies.HasStarted) { _logger.Info($"Old Bodies {Pad(FastBlocksBodies.CurrentValue, _blockPaddingLength)} / {_paddedAmountOfOldBodiesToDownload} ({FastBlocksBodies.CurrentValue / (float)(_amountOfBodiesToDownload + 1),8:P2}) | queue {Pad(BodiesInQueue.CurrentValue, SpeedPaddingLength)} | current {Pad(FastBlocksBodies.CurrentPerSecond, SpeedPaddingLength)} Blk/s | total {Pad(FastBlocksBodies.TotalPerSecond, SpeedPaddingLength)} Blk/s"); FastBlocksBodies.SetMeasuringPoint(); } - if ((currentSyncMode & SyncMode.FastReceipts) == SyncMode.FastReceipts) + if ((currentSyncMode & SyncMode.FastReceipts) == SyncMode.FastReceipts && FastBlocksReceipts.HasStarted) { _logger.Info($"Old Receipts {Pad(FastBlocksReceipts.CurrentValue, _blockPaddingLength)} / {_paddedAmountOfOldReceiptsToDownload} ({FastBlocksReceipts.CurrentValue / (float)(_amountOfReceiptsToDownload + 1),8:P2}) | queue {Pad(ReceiptsInQueue.CurrentValue, SpeedPaddingLength)} | current {Pad(FastBlocksReceipts.CurrentPerSecond, SpeedPaddingLength)} Blk/s | total {Pad(FastBlocksReceipts.TotalPerSecond, SpeedPaddingLength)} Blk/s"); FastBlocksReceipts.SetMeasuringPoint();