Skip to content

Conditional controllers with event handler removal #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ module sigs.k8s.io/controller-runtime

go 1.16

replace k8s.io/api => /usr/local/google/home/kevindelgado/.gvm/pkgsets/go1.16/global/src/k8s.io/kubernetes/staging/src/k8s.io/api

replace k8s.io/apimachinery => /usr/local/google/home/kevindelgado/.gvm/pkgsets/go1.16/global/src/k8s.io/kubernetes/staging/src/k8s.io/apimachinery

replace k8s.io/client-go => /usr/local/google/home/kevindelgado/.gvm/pkgsets/go1.16/global/src/k8s.io/kubernetes/staging/src/k8s.io/client-go

require (
github.com/evanphx/json-patch v4.9.0+incompatible
github.com/fsnotify/fsnotify v1.4.9
Expand Down
6 changes: 0 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -794,15 +794,9 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3 h1:sXmLre5bzIR6ypkjXCDI3jHPssRhc8KD/Ome589sc3U=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
k8s.io/api v0.21.0 h1:gu5iGF4V6tfVCQ/R+8Hc0h7H1JuEhzyEi9S4R5LM8+Y=
k8s.io/api v0.21.0/go.mod h1:+YbrhBBGgsxbF6o6Kj4KJPJnBmAKuXDeS3E18bgHNVU=
k8s.io/apiextensions-apiserver v0.21.0 h1:Nd4uBuweg6ImzbxkC1W7xUNZcCV/8Vt10iTdTIVF3hw=
k8s.io/apiextensions-apiserver v0.21.0/go.mod h1:gsQGNtGkc/YoDG9loKI0V+oLZM4ljRPjc/sql5tmvzc=
k8s.io/apimachinery v0.21.0 h1:3Fx+41if+IRavNcKOz09FwEXDBG6ORh6iMsTSelhkMA=
k8s.io/apimachinery v0.21.0/go.mod h1:jbreFvJo3ov9rj7eWT7+sYiRx+qZuCYXwWT1bcDswPY=
k8s.io/apiserver v0.21.0/go.mod h1:w2YSn4/WIwYuxG5zJmcqtRdtqgW/J2JRgFAqps3bBpg=
k8s.io/client-go v0.21.0 h1:n0zzzJsAQmJngpC0IhgFcApZyoGXPrDIAD601HD09ag=
k8s.io/client-go v0.21.0/go.mod h1:nNBytTF9qPFDEhoqgEPaarobC8QPae13bElIVHzIglA=
k8s.io/code-generator v0.21.0/go.mod h1:hUlps5+9QaTrKx+jiM4rmq7YmH8wPOIko64uZCHDh6Q=
k8s.io/component-base v0.21.0 h1:tLLGp4BBjQaCpS/KiuWh7m2xqvAdsxLm4ATxHSe5Zpg=
k8s.io/component-base v0.21.0/go.mod h1:qvtjz6X0USWXbgmbfXR+Agik4RZ3jv2Bgr5QnZzdPYw=
Expand Down
6 changes: 6 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,17 @@ type Informer interface {
// specified resync period. Events to a single handler are delivered sequentially, but there is
// no coordination between different handlers.
AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration)
// RemoveEventHandler removes an event handler that was previously added.
// Only event handlers that are go comparable can be removed, meaning handlers such as
// ResourceEventHandlerFuncs can only be removed if they are added by reference rather than by value.
RemoveEventHandler(handler toolscache.ResourceEventHandler) error
// AddIndexers adds more indexers to this store. If you call this after you already have data
// in the store, the results are undefined.
AddIndexers(indexers toolscache.Indexers) error
//HasSynced return true if the informers underlying store has synced
HasSynced() bool
// IsStopped reports whether the informer has already been stopped.
IsStopped() bool
}

// Options are the optional arguments for creating a new InformersMap object
Expand Down
23 changes: 22 additions & 1 deletion pkg/cache/internal/informers_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,13 @@ import (
"k8s.io/client-go/metadata"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"

"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)

var log = logf.RuntimeLog.WithName("infmap")

// clientListWatcherFunc knows how to create a ListWatcher
type createListWatcherFunc func(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error)

Expand Down Expand Up @@ -225,8 +228,26 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob
// Start the Informer if need by
// TODO(seans): write thorough tests and document what happens here - can you add indexers?
// can you add eventhandlers?
//
// Run the informer with stop options, ensuring that we shut down the informer
// if all its event handelrs are removed
if ip.started {
go i.Informer.Run(ip.stop)
log.Info("add informer", "gvk", gvk)
// Informer.RunWithStopOptions requires a context,
// so we need to derive this context from the ip.stop channel
ctx, cancel := context.WithCancel(context.TODO())
go func() {
<-ip.stop
cancel()
}()
go func() {
log.Info("starting informer with stop options")
i.Informer.RunWithStopOptions(ctx, cache.StopOptions{
StopOnZeroEventHandlers: true,
})
log.Info("stopping informer, removing from cache")
delete(ip.informersByGVK, gvk)
}()
}
return i, ip.started, nil
}
Expand Down
21 changes: 21 additions & 0 deletions pkg/cache/multi_namespace_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,16 @@ func (i *multiNamespaceInformer) AddEventHandlerWithResyncPeriod(handler toolsca
}
}

// RemoveEventHandler removes the handler from each namespaced informer
func (i *multiNamespaceInformer) RemoveEventHandler(handler toolscache.ResourceEventHandler) error {
for _, informer := range i.namespaceToInformer {
if err := informer.RemoveEventHandler(handler); err != nil {
return err
}
}
return nil
}

// AddIndexers adds the indexer for each namespaced informer
func (i *multiNamespaceInformer) AddIndexers(indexers toolscache.Indexers) error {
for _, informer := range i.namespaceToInformer {
Expand All @@ -220,3 +230,14 @@ func (i *multiNamespaceInformer) HasSynced() bool {
}
return true
}

// IsStopped checks if any namespaced informer has stopped.
// TODO: is this the right behavior?
func (i *multiNamespaceInformer) IsStopped() bool {
for _, informer := range i.namespaceToInformer {
if ok := informer.IsStopped(); ok {
return ok
}
}
return false
}
68 changes: 56 additions & 12 deletions pkg/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/meta"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand All @@ -36,6 +37,7 @@ import (
)

var _ inject.Injector = &Controller{}
var defaultResyncPeriod = 5 * time.Second

// Controller implements controller.Controller
type Controller struct {
Expand Down Expand Up @@ -128,6 +130,7 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
}

c.Log.Info("Starting EventSource", "source", src)
c.Log.Info("watch test log", "source", src)
return src.Start(c.ctx, evthdler, c.Queue, prct...)
}

Expand All @@ -150,9 +153,11 @@ func (c *Controller) Start(ctx context.Context) error {
<-ctx.Done()
c.Queue.ShutDown()
}()
c.mu.Unlock()

wg := &sync.WaitGroup{}
err := func() error {
startWatches := func(ctx context.Context, cancel context.CancelFunc) error {
c.mu.Lock()
defer c.mu.Unlock()

// TODO(pwittrock): Reconsider HandleCrash
Expand All @@ -163,8 +168,20 @@ func (c *Controller) Start(ctx context.Context) error {
// caches.
for _, watch := range c.startWatches {
c.Log.Info("Starting EventSource", "source", watch.src)
kind, ok := watch.src.(*source.Kind)
if !ok {
continue
}
// inject the cancel func into the kind so that it knows how
// to shut down if the informer is stopped
// TODO: shoudl we pass this stuff in the context instead?
kind.InformerSyncInfo = &source.InformerSyncInfo{
Cancel: cancel,
ResyncPeriod: defaultResyncPeriod,
}

if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
if err := kind.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
c.Log.Error(err, "error starting src")
return err
}
}
Expand Down Expand Up @@ -197,12 +214,6 @@ func (c *Controller) Start(ctx context.Context) error {
}
}

// All the watches have been started, we can reset the local slice.
//
// We should never hold watches more than necessary, each watch source can hold a backing cache,
// which won't be garbage collected if we hold a reference to it.
c.startWatches = nil

// Launch workers to process resources
c.Log.Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
wg.Add(c.MaxConcurrentReconciles)
Expand All @@ -218,15 +229,48 @@ func (c *Controller) Start(ctx context.Context) error {

c.Started = true
return nil
}()
if err != nil {
return err
}
crdInstallLoop:
for {
// TODO: check rest mapper for existence of CRD instead of blindly starting the watches
watchCtx, cancel := context.WithCancel(ctx)
err := startWatches(watchCtx, cancel)
if err != nil {
// if no kind match error, wait for resyncPeriod
// and then retry
kindMatchErr := &meta.NoKindMatchError{}
if errors.As(err, &kindMatchErr) {
select {
case <-ctx.Done():
break crdInstallLoop
case <-time.NewTimer(defaultResyncPeriod).C:
continue
}
} else {
c.Log.Error(err, "startWatches errored out unexpectedly")
return err
}
}
// startWatches was successfully, now just
// wait for either a shutdown signal or
// indication that the informer shut itself down
select {
case <-ctx.Done():
break crdInstallLoop
case <-watchCtx.Done():
c.Started = false
continue
}
}

<-ctx.Done()
c.Log.Info("Shutdown signal received, waiting for all workers to finish")
wg.Wait()
c.Log.Info("All workers finished")
// All the watches have been started, we can reset the local slice.
//
// We should never hold watches more than necessary, each watch source can hold a backing cache,
// which won't be garbage collected if we hold a reference to it.
c.startWatches = nil
return nil
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/source/internal/eventsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type EventHandler struct {
EventHandler handler.EventHandler
Queue workqueue.RateLimitingInterface
Predicates []predicate.Predicate
ErrorFunc func()
}

// OnAdd creates CreateEvent and calls Create on EventHandler
Expand Down Expand Up @@ -136,3 +137,10 @@ func (e EventHandler) OnDelete(obj interface{}) {
// Invoke delete handler
e.EventHandler.Delete(d, e.Queue)
}

func (e EventHandler) OnError(err error) {
if e.ErrorFunc != nil {
e.ErrorFunc()
}

}
41 changes: 40 additions & 1 deletion pkg/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"sync"
"time"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -96,6 +97,17 @@ type Kind struct {
// contain an error, startup and syncing finished.
started chan error
startCancel func()

InformerSyncInfo *InformerSyncInfo
}

// InformerSyncInfo is set by the caller of kind.Start to pass info
// on what to do when an informer shuts itself down, consisting of:
// 1. what cancel function to call
// 2. how long to wait between syncs
type InformerSyncInfo struct {
Cancel context.CancelFunc
ResyncPeriod time.Duration
}

var _ SyncingSource = &Kind{}
Expand Down Expand Up @@ -131,12 +143,39 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w
ks.started <- err
return
}
i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})

// Note: it is necessary to pass the event handler by reference rather than by value
// in order for it to be comparable (otherwise i.RemoveEventHandler will fail).
handler := &internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct}
handler.ErrorFunc = func() {
// TODO: only remove if the error is a not found error
log.Info("removing event handler")
if err := i.RemoveEventHandler(handler); err != nil {
log.Error(err, "failed to remove event handler")
}
}
i.AddEventHandler(handler)

if !ks.cache.WaitForCacheSync(ctx) {
// Would be great to return something more informative here
ks.started <- errors.New("cache did not sync")
}
close(ks.started)
if ks.InformerSyncInfo != nil {
go func() {
for {
select {
case <-ctx.Done():
return
case <-time.NewTimer(ks.InformerSyncInfo.ResyncPeriod).C:
if i.IsStopped() {
ks.InformerSyncInfo.Cancel()
return
}
}
}
}()
}
}()

return nil
Expand Down