Skip to content

Commit efb3ed1

Browse files
committed
🌱 Propagate context from controller.Start to handler
Signed-off-by: Vince Prignano <vincepri@vmware.com>
1 parent 97cfffd commit efb3ed1

File tree

1 file changed

+14
-13
lines changed

1 file changed

+14
-13
lines changed

pkg/internal/controller/controller.go

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,9 @@ func (c *Controller) Start(stop <-chan struct{}) error {
130130
c.Queue = c.MakeQueue()
131131
defer c.Queue.ShutDown() // needs to be outside the iife so that we shutdown after the stop channel is closed
132132

133+
ctx, cancel := context.WithCancel(context.Background())
134+
defer cancel()
135+
133136
err := func() error {
134137
defer c.mu.Unlock()
135138

@@ -170,8 +173,12 @@ func (c *Controller) Start(stop <-chan struct{}) error {
170173
// Launch workers to process resources
171174
c.Log.Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
172175
for i := 0; i < c.MaxConcurrentReconciles; i++ {
173-
// Process work items
174-
go wait.Until(c.worker, c.JitterPeriod, stop)
176+
// Run a worker thread that just dequeues items, processes them, and marks them done.
177+
// It enforces that the reconcileHandler is never invoked concurrently with the same object.
178+
go wait.Until(func() {
179+
for c.processNextWorkItem(ctx) {
180+
}
181+
}, c.JitterPeriod, stop)
175182
}
176183

177184
c.Started = true
@@ -180,22 +187,16 @@ func (c *Controller) Start(stop <-chan struct{}) error {
180187
if err != nil {
181188
return err
182189
}
190+
context.WithCancel(context.Background())
183191

184192
<-stop
185193
c.Log.Info("Stopping workers")
186194
return nil
187195
}
188196

189-
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
190-
// It enforces that the reconcileHandler is never invoked concurrently with the same object.
191-
func (c *Controller) worker() {
192-
for c.processNextWorkItem() {
193-
}
194-
}
195-
196197
// processNextWorkItem will read a single work item off the workqueue and
197198
// attempt to process it, by calling the reconcileHandler.
198-
func (c *Controller) processNextWorkItem() bool {
199+
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
199200
obj, shutdown := c.Queue.Get()
200201
if shutdown {
201202
// Stop working
@@ -210,10 +211,10 @@ func (c *Controller) processNextWorkItem() bool {
210211
// period.
211212
defer c.Queue.Done(obj)
212213

213-
return c.reconcileHandler(obj)
214+
return c.reconcileHandler(ctx, obj)
214215
}
215216

216-
func (c *Controller) reconcileHandler(obj interface{}) bool {
217+
func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) bool {
217218
// Update metrics after processing each item
218219
reconcileStartTS := time.Now()
219220
defer func() {
@@ -233,7 +234,7 @@ func (c *Controller) reconcileHandler(obj interface{}) bool {
233234
}
234235

235236
log := c.Log.WithValues("name", req.Name, "namespace", req.Namespace)
236-
ctx := logf.IntoContext(context.Background(), log)
237+
ctx = logf.IntoContext(ctx, log)
237238

238239
// RunInformersAndControllers the syncHandler, passing it the namespace/Name string of the
239240
// resource to be synced.

0 commit comments

Comments
 (0)