From efcd4718370dd47e6bd861408ffd332d36e7e4a5 Mon Sep 17 00:00:00 2001 From: muXxer Date: Tue, 20 Sep 2022 14:49:01 +0200 Subject: [PATCH] Fix edge case in warpsync which deadlocks syncing --- pkg/protocol/gossip/warpsync.go | 35 +++++++++++++++++---------------- plugins/warpsync/component.go | 10 +++++----- 2 files changed, 23 insertions(+), 22 deletions(-) diff --git a/pkg/protocol/gossip/warpsync.go b/pkg/protocol/gossip/warpsync.go index 4762d278d..f6da0c5c0 100644 --- a/pkg/protocol/gossip/warpsync.go +++ b/pkg/protocol/gossip/warpsync.go @@ -255,6 +255,8 @@ func (ws *WarpSync) reset() { type WarpSyncMilestoneRequester struct { syncutils.Mutex + // used to cancel the warp sync requester. + ctx context.Context // used to access the node storage. storage *storage.Storage // used to determine the sync status of the node. @@ -269,12 +271,14 @@ type WarpSyncMilestoneRequester struct { // NewWarpSyncMilestoneRequester creates a new WarpSyncMilestoneRequester instance. func NewWarpSyncMilestoneRequester( + ctx context.Context, dbStorage *storage.Storage, syncManager *syncmanager.SyncManager, requester *Requester, preventDiscard bool) *WarpSyncMilestoneRequester { return &WarpSyncMilestoneRequester{ + ctx: ctx, storage: dbStorage, syncManager: syncManager, requester: requester, @@ -283,13 +287,10 @@ func NewWarpSyncMilestoneRequester( } } -// RequestMissingMilestoneParents traverses the parents of a given milestone and requests each missing parent. +// requestMissingMilestoneParents traverses the parents of a given milestone and requests each missing parent. // Already requested milestones or traversed blocks will be ignored, to circumvent requesting // the same parents multiple times. -func (w *WarpSyncMilestoneRequester) RequestMissingMilestoneParents(ctx context.Context, msIndex iotago.MilestoneIndex) error { - w.Lock() - defer w.Unlock() - +func (w *WarpSyncMilestoneRequester) requestMissingMilestoneParents(msIndex iotago.MilestoneIndex) error { if msIndex <= w.syncManager.ConfirmedMilestoneIndex() { return nil } @@ -301,7 +302,7 @@ func (w *WarpSyncMilestoneRequester) RequestMissingMilestoneParents(ctx context. } return dag.TraverseParents( - ctx, + w.ctx, w.storage, milestoneParents, // traversal stops if no more blocks pass the given condition @@ -345,7 +346,10 @@ func (w *WarpSyncMilestoneRequester) Cleanup() { // RequestMilestoneRange requests up to N milestones nearest to the current confirmed milestone index. // Returns the number of milestones requested. -func (w *WarpSyncMilestoneRequester) RequestMilestoneRange(ctx context.Context, rangeToRequest syncmanager.MilestoneIndexDelta, onExistingMilestoneInRange func(ctx context.Context, msIndex iotago.MilestoneIndex) error, from ...iotago.MilestoneIndex) syncmanager.MilestoneIndexDelta { +func (w *WarpSyncMilestoneRequester) RequestMilestoneRange(rangeToRequest syncmanager.MilestoneIndexDelta, from ...iotago.MilestoneIndex) (syncmanager.MilestoneIndexDelta, iotago.MilestoneIndex, iotago.MilestoneIndex) { + w.Lock() + defer w.Unlock() + var requested syncmanager.MilestoneIndexDelta startingPoint := w.syncManager.ConfirmedMilestoneIndex() @@ -353,6 +357,9 @@ func (w *WarpSyncMilestoneRequester) RequestMilestoneRange(ctx context.Context, startingPoint = from[0] } + startIndex := startingPoint + 1 + endIndex := startingPoint + rangeToRequest + var msIndexes []iotago.MilestoneIndex for i := syncmanager.MilestoneIndexDelta(1); i <= rangeToRequest; i++ { msIndexToRequest := startingPoint + i @@ -366,22 +373,16 @@ func (w *WarpSyncMilestoneRequester) RequestMilestoneRange(ctx context.Context, } // milestone already exists - if onExistingMilestoneInRange != nil { - if err := onExistingMilestoneInRange(ctx, msIndexToRequest); err != nil && errors.Is(err, common.ErrOperationAborted) { - // do not proceed if the node was shut down - return 0 - } + if err := w.requestMissingMilestoneParents(msIndexToRequest); err != nil && errors.Is(err, common.ErrOperationAborted) { + // do not proceed if the node was shut down + return 0, 0, 0 } } - if len(msIndexes) == 0 { - return requested - } - // enqueue every milestone request to the request queue for _, msIndex := range msIndexes { w.requester.Request(msIndex, msIndex) } - return requested + return requested, startIndex, endIndex } diff --git a/plugins/warpsync/component.go b/plugins/warpsync/component.go index f672e581f..3230fe41c 100644 --- a/plugins/warpsync/component.go +++ b/plugins/warpsync/component.go @@ -60,7 +60,7 @@ type dependencies struct { func configure() error { warpSync = gossip.NewWarpSync(ParamsWarpSync.AdvancementRange) - warpSyncMilestoneRequester = gossip.NewWarpSyncMilestoneRequester(deps.Storage, deps.SyncManager, deps.Requester, true) + warpSyncMilestoneRequester = gossip.NewWarpSyncMilestoneRequester(Plugin.Daemon().ContextStopped(), deps.Storage, deps.SyncManager, deps.Requester, true) configureEvents() return nil @@ -101,8 +101,8 @@ func configureEvents() { onMilestoneSolidificationFailed = events.NewClosure(func(msIndex iotago.MilestoneIndex) { if warpSync.CurrentCheckpoint != 0 && warpSync.CurrentCheckpoint < msIndex { // rerequest since milestone requests could have been lost - Plugin.LogInfof("Requesting missing milestones %d - %d", msIndex, msIndex+warpSync.AdvancementRange) - warpSyncMilestoneRequester.RequestMilestoneRange(Plugin.Daemon().ContextStopped(), warpSync.AdvancementRange, nil) + _, msIndexStart, msIndexEnd := warpSyncMilestoneRequester.RequestMilestoneRange(warpSync.AdvancementRange) + Plugin.LogInfof("Requesting missing milestones %d - %d", msIndexStart, msIndexEnd) } }) @@ -112,7 +112,7 @@ func configureEvents() { deps.RequestQueue.Filter(func(r *gossip.Request) bool { return r.MilestoneIndex <= nextCheckpoint }) - warpSyncMilestoneRequester.RequestMilestoneRange(Plugin.Daemon().ContextStopped(), advRange, warpSyncMilestoneRequester.RequestMissingMilestoneParents, oldCheckpoint) + _, _, _ = warpSyncMilestoneRequester.RequestMilestoneRange(advRange, oldCheckpoint) }) onWarpSyncTargetUpdated = events.NewClosure(func(checkpoint iotago.MilestoneIndex, newTarget iotago.MilestoneIndex) { @@ -125,7 +125,7 @@ func configureEvents() { return r.MilestoneIndex <= nextCheckpoint }) - msRequested := warpSyncMilestoneRequester.RequestMilestoneRange(Plugin.Daemon().ContextStopped(), advRange, warpSyncMilestoneRequester.RequestMissingMilestoneParents) + msRequested, _, _ := warpSyncMilestoneRequester.RequestMilestoneRange(advRange) // if the amount of requested milestones doesn't correspond to the range, // it means we already had the milestones in the database, which suggests // that we should manually kick start the milestone solidifier.