Skip to content

WIP: Initial Ref counting #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@ type Informer interface {
AddIndexers(indexers toolscache.Indexers) error
//HasSynced return true if the informers underlying store has synced
HasSynced() bool

// ModifyEventHandlerCount updates and retrieves the number of references for an informer.
// Passing a positive delta increments the EventHandler referece count a negative delta
// decrements it (by delta) and a delta of 0 is just a getter.

// This is just a hacky way to test out ref counting without having to pipe a bunch of
// ref counting functions down the stack.

// Alternatly, we could (and probably should) have separate inc, dec, and get functions
ModifyEventHandlerCount(delta int) int
}

// Options are the optional arguments for creating a new InformersMap object
Expand Down
22 changes: 19 additions & 3 deletions pkg/cache/internal/informers_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,26 @@ func newSpecificInformersMap(config *rest.Config,
return ip
}

//TODO: comment
// we can get rid of this if apimachinery adds the ability to retrieve this from the SharedIndexInformer
// but until then, we have to track it ourselves
type HandlerCountingInformer struct {
// Informer is the cached informer
cache.SharedIndexInformer

// count indicates the number of EventHandlers registered on the informer
count int
}

func (i *HandlerCountingInformer) ModifyEventHandlerCount(delta int) int {
i.count += delta
return i.count
}

// MapEntry contains the cached data for an Informer
type MapEntry struct {
// Informer is the cached informer
Informer cache.SharedIndexInformer
//Informer is the HandlerCountingInformer
Informer *HandlerCountingInformer

// CacheReader wraps Informer and implements the CacheReader interface for a single type
Reader CacheReader
Expand Down Expand Up @@ -212,7 +228,7 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
})
i := &MapEntry{
Informer: ni,
Informer: &HandlerCountingInformer{ni, 0},
Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk},
}
ip.informersByGVK[gvk] = i
Expand Down
9 changes: 9 additions & 0 deletions pkg/cache/multi_namespace_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,15 @@ type multiNamespaceInformer struct {

var _ Informer = &multiNamespaceInformer{}

// ModifyEventHandlerCount TODO: comment
func (i *multiNamespaceInformer) ModifyEventHandlerCount(delta int) int {
total := 0
for _, informer := range i.namespaceToInformer {
total += informer.ModifyEventHandlerCount(delta)
}
return total
}

// AddEventHandler adds the handler to each namespaced informer
func (i *multiNamespaceInformer) AddEventHandler(handler toolscache.ResourceEventHandler) {
for _, informer := range i.namespaceToInformer {
Expand Down
13 changes: 10 additions & 3 deletions pkg/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,16 @@ func (c *Controller) Start(ctx context.Context) error {
// caches to sync so that they have a chance to register their intendeded
// caches.
for _, watch := range c.startWatches {
c.Log.Info("Starting EventSource", "source", watch.src)
if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
return err
stoppableSource, ok := watch.src.(source.StoppableSource)
if ok {
// TODO: apply daniel's suggested pattern
go stoppableSource.StartStoppable(ctx, watch.handler, c.Queue, watch.predicates...)
c.Log.Info("Starting STOPPABLE EventSource", "source", watch.src)
} else {
c.Log.Info("Starting EventSource", "source", watch.src)
if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
return err
}
}
}

Expand Down
28 changes: 27 additions & 1 deletion pkg/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ type SyncingSource interface {
WaitForSync(ctx context.Context) error
}

type StoppableSource interface {
Source
StartStoppable(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
}

// NewKindWithCache creates a Source without InjectCache, so that it is assured that the given cache is used
// and not overwritten. It can be used to watch objects in a different cluster by passing the cache
// from that other cluster
Expand Down Expand Up @@ -120,6 +125,27 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w
return err
}
i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
//if handlerCountingInformer, ok := i.(*cache.HandlerCountingInformer); ok {
//handlerCountingInformer.Count += 1
newCount := i.ModifyEventHandlerCount(1)
fmt.Printf("increment, newCount is%+v\n", newCount)
//}
return nil
}

func (ks *Kind) StartStoppable(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
prct ...predicate.Predicate) error {
i, err := ks.cache.GetInformer(ctx, ks.Type)
if err != nil {
return err
}
if err := ks.Start(ctx, handler, queue, prct...); err != nil {
return err
}
<-ctx.Done()
newCount := i.ModifyEventHandlerCount(-1)
fmt.Printf("decrement, newCount is%+v\n", newCount)
// TODO: do we need to actually stop something with the context?
return nil
}

Expand All @@ -143,7 +169,7 @@ func (ks *Kind) WaitForSync(ctx context.Context) error {
var _ inject.Cache = &Kind{}

// InjectCache is internal should be called only by the Controller. InjectCache is used to inject
// the Cache dependency initialized by the ControllerManager.
// the cache dependency initialized by the ControllerManager.
func (ks *Kind) InjectCache(c cache.Cache) error {
if ks.cache == nil {
ks.cache = c
Expand Down