Skip to content

Commit

Permalink
Merge pull request #2046 from FillZpp/add-return-registration-into-ev…
Browse files Browse the repository at this point in the history
…ent-handler

⚠ Support registration and removal of event handler
  • Loading branch information
k8s-ci-robot authored Nov 16, 2022
2 parents 50077a4 + 6d2d247 commit af8d903
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 48 deletions.
12 changes: 10 additions & 2 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,19 @@ type Informer interface {
// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
// period. Events to a single handler are delivered sequentially, but there is no coordination
// between different handlers.
AddEventHandler(handler toolscache.ResourceEventHandler)
// It returns a registration handle for the handler that can be used to remove
// the handler again.
AddEventHandler(handler toolscache.ResourceEventHandler) (toolscache.ResourceEventHandlerRegistration, error)
// AddEventHandlerWithResyncPeriod adds an event handler to the shared informer using the
// specified resync period. Events to a single handler are delivered sequentially, but there is
// no coordination between different handlers.
AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration)
// It returns a registration handle for the handler that can be used to remove
// the handler again and an error if the handler cannot be added.
AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration) (toolscache.ResourceEventHandlerRegistration, error)
// RemoveEventHandler removes a formerly added event handler given by
// its registration handle.
// This function is guaranteed to be idempotent, and thread-safe.
RemoveEventHandler(handle toolscache.ResourceEventHandlerRegistration) error
// AddIndexers adds more indexers to this store. If you call this after you already have data
// in the store, the results are undefined.
AddIndexers(indexers toolscache.Indexers) error
Expand Down
8 changes: 4 additions & 4 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1345,7 +1345,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
addFunc := func(obj interface{}) {
out <- obj
}
sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc})
_, _ = sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc})

By("adding an object")
cl, err := client.New(cfg, client.Options{})
Expand All @@ -1369,7 +1369,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
addFunc := func(obj interface{}) {
out <- obj
}
sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc})
_, _ = sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc})

By("adding an object")
cl, err := client.New(cfg, client.Options{})
Expand Down Expand Up @@ -1528,7 +1528,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
addFunc := func(obj interface{}) {
out <- obj
}
sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc})
_, _ = sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc})

By("adding an object")
cl, err := client.New(cfg, client.Options{})
Expand Down Expand Up @@ -1646,7 +1646,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
addFunc := func(obj interface{}) {
out <- obj
}
sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc})
_, _ = sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc})

By("adding an object")
cl, err := client.New(cfg, client.Options{})
Expand Down
22 changes: 2 additions & 20 deletions pkg/cache/informer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"reflect"
"strings"
"time"

apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -142,7 +141,7 @@ func (ip *informerCache) GetInformerForKind(ctx context.Context, gvk schema.Grou
if err != nil {
return nil, err
}
return WrapInformer(i.Informer), err
return i.Informer, err
}

// GetInformer returns the informer for the obj.
Expand All @@ -156,7 +155,7 @@ func (ip *informerCache) GetInformer(ctx context.Context, obj client.Object) (In
if err != nil {
return nil, err
}
return WrapInformer(i.Informer), err
return i.Informer, err
}

// NeedLeaderElection implements the LeaderElectionRunnable interface
Expand Down Expand Up @@ -216,20 +215,3 @@ func indexByField(indexer Informer, field string, extractor client.IndexerFunc)

return indexer.AddIndexers(cache.Indexers{internal.FieldIndexName(field): indexFunc})
}

type informerWrapper struct {
cache.SharedIndexInformer
}

func (iw *informerWrapper) AddEventHandler(handler cache.ResourceEventHandler) {
_, _ = iw.SharedIndexInformer.AddEventHandler(handler)
}

func (iw *informerWrapper) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, resyncPeriod time.Duration) {
_, _ = iw.SharedIndexInformer.AddEventHandlerWithResyncPeriod(handler, resyncPeriod)
}

// WrapInformer is a temporary wrapper to make Informer compatible with the SharedIndexInformer in client-go v0.26.0
func WrapInformer(i cache.SharedIndexInformer) Informer {
return &informerWrapper{SharedIndexInformer: i}
}
12 changes: 2 additions & 10 deletions pkg/cache/informertest/fake_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,7 @@ func (c *FakeInformers) GetInformerForKind(ctx context.Context, gvk schema.Group
if err != nil {
return nil, err
}
i, err := c.informerFor(gvk, obj)
if err != nil {
return nil, err
}
return cache.WrapInformer(i), nil
return c.informerFor(gvk, obj)
}

// FakeInformerForKind implements Informers.
Expand Down Expand Up @@ -80,11 +76,7 @@ func (c *FakeInformers) GetInformer(ctx context.Context, obj client.Object) (cac
return nil, err
}
gvk := gvks[0]
i, err := c.informerFor(gvk, obj)
if err != nil {
return nil, err
}
return cache.WrapInformer(i), nil
return c.informerFor(gvk, obj)
}

// WaitForCacheSync implements Informers.
Expand Down
42 changes: 36 additions & 6 deletions pkg/cache/multi_namespace_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,17 +296,47 @@ type multiNamespaceInformer struct {
var _ Informer = &multiNamespaceInformer{}

// AddEventHandler adds the handler to each namespaced informer.
func (i *multiNamespaceInformer) AddEventHandler(handler toolscache.ResourceEventHandler) {
for _, informer := range i.namespaceToInformer {
informer.AddEventHandler(handler)
func (i *multiNamespaceInformer) AddEventHandler(handler toolscache.ResourceEventHandler) (toolscache.ResourceEventHandlerRegistration, error) {
handles := make(map[string]toolscache.ResourceEventHandlerRegistration, len(i.namespaceToInformer))
for ns, informer := range i.namespaceToInformer {
registration, err := informer.AddEventHandler(handler)
if err != nil {
return nil, err
}
handles[ns] = registration
}
return handles, nil
}

// AddEventHandlerWithResyncPeriod adds the handler with a resync period to each namespaced informer.
func (i *multiNamespaceInformer) AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration) {
for _, informer := range i.namespaceToInformer {
informer.AddEventHandlerWithResyncPeriod(handler, resyncPeriod)
func (i *multiNamespaceInformer) AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration) (toolscache.ResourceEventHandlerRegistration, error) {
handles := make(map[string]toolscache.ResourceEventHandlerRegistration, len(i.namespaceToInformer))
for ns, informer := range i.namespaceToInformer {
registration, err := informer.AddEventHandlerWithResyncPeriod(handler, resyncPeriod)
if err != nil {
return nil, err
}
handles[ns] = registration
}
return handles, nil
}

// RemoveEventHandler removes a formerly added event handler given by its registration handle.
func (i *multiNamespaceInformer) RemoveEventHandler(h toolscache.ResourceEventHandlerRegistration) error {
handles, ok := h.(map[string]toolscache.ResourceEventHandlerRegistration)
if !ok {
return fmt.Errorf("it is not the registration returned by multiNamespaceInformer")
}
for ns, informer := range i.namespaceToInformer {
registration, ok := handles[ns]
if !ok {
continue
}
if err := informer.RemoveEventHandler(registration); err != nil {
return err
}
}
return nil
}

// AddIndexers adds the indexer for each namespaced informer.
Expand Down
11 changes: 9 additions & 2 deletions pkg/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,11 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w
return
}

i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
_, err := i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
if err != nil {
ks.started <- err
return
}
if !ks.cache.WaitForCacheSync(ctx) {
// Would be great to return something more informative here
ks.started <- errors.New("cache did not sync")
Expand Down Expand Up @@ -351,7 +355,10 @@ func (is *Informer) Start(ctx context.Context, handler handler.EventHandler, que
return fmt.Errorf("must specify Informer.Informer")
}

is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
_, err := is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
if err != nil {
return err
}
return nil
}

Expand Down
7 changes: 3 additions & 4 deletions pkg/source/source_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"time"

"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand Down Expand Up @@ -245,7 +244,7 @@ var _ = Describe("Source", func() {
c := make(chan struct{})

q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
instance := &source.Informer{Informer: cache.WrapInformer(depInformer)}
instance := &source.Informer{Informer: depInformer}
err := instance.Start(ctx, handler.Funcs{
CreateFunc: func(evt event.CreateEvent, q2 workqueue.RateLimitingInterface) {
defer GinkgoRecover()
Expand Down Expand Up @@ -286,7 +285,7 @@ var _ = Describe("Source", func() {
rs2.SetLabels(map[string]string{"biz": "baz"})

q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
instance := &source.Informer{Informer: cache.WrapInformer(depInformer)}
instance := &source.Informer{Informer: depInformer}
err = instance.Start(ctx, handler.Funcs{
CreateFunc: func(evt event.CreateEvent, q2 workqueue.RateLimitingInterface) {
},
Expand Down Expand Up @@ -323,7 +322,7 @@ var _ = Describe("Source", func() {
c := make(chan struct{})

q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
instance := &source.Informer{Informer: cache.WrapInformer(depInformer)}
instance := &source.Informer{Informer: depInformer}
err := instance.Start(ctx, handler.Funcs{
CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) {
},
Expand Down

0 comments on commit af8d903

Please sign in to comment.