Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/cache/informertest/fake_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ func (c *FakeInformers) informerFor(gvk schema.GroupVersionKind, _ runtime.Objec
return informer, nil
}

c.InformersByGVK[gvk] = &controllertest.FakeInformer{}
// Set Synced to true by default so that WaitForCacheSync returns immediately
c.InformersByGVK[gvk] = &controllertest.FakeInformer{Synced: true}
return c.InformersByGVK[gvk], nil
}

Expand Down
16 changes: 13 additions & 3 deletions pkg/controller/controllertest/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@ type FakeInformer struct {
handlers []cache.ResourceEventHandler
}

// fakeHandlerRegistration implements cache.ResourceEventHandlerRegistration for testing.
type fakeHandlerRegistration struct {
informer *FakeInformer
}

// HasSynced implements cache.ResourceEventHandlerRegistration.
func (f *fakeHandlerRegistration) HasSynced() bool {
return f.informer.Synced
}

// AddIndexers does nothing. TODO(community): Implement this.
func (f *FakeInformer) AddIndexers(indexers cache.Indexers) error {
return nil
Expand All @@ -60,19 +70,19 @@ func (f *FakeInformer) HasSynced() bool {
// AddEventHandler implements the Informer interface. Adds an EventHandler to the fake Informers. TODO(community): Implement Registration.
func (f *FakeInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
f.handlers = append(f.handlers, handler)
return nil, nil
return &fakeHandlerRegistration{informer: f}, nil
}

// AddEventHandlerWithResyncPeriod implements the Informer interface. Adds an EventHandler to the fake Informers (ignores resyncPeriod). TODO(community): Implement Registration.
func (f *FakeInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, _ time.Duration) (cache.ResourceEventHandlerRegistration, error) {
f.handlers = append(f.handlers, handler)
return nil, nil
return &fakeHandlerRegistration{informer: f}, nil
}

// AddEventHandlerWithOptions implements the Informer interface. Adds an EventHandler to the fake Informers (ignores options). TODO(community): Implement Registration.
func (f *FakeInformer) AddEventHandlerWithOptions(handler cache.ResourceEventHandler, _ cache.HandlerOptions) (cache.ResourceEventHandlerRegistration, error) {
f.handlers = append(f.handlers, handler)
return nil, nil
return &fakeHandlerRegistration{informer: f}, nil
}

// Run implements the Informer interface. Increments f.RunCount.
Expand Down
91 changes: 91 additions & 0 deletions pkg/internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"strconv"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -386,6 +387,96 @@ var _ = Describe("controller", func() {
<-sourceSynced
})

It("should not call Reconcile until all event handlers have processed initial objects", func(specCtx SpecContext) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A simler version of the test below, main changes:

  • Start cache right at the beginning so it syncs in the background
  • The firstPod handling doesn't seem neccessary
  • Its enough to test inside reconcile that the handler processed everything to validate the ordering, we don't need to also test on the outside that reconcile was called after the handler finished
		It("should not call Reconcile until all event handlers have processed initial objects", func(specCtx SpecContext) {
			nPods := 20
			pods := make([]*corev1.Pod, nPods)
			for i := range nPods {
				pods[i] = &corev1.Pod{
					ObjectMeta: metav1.ObjectMeta{
						Name:      strconv.Itoa(i),
						Namespace: "default",
					},
					Spec: corev1.PodSpec{
						Containers: []corev1.Container{{Name: "test", Image: "test"}},
					},
				}
				_, err := clientset.CoreV1().Pods("default").Create(specCtx, pods[i], metav1.CreateOptions{})
				Expect(err).NotTo(HaveOccurred())
			}
			defer func() {
				for _, pod := range pods {
					_ = clientset.CoreV1().Pods("default").Delete(specCtx, pod.Name, metav1.DeleteOptions{})
				}
			}()

			testCache, err := cache.New(cfg, cache.Options{})
			Expect(err).NotTo(HaveOccurred())

			ctx, cancel := context.WithCancel(specCtx)
			defer cancel()
			go func() {
				defer GinkgoRecover()
				_ = testCache.Start(ctx)
			}()

			// Tracks how many objects have been processed by the event handler.
			var handlerProcessedCount atomic.Int32

			// Channel to block one of the event handlers to simulate slow event handler processing.
			blockHandler := make(chan struct{})

			// Tracks whether Reconcile was called.
			var reconcileCalled atomic.Bool

			// Create the controller.
			testCtrl := New(Options[reconcile.Request]{
				MaxConcurrentReconciles: 1,
				CacheSyncTimeout:        10 * time.Second,
				NewQueue: func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] {
					return &controllertest.Queue{
						TypedInterface: workqueue.NewTyped[reconcile.Request](),
					}
				},
				Name: "test-reconcile-order",
				LogConstructor: func(_ *reconcile.Request) logr.Logger {
					return log.RuntimeLog.WithName("test-reconcile-order")
				},
				Do: reconcile.Func(func(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
					// handlerProcessedCount should be equal to the number of pods created since we are waiting
					// for the handlers to finish processing before reconciling.
					Expect(handlerProcessedCount.Load()).To(Equal(int32(nPods)))
					reconcileCalled.Store(true)
					return reconcile.Result{}, nil
				})},
			)

			// Watch pods with an event handler that blocks all pods but the first one in the list.
			// Kind sources wait for handler sync to ensure that Reconcile is not called until all
			// initial objects have been processed by the event handlers.
			err = testCtrl.Watch(source.Kind(testCache, &corev1.Pod{}, handler.TypedFuncs[*corev1.Pod, reconcile.Request]{
				CreateFunc: func(ctx context.Context, evt event.TypedCreateEvent[*corev1.Pod], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
					<-blockHandler
					handlerProcessedCount.Add(1)
					q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: evt.Object.GetName(), Namespace: evt.Object.GetNamespace()}})
				},
			}))
			Expect(err).NotTo(HaveOccurred())

			controllerDone := make(chan error)
			go func() {
				defer GinkgoRecover()
				controllerDone <- testCtrl.Start(ctx)
			}()

			// Give the controller time to start the reconciler. We asserts
			// in there that all events have been processed, so if we start it
			// prematurely, that assertion will fail. We can not get rid of the
			// sleep unless we stop using envtest for this test.
			time.Sleep(1 * time.Second)
			close(blockHandler)
			Eventually(reconcileCalled.Load).Should(BeTrue())

			cancel()
			Eventually(controllerDone, 5*time.Second).Should(Receive())
		})

nPods := 20
pods := make([]*corev1.Pod, nPods)
for i := range nPods {
pods[i] = &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: strconv.Itoa(i),
Namespace: "default",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{Name: "test", Image: "test"}},
},
}
_, err := clientset.CoreV1().Pods("default").Create(specCtx, pods[i], metav1.CreateOptions{})
Expect(err).NotTo(HaveOccurred())
}
defer func() {
for _, pod := range pods {
_ = clientset.CoreV1().Pods("default").Delete(specCtx, pod.Name, metav1.DeleteOptions{})
}
}()

testCache, err := cache.New(cfg, cache.Options{})
Expect(err).NotTo(HaveOccurred())

ctx, cancel := context.WithCancel(specCtx)
defer cancel()
go func() {
defer GinkgoRecover()
_ = testCache.Start(ctx)
}()

// Tracks how many objects have been processed by the event handler.
var handlerProcessedCount atomic.Int32

// Channel to block one of the event handlers to simulate slow event handler processing.
blockHandler := make(chan struct{})

// Tracks whether Reconcile was called.
var reconcileCalled atomic.Bool

// Create the controller.
testCtrl := New(Options[reconcile.Request]{
MaxConcurrentReconciles: 1,
CacheSyncTimeout: 10 * time.Second,
NewQueue: func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] {
return &controllertest.Queue{
TypedInterface: workqueue.NewTyped[reconcile.Request](),
}
},
Name: "test-reconcile-order",
LogConstructor: func(_ *reconcile.Request) logr.Logger {
return log.RuntimeLog.WithName("test-reconcile-order")
},
Do: reconcile.Func(func(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
// handlerProcessedCount should be equal to the number of pods created since we are waiting
// for the handlers to finish processing before reconciling.
Expect(handlerProcessedCount.Load()).To(Equal(int32(nPods)))
reconcileCalled.Store(true)
return reconcile.Result{}, nil
})},
)

err = testCtrl.Watch(source.Kind(testCache, &corev1.Pod{}, handler.TypedFuncs[*corev1.Pod, reconcile.Request]{
CreateFunc: func(ctx context.Context, evt event.TypedCreateEvent[*corev1.Pod], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
<-blockHandler
handlerProcessedCount.Add(1)
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: evt.Object.GetName(), Namespace: evt.Object.GetNamespace()}})
},
}))
Expect(err).NotTo(HaveOccurred())

controllerDone := make(chan error)
go func() {
defer GinkgoRecover()
controllerDone <- testCtrl.Start(ctx)
}()

// Give the controller time to start the reconciler. We asserts
// in there that all events have been processed, so if we start it
// prematurely, that assertion will fail. We can not get rid of the
// sleep unless we stop using envtest for this test.
time.Sleep(1 * time.Second)
close(blockHandler)
Eventually(reconcileCalled.Load).Should(BeTrue())

cancel()
Eventually(controllerDone, 5*time.Second).Should(Receive())
})

It("should process events from source.Channel", func(ctx SpecContext) {
ctrl.CacheSyncTimeout = 10 * time.Second
// channel to be closed when event is processed
Expand Down
11 changes: 9 additions & 2 deletions pkg/internal/source/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,23 @@ func (ks *Kind[object, request]) Start(ctx context.Context, queue workqueue.Type
return
}

_, err := i.AddEventHandlerWithOptions(NewEventHandler(ctx, queue, ks.Handler, ks.Predicates), toolscache.HandlerOptions{
handlerRegistration, err := i.AddEventHandlerWithOptions(NewEventHandler(ctx, queue, ks.Handler, ks.Predicates), toolscache.HandlerOptions{
Logger: &logKind,
})
if err != nil {
ks.startedErr <- err
return
}
// First, wait for the cache to sync. For real caches this waits for startup.
// For fakes with Synced=false, this returns immediately allowing fast failure.
if !ks.Cache.WaitForCacheSync(ctx) {
// Would be great to return something more informative here
ks.startedErr <- errors.New("cache did not sync")
close(ks.startedErr)
return
}
// Then wait for this specific handler to receive all initial events.
if !toolscache.WaitForCacheSync(ctx.Done(), handlerRegistration.HasSynced) {
ks.startedErr <- errors.New("handler did not sync")
}
close(ks.startedErr)
}()
Expand Down
Loading