Skip to content

Informer error handler #7

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,23 @@ type Cache interface {
Informers
}

// InformerOptions gives the caller some options for greater control over the lifecycle of the informer
type InformerOptions struct {
// StopperCh in a channel that when closed, instructs the informer to stop running.
StopperCh chan struct{}
// ErrorHandler passed to the informer's SetWatchErrorHandler and handles any errors
// from ListAndWatch calls made by the informer's underlying reflector.
ErrorHandler func(r *toolscache.Reflector, err error)
}

// InformerInfo provides information when retrieving an informer.
type InformerInfo struct {
// Informer is the Informer retrieved.
Informer Informer
// StopCh is a channel that is closed when the informer is stopped.
StopCh <-chan struct{}
}

// Informers knows how to create or fetch informers for different
// group-version-kinds, and add indices to those informers. It's safe to call
// GetInformer from multiple threads.
Expand All @@ -54,6 +71,12 @@ type Informers interface {
// API kind and resource.
GetInformer(ctx context.Context, obj client.Object) (Informer, error)

// GetInformerWithOptions retrieves an existing informer for the given object along with it's stop channel
// that fires when the informer has stopped.
//
// If the informer does not already exist, it constructs an informer with the supplied InformerOptions.
GetInformerWithOptions(ctx context.Context, obj client.Object, options *InformerOptions) (*InformerInfo, error)

// GetInformerForKind is similar to GetInformer, except that it takes a group-version-kind, instead
// of the underlying object.
GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error)
Expand Down
27 changes: 23 additions & 4 deletions pkg/cache/informer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (ip *informerCache) Get(ctx context.Context, key client.ObjectKey, out clie
return err
}

started, cache, err := ip.InformersMap.Get(ctx, gvk, out)
started, cache, err := ip.InformersMap.Get(ctx, gvk, out, nil, nil)
if err != nil {
return err
}
Expand All @@ -76,7 +76,7 @@ func (ip *informerCache) List(ctx context.Context, out client.ObjectList, opts .
return err
}

started, cache, err := ip.InformersMap.Get(ctx, *gvk, cacheTypeObj)
started, cache, err := ip.InformersMap.Get(ctx, *gvk, cacheTypeObj, nil, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -138,7 +138,7 @@ func (ip *informerCache) GetInformerForKind(ctx context.Context, gvk schema.Grou
return nil, err
}

_, i, err := ip.InformersMap.Get(ctx, gvk, obj)
_, i, err := ip.InformersMap.Get(ctx, gvk, obj, nil, nil)
if err != nil {
return nil, err
}
Expand All @@ -152,13 +152,32 @@ func (ip *informerCache) GetInformer(ctx context.Context, obj client.Object) (In
return nil, err
}

_, i, err := ip.InformersMap.Get(ctx, gvk, obj)
_, i, err := ip.InformersMap.Get(ctx, gvk, obj, nil, nil)
if err != nil {
return nil, err
}
return i.Informer, err
}

// GetInformerWithOptions retrieves the informer and its stop channel, creating and starting a
// new informer with the supplied options if necessary.
func (ip *informerCache) GetInformerWithOptions(ctx context.Context, obj client.Object, options *InformerOptions) (*InformerInfo, error) {
gvk, err := apiutil.GVKForObject(obj, ip.Scheme)
if err != nil {
return nil, err
}

_, i, err := ip.InformersMap.Get(ctx, gvk, obj, options.StopperCh, options.ErrorHandler)
if err != nil {
return nil, err
}
return &InformerInfo{
i.Informer,
i.StopCh,
}, nil

}

// NeedLeaderElection implements the LeaderElectionRunnable interface
// to indicate that this can be started without requiring the leader lock
func (ip *informerCache) NeedLeaderElection() bool {
Expand Down
13 changes: 13 additions & 0 deletions pkg/cache/informertest/fake_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,19 @@ func (c *FakeInformers) GetInformer(ctx context.Context, obj client.Object) (cac
return c.informerFor(gvk, obj)
}

// GetInformerWithOptions implements Informers
func (c *FakeInformers) GetInformerWithOptions(ctx context.Context, obj client.Object, options *cache.InformerOptions) (*cache.InformerInfo, error) {
i, err := c.GetInformer(ctx, obj)
if err != nil {
return nil, err
}
// fake informer is never started and therefore never stopped
// so stop channel is nil
return &cache.InformerInfo{
Informer: i,
}, nil
}

// WaitForCacheSync implements Informers
func (c *FakeInformers) WaitForCacheSync(ctx context.Context) bool {
if c.Synced == nil {
Expand Down
12 changes: 6 additions & 6 deletions pkg/cache/internal/deleg_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,18 @@ func (m *InformersMap) WaitForCacheSync(ctx context.Context) bool {

// Get will create a new Informer and add it to the map of InformersMap if none exists. Returns
// the Informer from the map.
func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, stopperCh chan struct{}, errorHandler func(r *cache.Reflector, err error)) (bool, *MapEntry, error) {
switch obj.(type) {
case *unstructured.Unstructured:
return m.unstructured.Get(ctx, gvk, obj)
return m.unstructured.Get(ctx, gvk, obj, stopperCh, errorHandler)
case *unstructured.UnstructuredList:
return m.unstructured.Get(ctx, gvk, obj)
return m.unstructured.Get(ctx, gvk, obj, stopperCh, errorHandler)
case *metav1.PartialObjectMetadata:
return m.metadata.Get(ctx, gvk, obj)
return m.metadata.Get(ctx, gvk, obj, stopperCh, errorHandler)
case *metav1.PartialObjectMetadataList:
return m.metadata.Get(ctx, gvk, obj)
return m.metadata.Get(ctx, gvk, obj, stopperCh, errorHandler)
default:
return m.structured.Get(ctx, gvk, obj)
return m.structured.Get(ctx, gvk, obj, stopperCh, errorHandler)
}
}

Expand Down
30 changes: 25 additions & 5 deletions pkg/cache/internal/informers_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ type MapEntry struct {

// CacheReader wraps Informer and implements the CacheReader interface for a single type
Reader CacheReader

// StopCh is a channel that is closed after
// the informer stops
StopCh <-chan struct{}
}

// specificInformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs.
Expand Down Expand Up @@ -169,9 +173,9 @@ func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced {
return syncedFuncs
}

// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns
// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns
// the Informer from the map.
func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, stopperCh chan struct{}, errorHandler func(r *cache.Reflector, err error)) (bool, *MapEntry, error) {
// Return the informer if it is found
i, started, ok := func() (*MapEntry, bool, bool) {
ip.mu.RLock()
Expand All @@ -182,7 +186,7 @@ func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersion

if !ok {
var err error
if i, started, err = ip.addInformerToMap(gvk, obj); err != nil {
if i, started, err = ip.addInformerToMap(gvk, obj, stopperCh, errorHandler); err != nil {
return started, nil, err
}
}
Expand All @@ -197,7 +201,7 @@ func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersion
return started, i, nil
}

func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, bool, error) {
func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object, stopperCh chan struct{}, errorHandler func(r *cache.Reflector, err error)) (*MapEntry, bool, error) {
ip.mu.Lock()
defer ip.mu.Unlock()

Expand Down Expand Up @@ -228,17 +232,33 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob
default:
}

informerStop := make(chan struct{})
i := &MapEntry{
Informer: ni,
Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk, scopeName: rm.Scope.Name()},
StopCh: informerStop,
}
ip.informersByGVK[gvk] = i

go func() {
select {
case <-ip.stop:
close(informerStop)
case <-stopperCh:
close(informerStop)
}
}()

i.Informer.SetWatchErrorHandler(errorHandler)

// Start the Informer if need by
// TODO(seans): write thorough tests and document what happens here - can you add indexers?
// can you add eventhandlers?
if ip.started {
go i.Informer.Run(ip.stop)
go func() {
i.Informer.Run(informerStop)
delete(ip.informersByGVK, gvk)
}()
}
return i, ip.started, nil
}
Expand Down
30 changes: 30 additions & 0 deletions pkg/cache/multi_namespace_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package cache
import (
"context"
"fmt"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -115,6 +116,35 @@ func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj client.Object
return &multiNamespaceInformer{namespaceToInformer: informers}, nil
}

// Methods for multiNamespaceCache to conform to the Informers interface
func (c *multiNamespaceCache) GetInformerWithOptions(ctx context.Context, obj client.Object, options *InformerOptions) (*InformerInfo, error) {
informers := map[string]Informer{}
multiStopCh := make(chan struct{})
var wg sync.WaitGroup
for ns, cache := range c.namespaceToCache {
info, err := cache.GetInformerWithOptions(ctx, obj, options)
if err != nil {
return nil, err
}
informers[ns] = info.Informer
wg.Add(1)
go func(stopCh <-chan struct{}) {
defer wg.Done()
<-stopCh

}(info.StopCh)
}

go func() {
defer close(multiStopCh)
wg.Done()
}()
return &InformerInfo{
Informer: &multiNamespaceInformer{namespaceToInformer: informers},
StopCh: multiStopCh,
}, nil
}

func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) {
informers := map[string]Informer{}

Expand Down