Skip to content

Commit

Permalink
Fix edge case in warpsync which deadlocks syncing
Browse files Browse the repository at this point in the history
  • Loading branch information
muXxer committed Sep 20, 2022
1 parent 40baafb commit efcd471
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 22 deletions.
35 changes: 18 additions & 17 deletions pkg/protocol/gossip/warpsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ func (ws *WarpSync) reset() {
type WarpSyncMilestoneRequester struct {
syncutils.Mutex

// used to cancel the warp sync requester.
ctx context.Context
// used to access the node storage.
storage *storage.Storage
// used to determine the sync status of the node.
Expand All @@ -269,12 +271,14 @@ type WarpSyncMilestoneRequester struct {

// NewWarpSyncMilestoneRequester creates a new WarpSyncMilestoneRequester instance.
func NewWarpSyncMilestoneRequester(
ctx context.Context,
dbStorage *storage.Storage,
syncManager *syncmanager.SyncManager,
requester *Requester,
preventDiscard bool) *WarpSyncMilestoneRequester {

return &WarpSyncMilestoneRequester{
ctx: ctx,
storage: dbStorage,
syncManager: syncManager,
requester: requester,
Expand All @@ -283,13 +287,10 @@ func NewWarpSyncMilestoneRequester(
}
}

// RequestMissingMilestoneParents traverses the parents of a given milestone and requests each missing parent.
// requestMissingMilestoneParents traverses the parents of a given milestone and requests each missing parent.
// Already requested milestones or traversed blocks will be ignored, to circumvent requesting
// the same parents multiple times.
func (w *WarpSyncMilestoneRequester) RequestMissingMilestoneParents(ctx context.Context, msIndex iotago.MilestoneIndex) error {
w.Lock()
defer w.Unlock()

func (w *WarpSyncMilestoneRequester) requestMissingMilestoneParents(msIndex iotago.MilestoneIndex) error {
if msIndex <= w.syncManager.ConfirmedMilestoneIndex() {
return nil
}
Expand All @@ -301,7 +302,7 @@ func (w *WarpSyncMilestoneRequester) RequestMissingMilestoneParents(ctx context.
}

return dag.TraverseParents(
ctx,
w.ctx,
w.storage,
milestoneParents,
// traversal stops if no more blocks pass the given condition
Expand Down Expand Up @@ -345,14 +346,20 @@ func (w *WarpSyncMilestoneRequester) Cleanup() {

// RequestMilestoneRange requests up to N milestones nearest to the current confirmed milestone index.
// Returns the number of milestones requested.
func (w *WarpSyncMilestoneRequester) RequestMilestoneRange(ctx context.Context, rangeToRequest syncmanager.MilestoneIndexDelta, onExistingMilestoneInRange func(ctx context.Context, msIndex iotago.MilestoneIndex) error, from ...iotago.MilestoneIndex) syncmanager.MilestoneIndexDelta {
func (w *WarpSyncMilestoneRequester) RequestMilestoneRange(rangeToRequest syncmanager.MilestoneIndexDelta, from ...iotago.MilestoneIndex) (syncmanager.MilestoneIndexDelta, iotago.MilestoneIndex, iotago.MilestoneIndex) {
w.Lock()
defer w.Unlock()

var requested syncmanager.MilestoneIndexDelta

startingPoint := w.syncManager.ConfirmedMilestoneIndex()
if len(from) > 0 {
startingPoint = from[0]
}

startIndex := startingPoint + 1
endIndex := startingPoint + rangeToRequest

var msIndexes []iotago.MilestoneIndex
for i := syncmanager.MilestoneIndexDelta(1); i <= rangeToRequest; i++ {
msIndexToRequest := startingPoint + i
Expand All @@ -366,22 +373,16 @@ func (w *WarpSyncMilestoneRequester) RequestMilestoneRange(ctx context.Context,
}

// milestone already exists
if onExistingMilestoneInRange != nil {
if err := onExistingMilestoneInRange(ctx, msIndexToRequest); err != nil && errors.Is(err, common.ErrOperationAborted) {
// do not proceed if the node was shut down
return 0
}
if err := w.requestMissingMilestoneParents(msIndexToRequest); err != nil && errors.Is(err, common.ErrOperationAborted) {
// do not proceed if the node was shut down
return 0, 0, 0
}
}

if len(msIndexes) == 0 {
return requested
}

// enqueue every milestone request to the request queue
for _, msIndex := range msIndexes {
w.requester.Request(msIndex, msIndex)
}

return requested
return requested, startIndex, endIndex
}
10 changes: 5 additions & 5 deletions plugins/warpsync/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type dependencies struct {

func configure() error {
warpSync = gossip.NewWarpSync(ParamsWarpSync.AdvancementRange)
warpSyncMilestoneRequester = gossip.NewWarpSyncMilestoneRequester(deps.Storage, deps.SyncManager, deps.Requester, true)
warpSyncMilestoneRequester = gossip.NewWarpSyncMilestoneRequester(Plugin.Daemon().ContextStopped(), deps.Storage, deps.SyncManager, deps.Requester, true)
configureEvents()

return nil
Expand Down Expand Up @@ -101,8 +101,8 @@ func configureEvents() {
onMilestoneSolidificationFailed = events.NewClosure(func(msIndex iotago.MilestoneIndex) {
if warpSync.CurrentCheckpoint != 0 && warpSync.CurrentCheckpoint < msIndex {
// rerequest since milestone requests could have been lost
Plugin.LogInfof("Requesting missing milestones %d - %d", msIndex, msIndex+warpSync.AdvancementRange)
warpSyncMilestoneRequester.RequestMilestoneRange(Plugin.Daemon().ContextStopped(), warpSync.AdvancementRange, nil)
_, msIndexStart, msIndexEnd := warpSyncMilestoneRequester.RequestMilestoneRange(warpSync.AdvancementRange)
Plugin.LogInfof("Requesting missing milestones %d - %d", msIndexStart, msIndexEnd)
}
})

Expand All @@ -112,7 +112,7 @@ func configureEvents() {
deps.RequestQueue.Filter(func(r *gossip.Request) bool {
return r.MilestoneIndex <= nextCheckpoint
})
warpSyncMilestoneRequester.RequestMilestoneRange(Plugin.Daemon().ContextStopped(), advRange, warpSyncMilestoneRequester.RequestMissingMilestoneParents, oldCheckpoint)
_, _, _ = warpSyncMilestoneRequester.RequestMilestoneRange(advRange, oldCheckpoint)
})

onWarpSyncTargetUpdated = events.NewClosure(func(checkpoint iotago.MilestoneIndex, newTarget iotago.MilestoneIndex) {
Expand All @@ -125,7 +125,7 @@ func configureEvents() {
return r.MilestoneIndex <= nextCheckpoint
})

msRequested := warpSyncMilestoneRequester.RequestMilestoneRange(Plugin.Daemon().ContextStopped(), advRange, warpSyncMilestoneRequester.RequestMissingMilestoneParents)
msRequested, _, _ := warpSyncMilestoneRequester.RequestMilestoneRange(advRange)
// if the amount of requested milestones doesn't correspond to the range,
// it means we already had the milestones in the database, which suggests
// that we should manually kick start the milestone solidifier.
Expand Down

0 comments on commit efcd471

Please sign in to comment.