Skip to content

WIP Sporadic Controllers #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 38 additions & 1 deletion pkg/builder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/go-logr/logr"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
Expand Down Expand Up @@ -72,6 +73,7 @@ type ForInput struct {
predicates []predicate.Predicate
objectProjection objectProjection
err error
sporadic bool
}

// For defines the type of Object being *reconciled*, and configures the ControllerManagedBy to respond to create / delete /
Expand All @@ -97,6 +99,7 @@ type OwnsInput struct {
object client.Object
predicates []predicate.Predicate
objectProjection objectProjection
sporadic bool
}

// Owns defines types of Objects being *generated* by the ControllerManagedBy, and configures the ControllerManagedBy to respond to
Expand All @@ -118,6 +121,7 @@ type WatchesInput struct {
eventhandler handler.EventHandler
predicates []predicate.Predicate
objectProjection objectProjection
sporadic bool
}

// Watches exposes the lower-level ControllerManagedBy Watches functions through the builder. Consider using
Expand Down Expand Up @@ -222,7 +226,37 @@ func (blder *Builder) doWatch() error {
if err != nil {
return err
}
src := &source.Kind{Type: typeForSrc}
var src source.Source
fmt.Printf("blder.forInput.sporadic = %+v\n", blder.forInput.sporadic)
if blder.forInput.sporadic {
gvk, err := getGvk(blder.forInput.object, blder.mgr.GetScheme())
if err != nil {
return err
}
fmt.Printf("gvk = %+v\n", gvk)
dc, err := discovery.NewDiscoveryClientForConfig(blder.mgr.GetConfig())
if err != nil {
return err
}
existsInDiscovery := func() bool {
resources, err := dc.ServerResourcesForGroupVersion(gvk.GroupVersion().String())
if err != nil {
fmt.Printf("NOT in discovery gvk = %+v\n", gvk)
return false
}
for _, res := range resources.APIResources {
if res.Kind == gvk.Kind {
fmt.Printf("YES in discovery gvk = %+v\n", gvk)
return true
}
}
fmt.Printf("NOT in discovery kind = %+v\n", gvk)
return false
}
src = &source.SporadicKind{Kind: source.Kind{Type: typeForSrc}, DiscoveryCheck: existsInDiscovery}
} else {
src = &source.Kind{Type: typeForSrc}
}
hdler := &handler.EnqueueRequestForObject{}
allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
Expand All @@ -236,6 +270,7 @@ func (blder *Builder) doWatch() error {
return err
}
src := &source.Kind{Type: typeForSrc}
// TODO: handle sporadic watches for owns types too
hdler := &handler.EnqueueRequestForOwner{
OwnerType: blder.forInput.object,
IsController: true,
Expand All @@ -252,6 +287,8 @@ func (blder *Builder) doWatch() error {
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, w.predicates...)

// TODO: handle sporadic watches for owns types too

// If the source of this watch is of type *source.Kind, project it.
if srckind, ok := w.src.(*source.Kind); ok {
typeForSrc, err := blder.project(srckind.Type, w.objectProjection)
Expand Down
6 changes: 6 additions & 0 deletions pkg/builder/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ func (p projectAs) ApplyToWatches(opts *WatchesInput) {
opts.objectProjection = objectProjection(p)
}

type Sporadic struct{}

func (s Sporadic) ApplyToFor(opts *ForInput) {
opts.sporadic = true
}

var (
// OnlyMetadata tells the controller to *only* cache metadata, and to watch
// the the API server in metadata-only form. This is useful when watching
Expand Down
2 changes: 2 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type Informers interface {
// API kind and resource.
GetInformer(ctx context.Context, obj client.Object) (Informer, error)

GetStoppableInformer(ctx context.Context, obj client.Object) (Informer, <-chan struct{}, error)

// GetInformerForKind is similar to GetInformer, except that it takes a group-version-kind, instead
// of the underlying object.
GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error)
Expand Down
20 changes: 16 additions & 4 deletions pkg/cache/informer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (ip *informerCache) Get(ctx context.Context, key client.ObjectKey, out clie
return err
}

started, cache, err := ip.InformersMap.Get(ctx, gvk, out)
started, cache, err := ip.InformersMap.Get(ctx, gvk, out, false)
if err != nil {
return err
}
Expand All @@ -76,7 +76,7 @@ func (ip *informerCache) List(ctx context.Context, out client.ObjectList, opts .
return err
}

started, cache, err := ip.InformersMap.Get(ctx, *gvk, cacheTypeObj)
started, cache, err := ip.InformersMap.Get(ctx, *gvk, cacheTypeObj, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -138,7 +138,7 @@ func (ip *informerCache) GetInformerForKind(ctx context.Context, gvk schema.Grou
return nil, err
}

_, i, err := ip.InformersMap.Get(ctx, gvk, obj)
_, i, err := ip.InformersMap.Get(ctx, gvk, obj, false)
if err != nil {
return nil, err
}
Expand All @@ -152,13 +152,25 @@ func (ip *informerCache) GetInformer(ctx context.Context, obj client.Object) (In
return nil, err
}

_, i, err := ip.InformersMap.Get(ctx, gvk, obj)
_, i, err := ip.InformersMap.Get(ctx, gvk, obj, false)
if err != nil {
return nil, err
}
return i.Informer, err
}

func (ip *informerCache) GetStoppableInformer(ctx context.Context, obj client.Object) (Informer, <-chan struct{}, error) {
gvk, err := apiutil.GVKForObject(obj, ip.Scheme)
if err != nil {
return nil, nil, err
}
_, i, err := ip.InformersMap.Get(ctx, gvk, obj, true)
if err != nil {
return nil, nil, err
}
return i.Informer, i.StopCh, err
}

// NeedLeaderElection implements the LeaderElectionRunnable interface
// to indicate that this can be started without requiring the leader lock
func (ip *informerCache) NeedLeaderElection() bool {
Expand Down
9 changes: 9 additions & 0 deletions pkg/cache/informertest/fake_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ func (c *FakeInformers) GetInformer(ctx context.Context, obj client.Object) (cac
return c.informerFor(gvk, obj)
}

// GetStoppableInformer implements informers
func (c *FakeInformers) GetStoppableInformer(ctx context.Context, obj client.Object) (cache.Informer, <-chan struct{}, error) {
// TODO:
//panic("not implemented")
i, e := c.GetInformer(ctx, obj)
return i, nil, e

}

// WaitForCacheSync implements Informers
func (c *FakeInformers) WaitForCacheSync(ctx context.Context) bool {
if c.Synced == nil {
Expand Down
12 changes: 6 additions & 6 deletions pkg/cache/internal/deleg_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,18 +89,18 @@ func (m *InformersMap) WaitForCacheSync(ctx context.Context) bool {

// Get will create a new Informer and add it to the map of InformersMap if none exists. Returns
// the Informer from the map.
func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, stopOnError bool) (bool, *MapEntry, error) {
switch obj.(type) {
case *unstructured.Unstructured:
return m.unstructured.Get(ctx, gvk, obj)
return m.unstructured.Get(ctx, gvk, obj, stopOnError)
case *unstructured.UnstructuredList:
return m.unstructured.Get(ctx, gvk, obj)
return m.unstructured.Get(ctx, gvk, obj, stopOnError)
case *metav1.PartialObjectMetadata:
return m.metadata.Get(ctx, gvk, obj)
return m.metadata.Get(ctx, gvk, obj, stopOnError)
case *metav1.PartialObjectMetadataList:
return m.metadata.Get(ctx, gvk, obj)
return m.metadata.Get(ctx, gvk, obj, stopOnError)
default:
return m.structured.Get(ctx, gvk, obj)
return m.structured.Get(ctx, gvk, obj, stopOnError)
}
}

Expand Down
54 changes: 50 additions & 4 deletions pkg/cache/internal/informers_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ type MapEntry struct {

// CacheReader wraps Informer and implements the CacheReader interface for a single type
Reader CacheReader

// informerDone is a channel that is closed after
// the informer stops
StopCh <-chan struct{}
}

// specificInformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs.
Expand Down Expand Up @@ -166,7 +170,8 @@ func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced {

// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns
// the Informer from the map.
func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, stopOnError bool) (bool, *MapEntry, error) {
fmt.Println("inf Get")
// Return the informer if it is found
i, started, ok := func() (*MapEntry, bool, bool) {
ip.mu.RLock()
Expand All @@ -177,7 +182,7 @@ func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersion

if !ok {
var err error
if i, started, err = ip.addInformerToMap(gvk, obj); err != nil {
if i, started, err = ip.addInformerToMap(gvk, obj, stopOnError); err != nil {
return started, nil, err
}
}
Expand All @@ -192,7 +197,8 @@ func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersion
return started, i, nil
}

func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, bool, error) {
func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object, stopOnError bool) (*MapEntry, bool, error) {
fmt.Println("inf addInf")
ip.mu.Lock()
defer ip.mu.Unlock()

Expand All @@ -203,31 +209,71 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob
return i, ip.started, nil
}

fmt.Println("inf createLW")
fmt.Printf("inf gvk = %+v\n", gvk)
// Create a NewSharedIndexInformer and add it to the map.
var lw *cache.ListWatch
lw, err := ip.createListWatcher(gvk, ip)
if err != nil {
fmt.Printf("inf createLW err = %+v\n", err)
return nil, false, err
}
ni := cache.NewSharedIndexInformer(lw, obj, resyncPeriod(ip.resync)(), cache.Indexers{
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
})
rm, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
fmt.Printf("RESTMapping err = %+v\n", err)
fmt.Printf("gvk = %+v\n", gvk)
return nil, false, err
}
fmt.Println("inf RM success")
informerStop := make(chan struct{})
i := &MapEntry{
Informer: ni,
Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk, scopeName: rm.Scope.Name()},
StopCh: informerStop,
}
ip.informersByGVK[gvk] = i

go func() {
<-ip.stop
close(informerStop)
}()

i.Informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
ip.mu.RLock()
defer ip.mu.RUnlock()
fmt.Printf("inf handler err = %+v\n", err)
// TODO: check the error for not found
if stopOnError {
fmt.Println("inf stopping")
close(informerStop)
}

})

// Start the Informer if need by
// TODO(seans): write thorough tests and document what happens here - can you add indexers?
// can you add eventhandlers?

// TODO: not cancelling? (leak)
//runCtx, cancel := context.WithCancel(ctx)
//go func() {
// <-ip.stop
// fmt.Println("inf ip stopped")
// cancel()
//}()
if ip.started {
go i.Informer.Run(ip.stop)
fmt.Println("inf Run")
//go i.Informer.Run(informerStop)
go func() {
i.Informer.Run(informerStop)
fmt.Println("informer done running, remove from map")
delete(ip.informersByGVK, gvk)
}()
}
fmt.Println("inf addInformer returning")
return i, ip.started, nil
}

Expand Down
15 changes: 15 additions & 0 deletions pkg/cache/multi_namespace_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,21 @@ func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj client.Object
return &multiNamespaceInformer{namespaceToInformer: informers}, nil
}

// TODO
func (c *multiNamespaceCache) GetStoppableInformer(ctx context.Context, obj client.Object) (Informer, <-chan struct{}, error) {
panic("not implemented")
//informers := map[string]Informer{}
//for ns, cache := range c.namespaceToCache {

// informer, _, err := cache.GetStoppableInformer(ctx, obj)
// if err != nil {
// return nil, nil, err
// }
// informers[ns] = informer
//}
//return &multiNamespaceInformer{namespaceToInformer: informers}, nil, nil
}

func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) {
informers := map[string]Informer{}
for ns, cache := range c.namespaceToCache {
Expand Down
Loading