Skip to content

Commit 5df573f

Browse files
committed
Rename to startEventSourcesLocked and lock with c.mu
1 parent c1d8ea4 commit 5df573f

File tree

2 files changed

+51
-23
lines changed

2 files changed

+51
-23
lines changed

pkg/internal/controller/controller.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,10 @@ func (c *Controller[request]) Warmup(ctx context.Context) error {
225225
return nil
226226
}
227227

228-
return c.startEventSources(ctx)
228+
c.mu.Lock()
229+
defer c.mu.Unlock()
230+
231+
return c.startEventSourcesLocked(ctx)
229232
}
230233

231234
// Start implements controller.Controller.
@@ -263,7 +266,7 @@ func (c *Controller[request]) Start(ctx context.Context) error {
263266
// NB(directxman12): launch the sources *before* trying to wait for the
264267
// caches to sync so that they have a chance to register their intended
265268
// caches.
266-
if err := c.startEventSources(ctx); err != nil {
269+
if err := c.startEventSourcesLocked(ctx); err != nil {
267270
return err
268271
}
269272

@@ -296,9 +299,9 @@ func (c *Controller[request]) Start(ctx context.Context) error {
296299
return nil
297300
}
298301

299-
// startEventSources launches all the sources registered with this controller and waits
302+
// startEventSourcesLocked launches all the sources registered with this controller and waits
300303
// for them to sync. It returns an error if any of the sources fail to start or sync.
301-
func (c *Controller[request]) startEventSources(ctx context.Context) error {
304+
func (c *Controller[request]) startEventSourcesLocked(ctx context.Context) error {
302305
var retErr error
303306

304307
c.didStartEventSourcesOnce.Do(func() {

pkg/internal/controller/controller_test.go

Lines changed: 44 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -386,13 +386,13 @@ var _ = Describe("controller", func() {
386386
})
387387
})
388388

389-
Describe("startEventSources", func() {
389+
Describe("startEventSourcesLocked", func() {
390390
It("should return nil when no sources are provided", func() {
391391
ctx, cancel := context.WithCancel(context.Background())
392392
defer cancel()
393393

394394
ctrl.startWatches = []source.TypedSource[reconcile.Request]{}
395-
err := ctrl.startEventSources(ctx)
395+
err := ctrl.startEventSourcesLocked(ctx)
396396
Expect(err).NotTo(HaveOccurred())
397397
})
398398

@@ -409,7 +409,7 @@ var _ = Describe("controller", func() {
409409
// Set a sufficiently long timeout to avoid timeouts interfering with the error being returned
410410
ctrl.CacheSyncTimeout = 5 * time.Second
411411
ctrl.startWatches = []source.TypedSource[reconcile.Request]{src}
412-
err := ctrl.startEventSources(ctx)
412+
err := ctrl.startEventSourcesLocked(ctx)
413413
Expect(err).To(Equal(expectedErr))
414414
})
415415

@@ -423,7 +423,7 @@ var _ = Describe("controller", func() {
423423
ctrl.Name = "test-controller"
424424
ctrl.CacheSyncTimeout = 5 * time.Second
425425

426-
err := ctrl.startEventSources(ctx)
426+
err := ctrl.startEventSourcesLocked(ctx)
427427
Expect(err).To(HaveOccurred())
428428
Expect(err.Error()).To(ContainSubstring("failed to wait for test-controller caches to sync"))
429429
})
@@ -439,7 +439,7 @@ var _ = Describe("controller", func() {
439439
ctrl.Name = "test-controller"
440440
ctrl.CacheSyncTimeout = 5 * time.Second
441441

442-
err := ctrl.startEventSources(ctx)
442+
err := ctrl.startEventSourcesLocked(ctx)
443443
Expect(err).NotTo(HaveOccurred())
444444
})
445445

@@ -463,7 +463,7 @@ var _ = Describe("controller", func() {
463463
startErrCh := make(chan error)
464464
go func() {
465465
defer GinkgoRecover()
466-
startErrCh <- ctrl.startEventSources(sourceCtx)
466+
startErrCh <- ctrl.startEventSourcesLocked(sourceCtx)
467467
}()
468468

469469
// Allow source to start successfully
@@ -498,7 +498,7 @@ var _ = Describe("controller", func() {
498498

499499
ctrl.startWatches = []source.TypedSource[reconcile.Request]{blockingSrc}
500500

501-
err := ctrl.startEventSources(ctx)
501+
err := ctrl.startEventSourcesLocked(ctx)
502502
Expect(err).To(HaveOccurred())
503503
Expect(err.Error()).To(ContainSubstring("timed out waiting for source"))
504504
})
@@ -517,13 +517,13 @@ var _ = Describe("controller", func() {
517517

518518
ctrl.startWatches = []source.TypedSource[reconcile.Request]{src}
519519

520-
By("Calling startEventSources multiple times in parallel")
520+
By("Calling startEventSourcesLocked multiple times in parallel")
521521
var wg sync.WaitGroup
522522
for i := 1; i <= 5; i++ {
523523
wg.Add(1)
524524
go func() {
525525
defer wg.Done()
526-
err := ctrl.startEventSources(ctx)
526+
err := ctrl.startEventSourcesLocked(ctx)
527527
// All calls should return the same nil error
528528
Expect(err).NotTo(HaveOccurred())
529529
}()
@@ -533,12 +533,12 @@ var _ = Describe("controller", func() {
533533
Expect(startCount.Load()).To(Equal(int32(1)), "Source should only be started once even when called multiple times")
534534
})
535535

536-
It("should block subsequent calls from returning until the first call to startEventSources has returned", func() {
536+
It("should block subsequent calls from returning until the first call to startEventSourcesLocked has returned", func() {
537537
ctx, cancel := context.WithCancel(context.Background())
538538
defer cancel()
539539
ctrl.CacheSyncTimeout = 5 * time.Second
540540

541-
// finishSourceChan is closed to unblock startEventSources from returning
541+
// finishSourceChan is closed to unblock startEventSourcesLocked from returning
542542
finishSourceChan := make(chan struct{})
543543

544544
src := source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
@@ -547,28 +547,28 @@ var _ = Describe("controller", func() {
547547
})
548548
ctrl.startWatches = []source.TypedSource[reconcile.Request]{src}
549549

550-
By("Calling startEventSources asynchronously")
550+
By("Calling startEventSourcesLocked asynchronously")
551551
go func() {
552552
defer GinkgoRecover()
553-
Expect(ctrl.startEventSources(ctx)).To(Succeed())
553+
Expect(ctrl.startEventSourcesLocked(ctx)).To(Succeed())
554554
}()
555555

556-
By("Calling startEventSources again")
556+
By("Calling startEventSourcesLocked again")
557557
var didSubsequentCallComplete atomic.Bool
558558
go func() {
559559
defer GinkgoRecover()
560-
Expect(ctrl.startEventSources(ctx)).To(Succeed())
560+
Expect(ctrl.startEventSourcesLocked(ctx)).To(Succeed())
561561
didSubsequentCallComplete.Store(true)
562562
}()
563563

564-
// Assert that second call to startEventSources is blocked while source has not finished
564+
// Assert that second call to startEventSourcesLocked is blocked while source has not finished
565565
Consistently(didSubsequentCallComplete.Load).Should(BeFalse())
566566

567567
By("Finishing source start + sync")
568568
finishSourceChan <- struct{}{}
569569

570-
// Assert that second call to startEventSources is now complete
571-
Eventually(didSubsequentCallComplete.Load).Should(BeTrue(), "startEventSources should complete after source is started and synced")
570+
// Assert that second call to startEventSourcesLocked is now complete
571+
Eventually(didSubsequentCallComplete.Load).Should(BeTrue(), "startEventSourcesLocked should complete after source is started and synced")
572572
})
573573

574574
It("should reset c.startWatches to nil after returning", func() {
@@ -583,7 +583,7 @@ var _ = Describe("controller", func() {
583583

584584
ctrl.startWatches = []source.TypedSource[reconcile.Request]{src}
585585

586-
err := ctrl.startEventSources(ctx)
586+
err := ctrl.startEventSourcesLocked(ctx)
587587
Expect(err).NotTo(HaveOccurred())
588588
Expect(ctrl.startWatches).To(BeNil(), "startWatches should be reset to nil after returning")
589589
})
@@ -1236,6 +1236,31 @@ var _ = Describe("controller", func() {
12361236
Expect(<-runnableExecutionOrderChan).To(Equal(nonWarmupRunnableName))
12371237
})
12381238

1239+
It("should not cause a data race when called concurrently", func() {
1240+
ctx, cancel := context.WithCancel(context.Background())
1241+
defer cancel()
1242+
1243+
ctrl.CacheSyncTimeout = time.Second
1244+
1245+
ctrl.startWatches = []source.TypedSource[reconcile.Request]{
1246+
source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
1247+
return nil
1248+
}),
1249+
}
1250+
1251+
var wg sync.WaitGroup
1252+
for i := 0; i < 5; i++ {
1253+
wg.Add(1)
1254+
go func() {
1255+
defer GinkgoRecover()
1256+
defer wg.Done()
1257+
Expect(ctrl.Warmup(ctx)).To(Succeed())
1258+
}()
1259+
}
1260+
1261+
wg.Wait()
1262+
})
1263+
12391264
It("should not race with Start and only start sources once", func() {
12401265
ctx, cancel := context.WithCancel(context.Background())
12411266
defer cancel()

0 commit comments

Comments
 (0)