Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 23 additions & 12 deletions x/sync/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,9 @@ func (m *Manager) getAndApplyChangeProof(ctx context.Context, work *workItem) {
// TODO danlaine send range proof instead of failure notification
if !changeProof.HadRootsInHistory {
work.localRootID = ids.Empty
m.workLock.Lock()
m.enqueueWork(work)
m.workLock.Unlock()
return
}

Expand Down Expand Up @@ -599,35 +601,44 @@ func (m *Manager) completeWorkItem(ctx context.Context, work *workItem, largestH
largestHandledKey = work.end
} else {
// the full range wasn't completed, so enqueue a new work item for the range [nextStartKey, workItem.end]
m.workLock.Lock()
m.enqueueWork(newWorkItem(work.localRootID, nextStartKey, work.end, work.priority))
m.workLock.Unlock()
largestHandledKey = nextStartKey
}
}

// completed the range [work.start, lastKey], log and record in the completed work heap
m.config.Log.Info("completed range",
zap.Binary("start", work.start),
zap.Binary("end", largestHandledKey),
)
if m.getTargetRoot() == rootID {
m.workLock.Lock()
defer m.workLock.Unlock()

// update work queues (make sure to keep same locking order as [UpdateSyncTarget]
// or could cause a deadlock)
m.workLock.Lock()
m.syncTargetLock.RLock()
var stale bool
if m.config.TargetRoot == rootID {
m.processedWork.MergeInsert(newWorkItem(rootID, work.start, largestHandledKey, work.priority))
} else {
// the root has changed, so reinsert with high priority
m.enqueueWork(newWorkItem(rootID, work.start, largestHandledKey, highPriority))
stale = true
}
m.syncTargetLock.RUnlock()
m.workLock.Unlock()

// completed the range [work.start, lastKey], log and record in the completed work heap
m.config.Log.Info("completed range",
zap.Binary("start", work.start),
zap.Binary("end", largestHandledKey),
zap.Stringer("rootID", rootID),
zap.Bool("stale", stale),
)
}

// Queue the given key range to be fetched and applied.
// If there are sufficiently few unprocessed/processing work items,
// splits the range into two items and queues them both.
// Assumes [m.workLock] is not held.
//
// Assumes [m.workLock] is held.
func (m *Manager) enqueueWork(work *workItem) {
m.workLock.Lock()
defer func() {
m.workLock.Unlock()
m.unprocessedWorkCond.Signal()
}()

Expand Down