Skip to content

Commit

Permalink
Merge pull request #2761 from k8s-infra-cherrypick-robot/cherry-pick-…
Browse files Browse the repository at this point in the history
…2757-to-release-0.17

[release-0.17] 🐛 Runnable group should check if stopped before enqueueing
  • Loading branch information
k8s-ci-robot authored Apr 5, 2024
2 parents 854a6b1 + f5833f3 commit c25fe2f
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 1 deletion.
15 changes: 14 additions & 1 deletion pkg/manager/runnable_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,15 @@ func (r *runnableGroup) Add(rn Runnable, ready runnableCheck) error {
r.start.Unlock()
}

// Recheck if we're stopped and hold the readlock, given that the stop and start can be called
// at the same time, we can end up in a situation where the runnable is added
// after the group is stopped and the channel is closed.
r.stop.RLock()
defer r.stop.RUnlock()
if r.stopped {
return errRunnableGroupStopped
}

// Enqueue the runnable.
r.ch <- readyRunnable
return nil
Expand All @@ -272,7 +281,11 @@ func (r *runnableGroup) Add(rn Runnable, ready runnableCheck) error {
func (r *runnableGroup) StopAndWait(ctx context.Context) {
r.stopOnce.Do(func() {
// Close the reconciler channel once we're done.
defer close(r.ch)
defer func() {
r.stop.Lock()
close(r.ch)
r.stop.Unlock()
}()

_ = r.Start(ctx)
r.stop.Lock()
Expand Down
36 changes: 36 additions & 0 deletions pkg/manager/runnable_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,42 @@ var _ = Describe("runnableGroup", func() {
}
})

It("should be able to handle adding runnables while stopping", func() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
rg := newRunnableGroup(defaultBaseContext, errCh)

go func() {
defer GinkgoRecover()
<-time.After(1 * time.Millisecond)
Expect(rg.Start(ctx)).To(Succeed())
}()
go func() {
defer GinkgoRecover()
<-time.After(1 * time.Millisecond)
ctx, cancel := context.WithCancel(context.Background())
cancel()
rg.StopAndWait(ctx)
}()

for i := 0; i < 200; i++ {
go func(i int) {
defer GinkgoRecover()

<-time.After(time.Duration(i) * time.Microsecond)
Expect(rg.Add(RunnableFunc(func(c context.Context) error {
<-ctx.Done()
return nil
}), func(_ context.Context) bool {
return true
})).To(SatisfyAny(
Succeed(),
Equal(errRunnableGroupStopped),
))
}(i)
}
})

It("should not turn ready if some readiness check fail", func() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
Expand Down

0 comments on commit c25fe2f

Please sign in to comment.