Skip to content

[release-0.20] ✨ Controller: Retain the priority #3173

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 36 additions & 8 deletions pkg/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/util/workqueue"

"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -60,7 +61,7 @@ type Controller[request comparable] struct {

// Queue is an listeningQueue that listens for events from Informers and adds object keys to
// the Queue for processing
Queue workqueue.TypedRateLimitingInterface[request]
Queue priorityqueue.PriorityQueue[request]

// mu is used to synchronize Controller setup
mu sync.Mutex
Expand Down Expand Up @@ -157,7 +158,12 @@ func (c *Controller[request]) Start(ctx context.Context) error {
// Set the internal context.
c.ctx = ctx

c.Queue = c.NewQueue(c.Name, c.RateLimiter)
queue := c.NewQueue(c.Name, c.RateLimiter)
if priorityQueue, isPriorityQueue := queue.(priorityqueue.PriorityQueue[request]); isPriorityQueue {
c.Queue = priorityQueue
} else {
c.Queue = &priorityQueueWrapper[request]{TypedRateLimitingInterface: queue}
}
go func() {
<-ctx.Done()
c.Queue.ShutDown()
Expand Down Expand Up @@ -268,7 +274,7 @@ func (c *Controller[request]) Start(ctx context.Context) error {
// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the reconcileHandler.
func (c *Controller[request]) processNextWorkItem(ctx context.Context) bool {
obj, shutdown := c.Queue.Get()
obj, priority, shutdown := c.Queue.GetWithPriority()
if shutdown {
// Stop working
return false
Expand All @@ -285,7 +291,7 @@ func (c *Controller[request]) processNextWorkItem(ctx context.Context) bool {
ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1)
defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1)

c.reconcileHandler(ctx, obj)
c.reconcileHandler(ctx, obj, priority)
return true
}

Expand All @@ -308,7 +314,7 @@ func (c *Controller[request]) initMetrics() {
ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Set(0)
}

func (c *Controller[request]) reconcileHandler(ctx context.Context, req request) {
func (c *Controller[request]) reconcileHandler(ctx context.Context, req request, priority int) {
// Update metrics after processing each item
reconcileStartTS := time.Now()
defer func() {
Expand All @@ -331,7 +337,7 @@ func (c *Controller[request]) reconcileHandler(ctx context.Context, req request)
if errors.Is(err, reconcile.TerminalError(nil)) {
ctrlmetrics.TerminalReconcileErrors.WithLabelValues(c.Name).Inc()
} else {
c.Queue.AddRateLimited(req)
c.Queue.AddWithOpts(priorityqueue.AddOpts{RateLimited: true, Priority: priority}, req)
}
ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc()
Expand All @@ -346,11 +352,11 @@ func (c *Controller[request]) reconcileHandler(ctx context.Context, req request)
// We need to drive to stable reconcile loops before queuing due
// to result.RequestAfter
c.Queue.Forget(req)
c.Queue.AddAfter(req, result.RequeueAfter)
c.Queue.AddWithOpts(priorityqueue.AddOpts{After: result.RequeueAfter, Priority: priority}, req)
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc()
case result.Requeue:
log.V(5).Info("Reconcile done, requeueing")
c.Queue.AddRateLimited(req)
c.Queue.AddWithOpts(priorityqueue.AddOpts{RateLimited: true, Priority: priority}, req)
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc()
default:
log.V(5).Info("Reconcile successful")
Expand Down Expand Up @@ -388,3 +394,25 @@ type reconcileIDKey struct{}
func addReconcileID(ctx context.Context, reconcileID types.UID) context.Context {
return context.WithValue(ctx, reconcileIDKey{}, reconcileID)
}

type priorityQueueWrapper[request comparable] struct {
workqueue.TypedRateLimitingInterface[request]
}

func (p *priorityQueueWrapper[request]) AddWithOpts(opts priorityqueue.AddOpts, items ...request) {
for _, item := range items {
switch {
case opts.RateLimited:
p.TypedRateLimitingInterface.AddRateLimited(item)
case opts.After > 0:
p.TypedRateLimitingInterface.AddAfter(item, opts.After)
default:
p.TypedRateLimitingInterface.Add(item)
}
}
}

func (p *priorityQueueWrapper[request]) GetWithPriority() (request, int, bool) {
item, shutdown := p.TypedRateLimitingInterface.Get()
return item, 0, shutdown
}
123 changes: 116 additions & 7 deletions pkg/internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllertest"
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
Expand Down Expand Up @@ -345,9 +346,10 @@ var _ = Describe("controller", func() {
})

It("should check for correct TypedSyncingSource if custom types are used", func() {
queue := &controllertest.TypedQueue[TestRequest]{
TypedInterface: workqueue.NewTyped[TestRequest](),
}
queue := &priorityQueueWrapper[TestRequest]{
TypedRateLimitingInterface: &controllertest.TypedQueue[TestRequest]{
TypedInterface: workqueue.NewTyped[TestRequest](),
}}
ctrl := &Controller[TestRequest]{
NewQueue: func(string, workqueue.TypedRateLimiter[TestRequest]) workqueue.TypedRateLimitingInterface[TestRequest] {
return queue
Expand Down Expand Up @@ -400,10 +402,6 @@ var _ = Describe("controller", func() {
Eventually(func() int { return queue.NumRequeues(request) }).Should(Equal(0))
})

PIt("should forget an item if it is not a Request and continue processing items", func() {
// TODO(community): write this test
})

It("should requeue a Request if there is an error and continue processing items", func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -523,6 +521,37 @@ var _ = Describe("controller", func() {
Eventually(func() int { return dq.NumRequeues(request) }).Should(Equal(0))
})

It("should retain the priority when the reconciler requests a requeue", func() {
q := &fakePriorityQueue{PriorityQueue: priorityqueue.New[reconcile.Request]("controller1")}
ctrl.NewQueue = func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] {
return q
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
defer GinkgoRecover()
Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
}()

q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: 10}, request)

By("Invoking Reconciler which will request a requeue")
fakeReconcile.AddResult(reconcile.Result{Requeue: true}, nil)
Expect(<-reconciled).To(Equal(request))
Eventually(func() []priorityQueueAddition {
q.lock.Lock()
defer q.lock.Unlock()
return q.added
}).Should(Equal([]priorityQueueAddition{{
AddOpts: priorityqueue.AddOpts{
RateLimited: true,
Priority: 10,
},
items: []reconcile.Request{request},
}}))
})

It("should requeue a Request after a duration (but not rate-limitted) if the Result sets RequeueAfter (regardless of Requeue)", func() {
dq := &DelegatingQueue{TypedRateLimitingInterface: ctrl.NewQueue("controller1", nil)}
ctrl.NewQueue = func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] {
Expand Down Expand Up @@ -555,6 +584,37 @@ var _ = Describe("controller", func() {
Eventually(func() int { return dq.NumRequeues(request) }).Should(Equal(0))
})

It("should retain the priority with RequeAfter", func() {
q := &fakePriorityQueue{PriorityQueue: priorityqueue.New[reconcile.Request]("controller1")}
ctrl.NewQueue = func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] {
return q
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
defer GinkgoRecover()
Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
}()

q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: 10}, request)

By("Invoking Reconciler which will ask for RequeueAfter")
fakeReconcile.AddResult(reconcile.Result{RequeueAfter: time.Millisecond * 100}, nil)
Expect(<-reconciled).To(Equal(request))
Eventually(func() []priorityQueueAddition {
q.lock.Lock()
defer q.lock.Unlock()
return q.added
}).Should(Equal([]priorityQueueAddition{{
AddOpts: priorityqueue.AddOpts{
After: time.Millisecond * 100,
Priority: 10,
},
items: []reconcile.Request{request},
}}))
})

It("should perform error behavior if error is not nil, regardless of RequeueAfter", func() {
dq := &DelegatingQueue{TypedRateLimitingInterface: ctrl.NewQueue("controller1", nil)}
ctrl.NewQueue = func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] {
Expand Down Expand Up @@ -586,6 +646,37 @@ var _ = Describe("controller", func() {
Eventually(func() int { return dq.NumRequeues(request) }).Should(Equal(0))
})

It("should retain the priority when there was an error", func() {
q := &fakePriorityQueue{PriorityQueue: priorityqueue.New[reconcile.Request]("controller1")}
ctrl.NewQueue = func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] {
return q
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
defer GinkgoRecover()
Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
}()

q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: 10}, request)

By("Invoking Reconciler which will return an error")
fakeReconcile.AddResult(reconcile.Result{}, errors.New("oups, I did it again"))
Expect(<-reconciled).To(Equal(request))
Eventually(func() []priorityQueueAddition {
q.lock.Lock()
defer q.lock.Unlock()
return q.added
}).Should(Equal([]priorityQueueAddition{{
AddOpts: priorityqueue.AddOpts{
RateLimited: true,
Priority: 10,
},
items: []reconcile.Request{request},
}}))
})

PIt("should return if the queue is shutdown", func() {
// TODO(community): write this test
})
Expand Down Expand Up @@ -977,3 +1068,21 @@ func (t *bisignallingSource[T]) WaitForSync(ctx context.Context) error {
return ctx.Err()
}
}

type priorityQueueAddition struct {
priorityqueue.AddOpts
items []reconcile.Request
}

type fakePriorityQueue struct {
priorityqueue.PriorityQueue[reconcile.Request]

lock sync.Mutex
added []priorityQueueAddition
}

func (f *fakePriorityQueue) AddWithOpts(o priorityqueue.AddOpts, items ...reconcile.Request) {
f.lock.Lock()
defer f.lock.Unlock()
f.added = append(f.added, priorityQueueAddition{AddOpts: o, items: items})
}