refactor concurrent work limiting in sync in x/sync #1347

Merged
merged 4 commits into from Apr 21, 2023

38 changes: 12 additions & 26 deletions x/sync/syncmanager.go
@@ -25,7 +25,6 @@ const (
)

var (
token = struct{}{}
ErrAlreadyStarted = errors.New("cannot start a StateSyncManager that has already been started")
ErrAlreadyClosed = errors.New("StateSyncManager is closed")
ErrNoClientProvided = errors.New("client is a required field of the sync config")
@@ -80,7 +79,7 @@ type StateSyncManager struct {
unprocessedWork *syncWorkHeap
// Signalled when:
// - An item is added to [unprocessedWork].
// - There are no more items in [unprocessedWork] and [processingWorkItems] is 0.
// - An item is added to [processedWork].
// - Close() is called.
// [workLock] is its inner lock.
unprocessedWorkCond sync.Cond
@@ -93,9 +92,6 @@ type StateSyncManager struct {
// - [workToBeDone] and [completedWork] are closed.
syncDoneChan chan struct{}

// Rate-limits the number of concurrently processing work items.
workTokens chan struct{}

errLock sync.Mutex
// If non-nil, there was a fatal error.
// [errLock] must be held when accessing [fatalError].
@@ -134,14 +130,9 @@ func NewStateSyncManager(config StateSyncConfig) (*StateSyncManager, error) {
syncDoneChan: make(chan struct{}),
unprocessedWork: newSyncWorkHeap(2 * config.SimultaneousWorkLimit),
processedWork: newSyncWorkHeap(2 * config.SimultaneousWorkLimit),
workTokens: make(chan struct{}, config.SimultaneousWorkLimit),
}
m.unprocessedWorkCond.L = &m.workLock

// fill the work tokens channel with work tokens
for i := 0; i < config.SimultaneousWorkLimit; i++ {
m.workTokens <- token
}
return m, nil
}
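
For context, the fields and fill loop removed in this hunk implemented the concurrency limit as a counting semaphore built from a buffered channel. A minimal standalone sketch of that pattern, using illustrative names that are not taken from the repository (assumes the standard "context" package is imported):

// limiter caps concurrent work at a fixed limit using a buffered channel.
type limiter struct {
	tokens chan struct{}
}

func newLimiter(limit int) *limiter {
	l := &limiter{tokens: make(chan struct{}, limit)}
	// Pre-fill the channel so [limit] tokens are available up front,
	// mirroring the deleted loop in NewStateSyncManager.
	for i := 0; i < limit; i++ {
		l.tokens <- struct{}{}
	}
	return l
}

// acquire blocks until a token is available or [ctx] is canceled.
func (l *limiter) acquire(ctx context.Context) error {
	select {
	case <-l.tokens:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// release returns a token so another worker can start.
func (l *limiter) release() {
	l.tokens <- struct{}{}
}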

@@ -170,8 +161,8 @@ func (m *StateSyncManager) StartSyncing(ctx context.Context) error {
func (m *StateSyncManager) sync(ctx context.Context) {
defer func() {
// Note we release [m.workLock] before calling Close()
// because Close() will try to acquire [m.workLock].
// Invariant: [m.workLock] is held when we return from this goroutine.
// because Close() will acquire [m.workLock].
// Invariant: [m.workLock] is held when this goroutine begins.
m.workLock.Unlock()
m.Close()
}()
@@ -183,6 +174,12 @@ func (m *StateSyncManager) sync(ctx context.Context) {
if ctx.Err() != nil { // [m] is closed.
return // [m.workLock] released by defer.
}
if m.processingWorkItems >= m.config.SimultaneousWorkLimit {
// We're already processing the maximum number of work items.
// Wait until one of them finishes.
m.unprocessedWorkCond.Wait()
continue
}
if m.unprocessedWork.Len() == 0 {
if m.processingWorkItems == 0 {
// There's no work to do, and there are no work items being processed
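
The replacement shown in this hunk leans on the mutex and condition variable the manager already owns: a plain counter is compared against SimultaneousWorkLimit, and the loop waits on the condition variable until a finishing worker signals it. A minimal sketch of the same counter-plus-sync.Cond pattern in isolation, with hypothetical names rather than the manager's actual fields (assumes "sync" is imported):

type workLimiter struct {
	lock       sync.Mutex
	cond       *sync.Cond // signalled whenever [processing] decreases
	processing int
	limit      int
}

func newWorkLimiter(limit int) *workLimiter {
	w := &workLimiter{limit: limit}
	w.cond = sync.NewCond(&w.lock)
	return w
}

// acquire blocks until fewer than [limit] items are in flight, then claims a slot.
func (w *workLimiter) acquire() {
	w.lock.Lock()
	defer w.lock.Unlock()
	for w.processing >= w.limit {
		w.cond.Wait()
	}
	w.processing++
}

// release frees a slot and wakes one waiter, if any.
func (w *workLimiter) release() {
	w.lock.Lock()
	defer w.lock.Unlock()
	w.processing--
	w.cond.Signal()
}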
@@ -231,23 +228,12 @@ func (m *StateSyncManager) Close() {
// Processes [item] by fetching and applying a change or range proof.
// Assumes [m.workLock] is not held.
func (m *StateSyncManager) doWork(ctx context.Context, item *syncWorkItem) {
// Wait until we get a work token or we close.
select {
case <-m.workTokens:
case <-ctx.Done():
// [m] is closed and sync() is returning so don't care about cleanup.
return
}

defer func() {
m.workTokens <- token
m.workLock.Lock()
defer m.workLock.Unlock()

m.processingWorkItems--
if m.processingWorkItems == 0 && m.unprocessedWork.Len() == 0 {
// There are no processing or unprocessed work items so we're done.
m.unprocessedWorkCond.Signal()
}
m.workLock.Unlock()
m.unprocessedWorkCond.Signal()
}()

if item.LocalRootID == ids.Empty {
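
Putting the two halves of the new approach together, a dispatch loop built on the workLimiter sketch above could look like the following; the names are hypothetical, and the real manager additionally pops items from its work heaps and handles shutdown and error propagation:

func runWorkers(ctx context.Context, w *workLimiter, jobs <-chan func()) {
	for job := range jobs {
		if ctx.Err() != nil {
			return // stop dispatching once the context is canceled
		}
		w.acquire() // blocks while [limit] jobs are already in flight
		go func(job func()) {
			// Mirror doWork's defer: always free the slot and signal on exit.
			defer w.release()
			job()
		}(job)
	}
}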