Skip to content

Commit 072ad4b

Browse files
committed
Add ready check to block controller startup until warmup is complete.
1 parent 854987c commit 072ad4b

File tree

4 files changed

+99
-96
lines changed

4 files changed

+99
-96
lines changed

pkg/internal/controller/controller.go

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ import (
2929
"k8s.io/apimachinery/pkg/types"
3030
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
3131
"k8s.io/apimachinery/pkg/util/uuid"
32+
"k8s.io/apimachinery/pkg/util/wait"
3233
"k8s.io/client-go/util/workqueue"
34+
"k8s.io/utils/ptr"
3335

3436
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
3537
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
@@ -38,6 +40,11 @@ import (
3840
"sigs.k8s.io/controller-runtime/pkg/source"
3941
)
4042

43+
const (
44+
// syncedPollPeriod is the period to poll for cache sync
45+
syncedPollPeriod = 100 * time.Millisecond
46+
)
47+
4148
// Controller implements controller.Controller.
4249
type Controller[request comparable] struct {
4350
// Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required.
@@ -86,6 +93,13 @@ type Controller[request comparable] struct {
8693
// didStartEventSources is used to indicate whether the event sources have been started.
8794
didStartEventSources atomic.Bool
8895

96+
// didEventSourcesFinishSync is used to indicate whether the event sources have finished
97+
// successfully. It stores a *bool where
98+
// - nil: not finished syncing
99+
// - true: finished syncing without error
100+
// - false: finished syncing with error
101+
didEventSourcesFinishSync atomic.Value
102+
89103
// LogConstructor is used to construct a logger to then log messages to users during reconciliation,
90104
// or for example when a watch is started.
91105
// Note: LogConstructor has to be able to handle nil requests as we are also using it
@@ -156,12 +170,37 @@ func (c *Controller[request]) NeedLeaderElection() bool {
156170
// Warmup implements the manager.WarmupRunnable interface.
157171
func (c *Controller[request]) Warmup(ctx context.Context) error {
158172
if c.NeedWarmup == nil || !*c.NeedWarmup {
173+
c.didEventSourcesFinishSync.Store(ptr.To(true))
159174
return nil
160175
}
161176

162177
return c.startEventSources(ctx)
163178
}
164179

180+
// DidFinishWarmup implements the manager.WarmupRunnable interface.
181+
func (c *Controller[request]) DidFinishWarmup(ctx context.Context) bool {
182+
err := wait.PollUntilContextCancel(ctx, syncedPollPeriod, true, func(ctx context.Context) (bool, error) {
183+
didFinishSync, ok := c.didEventSourcesFinishSync.Load().(*bool)
184+
if !ok {
185+
return false, errors.New("unexpected error: didEventSourcesFinishSync is not a bool pointer")
186+
}
187+
188+
if didFinishSync == nil {
189+
// event sources not finished syncing
190+
return false, nil
191+
}
192+
193+
if !*didFinishSync {
194+
// event sources finished syncing with an error
195+
return true, errors.New("event sources did not finish syncing succesfully")
196+
}
197+
198+
return true, nil
199+
})
200+
201+
return err == nil
202+
}
203+
165204
// Start implements controller.Controller.
166205
func (c *Controller[request]) Start(ctx context.Context) error {
167206
// use an IIFE to get proper lock handling
@@ -299,7 +338,11 @@ func (c *Controller[request]) startEventSources(ctx context.Context) error {
299338
}
300339
})
301340
}
302-
return errGroup.Wait()
341+
err := errGroup.Wait()
342+
343+
c.didEventSourcesFinishSync.Store(ptr.To(err == nil))
344+
345+
return err
303346
}
304347

305348
// processNextWorkItem will read a single work item off the workqueue and

pkg/internal/controller/controller_test.go

Lines changed: 44 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -1016,126 +1016,78 @@ var _ = Describe("controller", func() {
10161016
})
10171017

10181018
Describe("Warmup", func() {
1019-
It("should start event sources when NeedWarmup is true", func() {
1020-
// Setup
1021-
ctx, cancel := context.WithCancel(context.Background())
1022-
defer cancel()
1023-
1024-
// Create a mock source that we can verify was started
1025-
sourceStarted := false
1026-
mockSource := source.Func(func(ctx context.Context, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
1027-
sourceStarted = true
1028-
return nil
1029-
})
1030-
1031-
ctrl.CacheSyncTimeout = time.Second
1032-
ctrl.startWatches = []source.TypedSource[reconcile.Request]{mockSource}
1019+
JustBeforeEach(func() {
10331020
ctrl.NeedWarmup = ptr.To(true)
1034-
1035-
// Act
1036-
err := ctrl.Warmup(ctx)
1037-
1038-
// Assert
1039-
Expect(err).NotTo(HaveOccurred())
1040-
Expect(sourceStarted).To(BeTrue(), "Event source should have been started")
1041-
Expect(ctrl.didStartEventSources.Load()).To(BeTrue(), "didStartEventSources flag should be set")
10421021
})
10431022

1044-
It("should not start event sources when NeedWarmup is false", func() {
1045-
// Setup
1023+
It("should track warmup status correctly with successful sync", func() {
1024+
// Setup controller with sources that complete successfully
10461025
ctx, cancel := context.WithCancel(context.Background())
10471026
defer cancel()
10481027

1049-
// Create a mock source that should not be started
1050-
sourceStarted := false
1051-
mockSource := source.Func(func(ctx context.Context, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
1052-
sourceStarted = true
1053-
return nil
1054-
})
1055-
1056-
ctrl.startWatches = []source.TypedSource[reconcile.Request]{mockSource}
1057-
ctrl.NeedWarmup = ptr.To(false)
1028+
ctrl.CacheSyncTimeout = time.Second
1029+
ctrl.startWatches = []source.TypedSource[reconcile.Request]{
1030+
source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
1031+
return nil
1032+
}),
1033+
}
10581034

1059-
// Act
10601035
err := ctrl.Warmup(ctx)
1061-
1062-
// Assert
10631036
Expect(err).NotTo(HaveOccurred())
1064-
Expect(sourceStarted).To(BeFalse(), "Event source should not have been started")
1065-
Expect(ctrl.didStartEventSources.Load()).To(BeFalse(), "didStartEventSources flag should not be set")
1037+
1038+
// Verify DidFinishWarmup returns true for successful sync
1039+
result := ctrl.DidFinishWarmup(ctx)
1040+
Expect(result).To(BeTrue())
10661041
})
10671042

1068-
It("should not start event sources when NeedWarmup is nil", func() {
1069-
// Setup
1043+
It("should track warmup status correctly with unsuccessful sync", func() {
1044+
// Setup controller with sources that complete with error
10701045
ctx, cancel := context.WithCancel(context.Background())
10711046
defer cancel()
10721047

1073-
// Create a mock source that should not be started
1074-
sourceStarted := false
1075-
mockSource := source.Func(func(ctx context.Context, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
1076-
sourceStarted = true
1077-
return nil
1078-
})
1079-
1080-
ctrl.startWatches = []source.TypedSource[reconcile.Request]{mockSource}
1081-
ctrl.NeedWarmup = nil
1048+
ctrl.CacheSyncTimeout = time.Second
1049+
ctrl.startWatches = []source.TypedSource[reconcile.Request]{
1050+
source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
1051+
return errors.New("sync error")
1052+
}),
1053+
}
10821054

1083-
// Act
10841055
err := ctrl.Warmup(ctx)
1056+
Expect(err).To(HaveOccurred())
1057+
Expect(err.Error()).To(ContainSubstring("sync error"))
10851058

1086-
// Assert
1087-
Expect(err).NotTo(HaveOccurred())
1088-
Expect(sourceStarted).To(BeFalse(), "Event source should not have been started")
1089-
Expect(ctrl.didStartEventSources.Load()).To(BeFalse(), "didStartEventSources flag should not be set")
1059+
// Verify DidFinishWarmup returns false for unsuccessful sync
1060+
result := ctrl.DidFinishWarmup(ctx)
1061+
Expect(result).To(BeFalse())
10901062
})
1063+
})
10911064

1092-
It("should not start event sources twice when called multiple times", func() {
1093-
// Setup
1094-
ctx, cancel := context.WithCancel(context.Background())
1095-
defer cancel()
1096-
1097-
// Create a mock source that counts how many times it's started
1098-
startCount := 0
1099-
mockSource := source.Func(func(ctx context.Context, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
1100-
startCount++
1101-
return nil
1102-
})
1103-
1104-
ctrl.CacheSyncTimeout = time.Second
1105-
ctrl.startWatches = []source.TypedSource[reconcile.Request]{mockSource}
1106-
ctrl.NeedWarmup = ptr.To(true)
1107-
1108-
// Act
1109-
err1 := ctrl.Warmup(ctx)
1110-
err2 := ctrl.Warmup(ctx)
1111-
1112-
// Assert
1113-
Expect(err1).NotTo(HaveOccurred())
1114-
Expect(err2).NotTo(HaveOccurred())
1115-
Expect(startCount).To(Equal(1), "Event source should have been started only once")
1116-
Expect(ctrl.didStartEventSources.Load()).To(BeTrue(), "didStartEventSources flag should be set")
1065+
Describe("Warmup without warmup enabled", func() {
1066+
JustBeforeEach(func() {
1067+
ctrl.NeedWarmup = ptr.To(false)
11171068
})
11181069

1119-
It("should propagate errors from event sources", func() {
1120-
// Setup
1070+
It("should not start sources if warmup is disabled.", func() {
1071+
// Setup controller with sources that complete successfully
11211072
ctx, cancel := context.WithCancel(context.Background())
11221073
defer cancel()
11231074

1124-
// Create a mock source that returns an error
1125-
expectedErr := errors.New("test error")
1126-
mockSource := source.Func(func(ctx context.Context, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
1127-
return expectedErr
1128-
})
1129-
11301075
ctrl.CacheSyncTimeout = time.Second
1131-
ctrl.startWatches = []source.TypedSource[reconcile.Request]{mockSource}
1132-
ctrl.NeedWarmup = ptr.To(true)
1076+
isSourceStarted := false
1077+
ctrl.startWatches = []source.TypedSource[reconcile.Request]{
1078+
source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
1079+
isSourceStarted = true
1080+
return nil
1081+
}),
1082+
}
11331083

1134-
// Act
11351084
err := ctrl.Warmup(ctx)
1085+
Expect(err).NotTo(HaveOccurred())
1086+
1087+
result := ctrl.DidFinishWarmup(ctx)
1088+
Expect(result).To(BeTrue())
11361089

1137-
// Assert
1138-
Expect(err).To(MatchError(expectedErr))
1090+
Expect(isSourceStarted).To(BeFalse())
11391091
})
11401092
})
11411093
})

pkg/manager/manager.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -316,10 +316,13 @@ type LeaderElectionRunnable interface {
316316

317317
// WarmupRunnable knows if a Runnable should be a warmup runnable. A warmup runnable is a runnable
318318
// that should be run when the manager is started, but before the leader election is acquired.
319-
// The expectation is that it will block startup until the warmup runnables have completed.
320319
type WarmupRunnable interface {
321-
// Warmup is called when the manager is started, but before the leader election is acquired.
320+
// Warmup is the implementation of the warmup runnable.
322321
Warmup(context.Context) error
322+
323+
// WaitForWarmupComplete is a blocking function that waits for the warmup to be completed. It
324+
// returns false if it could not successfully finish warmup.
325+
WaitForWarmupComplete(context.Context) bool
323326
}
324327

325328
// New returns a new Manager for creating Controllers.

pkg/manager/runnable_group.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,12 @@ func (r *runnables) Add(fn Runnable) error {
6969
return r.Webhooks.Add(fn, nil)
7070
case WarmupRunnable, LeaderElectionRunnable:
7171
if warmupRunnable, ok := fn.(WarmupRunnable); ok {
72-
if err := r.Warmup.Add(RunnableFunc(warmupRunnable.Warmup), nil); err != nil {
72+
if err := r.Warmup.Add(
73+
RunnableFunc(warmupRunnable.Warmup),
74+
func(ctx context.Context) bool {
75+
return warmupRunnable.WaitForWarmupComplete(ctx)
76+
},
77+
); err != nil {
7378
return err
7479
}
7580
}

0 commit comments

Comments
 (0)