Skip to content

Commit

Permalink
Different approach with extra fix for log messages.
Browse files Browse the repository at this point in the history
  • Loading branch information
smartprogrammer93 committed Mar 6, 2024
1 parent cbd32c3 commit ec355fd
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 39 deletions.
2 changes: 2 additions & 0 deletions src/Nethermind/Nethermind.Core/MeasuredProgress.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public void SetMeasuringPoint()
}
}

public bool HasStarted => UtcStartTime.HasValue;

public bool HasEnded => UtcEndTime.HasValue;

public void MarkEnd()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,18 @@
using System.Linq;
using System.Threading;
using FluentAssertions;
using Nethermind.Blockchain;
using Nethermind.Blockchain.Receipts;
using Nethermind.Blockchain.Synchronization;
using Nethermind.Core;
using Nethermind.Db;
using Nethermind.Int256;
using Nethermind.Logging;
using Nethermind.Specs;
using Nethermind.Synchronization.FastBlocks;
using Nethermind.Synchronization.ParallelSync;
using Nethermind.Synchronization.Peers;
using Nethermind.Synchronization.Reporting;
using NSubstitute;
using NUnit.Framework;

Expand Down Expand Up @@ -669,8 +675,12 @@ public void Changed_event_no_longer_gets_blocked_when_invoking_delegates()
syncPeerPool.InitializedPeers.Returns(peerInfos);
syncPeerPool.AllPeers.Returns(peerInfos);

ISyncConfig syncConfig = new SyncConfig() { FastSyncCatchUpHeightDelta = 2 };
syncConfig.FastSync = true;
ISyncConfig syncConfig = new SyncConfig
{
FastSyncCatchUpHeightDelta = 2,
FastSync = true,
FastBlocks = true
};

TotalDifficultyBetterPeerStrategy bestPeerStrategy = new(LimboLogs.Instance);
MultiSyncModeSelector selector = new(syncProgressResolver, syncPeerPool, syncConfig, No.BeaconSync, bestPeerStrategy, LimboLogs.Instance);
Expand All @@ -679,9 +689,20 @@ public void Changed_event_no_longer_gets_blocked_when_invoking_delegates()
selector.Update();
selector.Current.Should().Be(SyncMode.Full);

CancellationTokenSource waitTokenSource = new CancellationTokenSource();
selector.Changed += BlockingMethod;
selector.Changed += TestMethod;
CancellationTokenSource waitTokenSource = new();
ReceiptsSyncFeed receiptsSyncFeed = Substitute.ForPartsOf<ReceiptsSyncFeed>(MainnetSpecProvider.Instance, Substitute.For<IBlockTree>(), Substitute.For<IReceiptStorage>(), syncPeerPool, syncConfig, Substitute.For<ISyncReport>(), Substitute.For<IDb>(), LimboLogs.Instance);
receiptsSyncFeed.When(rsf => rsf.InitializeFeed()).DoNotCallBase();
receiptsSyncFeed.When(rsf => rsf.InitializeFeed()).Do(e =>
{
waitTokenSource.Token.WaitHandle.WaitOne(1000);
if (!waitTokenSource.IsCancellationRequested)
Assert.Fail();
});
selector.Changed += (sender, args) =>
{
receiptsSyncFeed?.SyncModeSelectorOnChanged(SyncMode.FastReceipts);
};
selector.Changed += SecondDelegate;

for (uint i = 0; i < syncConfig.FastSyncCatchUpHeightDelta + 1; i++)
{
Expand All @@ -692,14 +713,7 @@ public void Changed_event_no_longer_gets_blocked_when_invoking_delegates()
syncProgressResolver.FindBestFullBlock().Returns(number);
selector.Update();
}

void BlockingMethod(object? sender, SyncModeChangedEventArgs e)
{
waitTokenSource.Token.WaitHandle.WaitOne(1000);
if (!waitTokenSource.IsCancellationRequested)
Assert.Fail();
}
void TestMethod(object? sender, SyncModeChangedEventArgs e)
void SecondDelegate(object? sender, SyncModeChangedEventArgs e)
{
waitTokenSource.Cancel();
}
Expand Down
56 changes: 55 additions & 1 deletion src/Nethermind/Nethermind.Synchronization.Test/SyncReportTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Threading.Tasks;
using Nethermind.Blockchain.Synchronization;
using Nethermind.Core.Timers;
using Nethermind.Logging;
Expand Down Expand Up @@ -97,6 +96,9 @@ public void Ancient_bodies_and_receipts_are_reported_correctly(bool setBarriers)
}

SyncReport syncReport = new(pool, Substitute.For<INodeStatsManager>(), syncConfig, Substitute.For<IPivot>(), logManager, timerFactory);
syncReport.FastBlocksHeaders.Reset(0);
syncReport.FastBlocksBodies.Reset(0);
syncReport.FastBlocksReceipts.Reset(0);
syncReport.SyncModeSelectorOnChanged(null, new SyncModeChangedEventArgs(SyncMode.None, SyncMode.FastHeaders | SyncMode.FastBodies | SyncMode.FastReceipts));
timer.Elapsed += Raise.Event();

Expand All @@ -113,5 +115,57 @@ public void Ancient_bodies_and_receipts_are_reported_correctly(bool setBarriers)
iLogger.Received(1).Info("Old Receipts 0 / 100 ( 0.00 %) | queue 0 | current 0 Blk/s | total 0 Blk/s");
}
}

[TestCase(false)]
[TestCase(true)]
public void Ancient_bodies_and_receipts_are_not_reported_until_feed_finishes_Initialization(bool setBarriers)
{
CultureInfo.CurrentCulture = CultureInfo.InvariantCulture;
ISyncPeerPool pool = Substitute.For<ISyncPeerPool>();
pool.InitializedPeersCount.Returns(1);
ITimerFactory timerFactory = Substitute.For<ITimerFactory>();
ITimer timer = Substitute.For<ITimer>();
timerFactory.CreateTimer(Arg.Any<TimeSpan>()).Returns(timer);
ILogManager logManager = Substitute.For<ILogManager>();
InterfaceLogger iLogger = Substitute.For<InterfaceLogger>();
iLogger.IsInfo.Returns(true);
iLogger.IsError.Returns(true);
ILogger logger = new(iLogger);
logManager.GetClassLogger().Returns(logger);

Queue<SyncMode> syncModes = new();
syncModes.Enqueue(SyncMode.FastHeaders);
syncModes.Enqueue(SyncMode.FastBodies);
syncModes.Enqueue(SyncMode.FastReceipts);

SyncConfig syncConfig = new()
{
FastBlocks = true,
FastSync = true,
PivotNumber = "100",
};
if (setBarriers)
{
syncConfig.AncientBodiesBarrier = 30;
syncConfig.AncientReceiptsBarrier = 35;
}

SyncReport syncReport = new(pool, Substitute.For<INodeStatsManager>(), syncConfig, Substitute.For<IPivot>(), logManager, timerFactory);
syncReport.SyncModeSelectorOnChanged(null, new SyncModeChangedEventArgs(SyncMode.None, SyncMode.FastHeaders | SyncMode.FastBodies | SyncMode.FastReceipts));
timer.Elapsed += Raise.Event();

if (setBarriers)
{
iLogger.DidNotReceive().Info("Old Headers 0 / 100 ( 0.00 %) | queue 0 | current 0 Blk/s | total 0 Blk/s");
iLogger.DidNotReceive().Info("Old Bodies 0 / 70 ( 0.00 %) | queue 0 | current 0 Blk/s | total 0 Blk/s");
iLogger.DidNotReceive().Info("Old Receipts 0 / 65 ( 0.00 %) | queue 0 | current 0 Blk/s | total 0 Blk/s");
}
else
{
iLogger.DidNotReceive().Info("Old Headers 0 / 100 ( 0.00 %) | queue 0 | current 0 Blk/s | total 0 Blk/s");
iLogger.DidNotReceive().Info("Old Bodies 0 / 100 ( 0.00 %) | queue 0 | current 0 Blk/s | total 0 Blk/s");
iLogger.DidNotReceive().Info("Old Receipts 0 / 100 ( 0.00 %) | queue 0 | current 0 Blk/s | total 0 Blk/s");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,9 @@
using Nethermind.Blockchain.Synchronization;
using Nethermind.Consensus.Validators;
using Nethermind.Core;
using Nethermind.Core.Crypto;
using Nethermind.Core.Extensions;
using Nethermind.Core.Specs;
using Nethermind.Db;
using Nethermind.Logging;
using Nethermind.State.Proofs;
using Nethermind.Stats.Model;
using Nethermind.Synchronization.ParallelSync;
using Nethermind.Synchronization.Peers;
Expand Down Expand Up @@ -86,6 +83,8 @@ public override void InitializeFeed()
InitializeMetadataDb();
}
base.InitializeFeed();
_syncReport.FastBlocksBodies.Reset(0);
_syncReport.BodiesInQueue.Reset(0);
}

private void ResetSyncStatusList()
Expand Down Expand Up @@ -275,7 +274,7 @@ private void LogPostProcessingBatchInfo(BodiesSyncBatch batch, int validResponse

private void UpdateSyncReport()
{
_syncReport.FastBlocksBodies.Update(_pivotNumber - _syncStatusList.LowestInsertWithoutGaps);
_syncReport.FastBlocksBodies.Update(_barrier - _syncStatusList.LowestInsertWithoutGaps);
_syncReport.BodiesInQueue.Update(_syncStatusList.QueueSize);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ public override void InitializeFeed()
}

base.InitializeFeed();
HeadersSyncProgressReport.Reset(0);
HeadersSyncQueueReport.Reset(0);
}

protected virtual void ResetPivot()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public override void InitializeFeed()
InitializeMetadataDb();
}
base.InitializeFeed();
_syncReport.FastBlocksReceipts.Reset(0);
_syncReport.ReceiptsInQueue.Reset(0);
}

private void ResetSyncStatusList()
Expand Down Expand Up @@ -279,7 +281,7 @@ private int InsertReceipts(ReceiptsSyncBatch batch)
AdjustRequestSize(batch, validResponsesCount);
LogPostProcessingBatchInfo(batch, validResponsesCount);

_syncReport.FastBlocksReceipts.Update(_pivotNumber - _syncStatusList.LowestInsertWithoutGaps);
_syncReport.FastBlocksReceipts.Update(_barrier - _syncStatusList.LowestInsertWithoutGaps);
_syncReport.ReceiptsInQueue.Update(_syncStatusList.QueueSize);
return validResponsesCount;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,11 @@ public override void SyncModeSelectorOnChanged(SyncMode current)
{
if ((current & SyncMode.StateNodes) == SyncMode.StateNodes)
{
_treeSync.ResetStateRootToBestSuggested(CurrentState);
Activate();
Task.Run(() =>
{
_treeSync.ResetStateRootToBestSuggested(CurrentState);
Activate();
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Threading.Tasks;

namespace Nethermind.Synchronization.ParallelSync
{
Expand All @@ -27,8 +28,11 @@ public override void SyncModeSelectorOnChanged(SyncMode current)
if (_disposed) return;
if (ShouldBeActive(current))
{
InitializeFeed();
Activate();
Task.Run(() =>
{
InitializeFeed();
Activate();
});
}

if (ShouldBeDormant(current))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,20 +278,10 @@ private void UpdateSyncModes(SyncMode newModes, string? reason = null)
// for example when switching to Full sync we need to ensure that we safely transition
// DBS and processors if needed

ParallelInvoke(Preparing);
ParallelInvoke(Changing);
Preparing?.Invoke(this, args);
Changing?.Invoke(this, args);
Current = newModes;
ParallelInvoke(Changed);

void ParallelInvoke(EventHandler<SyncModeChangedEventArgs> handler)
{
EventHandler<SyncModeChangedEventArgs> handlerCopy = handler;
if (handlerCopy is not null)
{
Parallel.ForEach(handlerCopy.GetInvocationList(),
deleg => deleg.DynamicInvoke(this, args));
}
}
Changed?.Invoke(this, args);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,19 +293,19 @@ private void WriteFullSyncReport()

private void WriteFastBlocksReport(SyncMode currentSyncMode)
{
if ((currentSyncMode & SyncMode.FastHeaders) == SyncMode.FastHeaders)
if ((currentSyncMode & SyncMode.FastHeaders) == SyncMode.FastHeaders && FastBlocksHeaders.HasStarted)
{
_logger.Info($"Old Headers {Pad(FastBlocksHeaders.CurrentValue, _blockPaddingLength)} / {_paddedPivot} ({FastBlocksHeaders.CurrentValue / (float)(_fastBlocksPivotNumber + 1),8:P2}) | queue {Pad(HeadersInQueue.CurrentValue, SpeedPaddingLength)} | current {Pad(FastBlocksHeaders.CurrentPerSecond, SpeedPaddingLength)} Blk/s | total {Pad(FastBlocksHeaders.TotalPerSecond, SpeedPaddingLength)} Blk/s");
FastBlocksHeaders.SetMeasuringPoint();
}

if ((currentSyncMode & SyncMode.FastBodies) == SyncMode.FastBodies)
if ((currentSyncMode & SyncMode.FastBodies) == SyncMode.FastBodies && FastBlocksBodies.HasStarted)
{
_logger.Info($"Old Bodies {Pad(FastBlocksBodies.CurrentValue, _blockPaddingLength)} / {_paddedAmountOfOldBodiesToDownload} ({FastBlocksBodies.CurrentValue / (float)(_amountOfBodiesToDownload + 1),8:P2}) | queue {Pad(BodiesInQueue.CurrentValue, SpeedPaddingLength)} | current {Pad(FastBlocksBodies.CurrentPerSecond, SpeedPaddingLength)} Blk/s | total {Pad(FastBlocksBodies.TotalPerSecond, SpeedPaddingLength)} Blk/s");
FastBlocksBodies.SetMeasuringPoint();
}

if ((currentSyncMode & SyncMode.FastReceipts) == SyncMode.FastReceipts)
if ((currentSyncMode & SyncMode.FastReceipts) == SyncMode.FastReceipts && FastBlocksReceipts.HasStarted)
{
_logger.Info($"Old Receipts {Pad(FastBlocksReceipts.CurrentValue, _blockPaddingLength)} / {_paddedAmountOfOldReceiptsToDownload} ({FastBlocksReceipts.CurrentValue / (float)(_amountOfReceiptsToDownload + 1),8:P2}) | queue {Pad(ReceiptsInQueue.CurrentValue, SpeedPaddingLength)} | current {Pad(FastBlocksReceipts.CurrentPerSecond, SpeedPaddingLength)} Blk/s | total {Pad(FastBlocksReceipts.TotalPerSecond, SpeedPaddingLength)} Blk/s");
FastBlocksReceipts.SetMeasuringPoint();
Expand Down

0 comments on commit ec355fd

Please sign in to comment.