diff --git a/tools/cache/shared_informer.go b/tools/cache/shared_informer.go index 9f42782d17..a78eb6f5f9 100644 --- a/tools/cache/shared_informer.go +++ b/tools/cache/shared_informer.go @@ -131,11 +131,24 @@ import ( // A delete notification exposes the last locally known non-absent // state, except that its ResourceVersion is replaced with a // ResourceVersion in which the object is actually absent. +// +// The informational methods (EventHandlerCount, IsStopped, IsStarted) +// are intended to be used to manage informers in any upper informer +// management layer for creating and destroying informers on-the fly when +// adding or removing handlers. +// Beware of race conditions: Such a layer must hide the basic informer +// objects from its users and offer a closed *synchronized* view for informers. +// Although these informational methods are synchronized each, in +// sequences the state queried first might have been changed before +// calling the next method, if other callers (or the stop channel) +// are able to interact with the informer interface in parallel. type SharedInformer interface { // AddEventHandler adds an event handler to the shared informer using the shared informer's resync // period. Events to a single handler are delivered sequentially, but there is no coordination // between different handlers. - AddEventHandler(handler ResourceEventHandler) + // It returns a handle for the handler that can be used to remove + // the handler again. + AddEventHandler(handler ResourceEventHandler) (*ResourceEventHandlerHandle, error) // AddEventHandlerWithResyncPeriod adds an event handler to the // shared informer with the requested resync period; zero means // this handler does not care about resyncs. The resync operation @@ -150,7 +163,20 @@ type SharedInformer interface { // between any two resyncs may be longer than the nominal period // because the implementation takes time to do work and there may // be competing load and scheduling noise. - AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) + // It returns a handle for the handler that can be used to remove + // the handler again and an error if the handler cannot be added. + AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (*ResourceEventHandlerHandle, error) + // RemoveEventHandlerByHandle removes a formerly added event handler given by + // its registration handle. + // If, for some reason, the same handler has been added multiple + // times, only the registration for the given registration handle + // will be removed. + // It returns an error if the handler cannot be removed, + // because the informer is already stopped. + // Calling Remove on an already removed handle returns no error + // because the handler is finally (still) removed after calling this + // method. + RemoveEventHandlerByHandle(handle *ResourceEventHandlerHandle) error // GetStore returns the informer's local cache as a Store. GetStore() Store // GetController is deprecated, it does nothing useful @@ -195,6 +221,35 @@ type SharedInformer interface { // transform before mutating it at all and returning the copy to prevent // data races. SetTransform(handler TransformFunc) error + + // EventHandlerCount return the number of actually registered + // event handlers. + EventHandlerCount() int + + // IsStopped reports whether the informer has already been stopped. + // Adding event handlers to already stopped informers is not possible. + IsStopped() bool + + // IsStarted reports whether the informer has already been started + IsStarted() bool +} + +// ResourceEventHandlerHandle is a handle returned by the +// registration methods of SharedInformers for a registered +// ResourceEventHandler. It can be used later on to remove +// a registration again. +// This indirection is required, because the ResourceEventHandler +// interface can be implemented by non-go-comparable handlers, which +// could not be removed from a list anymore. +type ResourceEventHandlerHandle struct { + listener *processorListener +} + +// IsActive reports whether this registration is still active +// meaning that the handler registered with this handle is +// still registered. +func (h *ResourceEventHandlerHandle) IsActive() bool { + return h.listener != nil } // SharedIndexInformer provides add and get Indexers ability based on SharedInformer. @@ -492,8 +547,8 @@ func (s *sharedIndexInformer) GetController() Controller { return &dummyController{informer: s} } -func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) { - s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod) +func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) (*ResourceEventHandlerHandle, error) { + return s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod) } func determineResyncPeriod(desired, check time.Duration) time.Duration { @@ -513,13 +568,13 @@ func determineResyncPeriod(desired, check time.Duration) time.Duration { const minimumResyncPeriod = 1 * time.Second -func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) { +func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (*ResourceEventHandlerHandle, error) { s.startedLock.Lock() defer s.startedLock.Unlock() if s.stopped { - klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler) - return + klog.V(2).Infof("Handler %v is not added to shared informer because it has stopped already", handler) + return nil, fmt.Errorf("handler %v is not added to shared informer because it has stopped already", handler) } if resyncPeriod > 0 { @@ -543,10 +598,11 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv } listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize) + handle := &ResourceEventHandlerHandle{listener} if !s.started { s.processor.addListener(listener) - return + return handle, nil } // in order to safely join, we have to @@ -561,6 +617,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv for _, item := range s.indexer.List() { listener.add(addNotification{newObj: item}) } + return handle, nil } func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { @@ -610,6 +667,55 @@ func (s *sharedIndexInformer) OnDelete(old interface{}) { s.processor.distribute(deleteNotification{oldObj: old}, false) } +// IsStarted reports whether the informer has already been started +func (s *sharedIndexInformer) IsStarted() bool { + s.startedLock.Lock() + defer s.startedLock.Unlock() + return s.started +} + +// IsStopped reports whether the informer has already been stopped +func (s *sharedIndexInformer) IsStopped() bool { + s.startedLock.Lock() + defer s.startedLock.Unlock() + return s.stopped +} + +// EventHandlerCount reports whether the informer still has registered +// event handlers +func (s *sharedIndexInformer) EventHandlerCount() int { + s.startedLock.Lock() + defer s.startedLock.Unlock() + return len(s.processor.listeners) +} + +// RemoveEventHandlerByHandle tries to remove a formerly added event handler by its +// handle returned for its registration. +// If a handler has been added multiple times, only the registration for the +// given handle will be removed. +func (s *sharedIndexInformer) RemoveEventHandlerByHandle(handle *ResourceEventHandlerHandle) error { + if handle.listener == nil { + return nil + } + + s.startedLock.Lock() + defer s.startedLock.Unlock() + + if s.stopped { + return fmt.Errorf("handler %v is not removed from shared informer because it has stopped already", handle.listener.handler) + } + + // in order to safely remove, we have to + // 1. stop sending add/update/delete notifications + // 2. remove and stop listener + // 3. unblock + s.blockDeltas.Lock() + defer s.blockDeltas.Unlock() + s.processor.removeListener(handle.listener) + handle.listener = nil + return nil +} + // sharedProcessor has a collection of processorListener and can // distribute a notification object to its listeners. There are two // kinds of distribute operations. The sync distributions go to a @@ -641,6 +747,33 @@ func (p *sharedProcessor) addListenerLocked(listener *processorListener) { p.syncingListeners = append(p.syncingListeners, listener) } +func (p *sharedProcessor) removeListener(listener *processorListener) { + p.listenersLock.Lock() + defer p.listenersLock.Unlock() + + p.removeListenerLocked(listener) + if p.listenersStarted { + close(listener.addCh) + } +} + +func (p *sharedProcessor) removeListenerLocked(listener *processorListener) { + for i := 0; i < len(p.listeners); i++ { + l := p.listeners[i] + if l == listener { + p.listeners = append(p.listeners[:i], p.listeners[i+1:]...) + i-- + } + } + for i := 0; i < len(p.syncingListeners); i++ { + l := p.syncingListeners[i] + if l == listener { + p.syncingListeners = append(p.syncingListeners[:i], p.syncingListeners[i+1:]...) + i-- + } + } +} + func (p *sharedProcessor) distribute(obj interface{}, sync bool) { p.listenersLock.RLock() defer p.listenersLock.RUnlock() diff --git a/tools/cache/shared_informer_test.go b/tools/cache/shared_informer_test.go index 3f2ac71bfe..21f197e0a8 100644 --- a/tools/cache/shared_informer_test.go +++ b/tools/cache/shared_informer_test.go @@ -390,3 +390,288 @@ func TestSharedInformerTransformer(t *testing.T) { t.Errorf("%s: expected %v, got %v", listenerTransformer.name, listenerTransformer.expectedItemNames, listenerTransformer.receivedItemNames) } } + +func TestSharedInformerRemoveHandler(t *testing.T) { + source := fcache.NewFakeControllerSource() + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) + + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second) + + handler1 := &ResourceEventHandlerFuncs{} + handle1, err := informer.AddEventHandler(handler1) + if err != nil { + t.Errorf("informer did not add handler1: %s", err) + return + } + handler2 := &ResourceEventHandlerFuncs{} + handle2, err := informer.AddEventHandler(handler2) + if err != nil { + t.Errorf("informer did not add handler2: %s", err) + return + } + + if informer.EventHandlerCount() != 2 { + t.Errorf("informer has %d registered handler, instead of 2", informer.EventHandlerCount()) + } + + if err := informer.RemoveEventHandlerByHandle(handle2); err != nil { + t.Errorf("removing of first pointer handler failed: %s", err) + } + if informer.EventHandlerCount() != 1 { + t.Errorf("after removing handler informer has %d registered handler(s), instead of 1", informer.EventHandlerCount()) + } + + if err := informer.RemoveEventHandlerByHandle(handle1); err != nil { + t.Errorf("removing of second pointer handler failed: %s", err) + } + if informer.EventHandlerCount() != 0 { + t.Errorf("informer still has registered handlers after removing both handlers") + } +} + +func TestSharedInformerRemoveNonComparableHandler(t *testing.T) { + source := fcache.NewFakeControllerSource() + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) + + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second) + + handler1 := ResourceEventHandlerFuncs{} + handle1, err := informer.AddEventHandler(handler1) + if err != nil { + t.Errorf("informer did not add handler1: %s", err) + return + } + handler2 := &ResourceEventHandlerFuncs{} + handle2, err := informer.AddEventHandler(handler2) + if err != nil { + t.Errorf("informer did not add handler2: %s", err) + return + } + + if informer.EventHandlerCount() != 2 { + t.Errorf("informer has %d registered handler(s), instead of 2", informer.EventHandlerCount()) + } + + if err := informer.RemoveEventHandlerByHandle(handle2); err != nil { + t.Errorf("removing of pointer handler failed: %s", err) + } + if informer.EventHandlerCount() != 1 { + t.Errorf("after removal informer has %d registered handler(s), instead of 1", informer.EventHandlerCount()) + } + + if err := informer.RemoveEventHandlerByHandle(handle1); err != nil { + t.Errorf("removing of non-pointer handler failed: %s", err) + } + if informer.EventHandlerCount() != 0 { + t.Errorf("after removal informer has %d registered handler(s), instead of 0", informer.EventHandlerCount()) + } +} + +func TestSharedInformerMultipleRegistration(t *testing.T) { + source := fcache.NewFakeControllerSource() + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) + + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second) + + handler1 := &ResourceEventHandlerFuncs{} + reg1, err := informer.AddEventHandler(handler1) + if err != nil { + t.Errorf("informer did not add handler for the first time: %s", err) + return + } + + if !reg1.IsActive() { + t.Errorf("handle1 is not active after successful registration") + return + } + + reg2, err := informer.AddEventHandler(handler1) + if err != nil { + t.Errorf("informer did not add handler for the second: %s", err) + return + } + + if !reg2.IsActive() { + t.Errorf("handle2 is not active after successful registration") + return + } + + if informer.EventHandlerCount() != 2 { + t.Errorf("informer has %d registered handler(s), instead of 1", informer.EventHandlerCount()) + } + + if err := informer.RemoveEventHandlerByHandle(reg1); err != nil { + t.Errorf("removing of duplicate handler registration failed: %s", err) + } + + if reg1.IsActive() { + t.Errorf("handle1 is still active after successful remove") + return + } + if !reg2.IsActive() { + t.Errorf("handle2 is not active after removing handle1") + return + } + + if informer.EventHandlerCount() != 1 { + if informer.EventHandlerCount() == 0 { + t.Errorf("informer has no registered handler anymore after removal of duplicate registrations") + } else { + t.Errorf("informer has unexpected number (%d) of handlers after removal of duplicate handler registration", informer.EventHandlerCount()) + } + } + + if err := informer.RemoveEventHandlerByHandle(reg2); err != nil { + t.Errorf("removing of second handler registration failed: %s", err) + } + + if reg2.IsActive() { + t.Errorf("handle2 is still active after successful remove") + return + } + + if informer.EventHandlerCount() != 0 { + t.Errorf("informer has unexpected number (%d) of handlers after removal of second handler registrations", informer.EventHandlerCount()) + } +} + +func TestRemovingRemovedSharedInformer(t *testing.T) { + source := fcache.NewFakeControllerSource() + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) + + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) + handler := &ResourceEventHandlerFuncs{} + reg, err := informer.AddEventHandler(handler) + + if err != nil { + t.Errorf("informer did not add handler for the first time: %s", err) + return + } + if err := informer.RemoveEventHandlerByHandle(reg); err != nil { + t.Errorf("removing of handler registration failed: %s", err) + return + } + if reg.IsActive() { + t.Errorf("handle is still active after successful remove") + return + } + if err := informer.RemoveEventHandlerByHandle(reg); err != nil { + t.Errorf("removing of already removed registration yields unexpected error: %s", err) + } + if reg.IsActive() { + t.Errorf("handle is still active after second remove") + return + } +} + +func TestStateSharedInformer(t *testing.T) { + source := fcache.NewFakeControllerSource() + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) + + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) + listener := newTestListener("listener", 0, "pod1") + informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod) + + if informer.IsStarted() { + t.Errorf("informer already started after creation") + return + } + if informer.IsStopped() { + t.Errorf("informer already stopped after creation") + return + } + stop := make(chan struct{}) + go informer.Run(stop) + if !listener.ok() { + t.Errorf("informer did not report initial objects") + close(stop) + return + } + + if !informer.IsStarted() { + t.Errorf("informer does not report to be started although handling events") + close(stop) + return + } + if informer.IsStopped() { + t.Errorf("informer reports to be stopped although stop channel not closed") + close(stop) + return + } + + close(stop) + fmt.Println("sleeping") + time.Sleep(1 * time.Second) + + if !informer.IsStopped() { + t.Errorf("informer reports not to be stopped although stop channel closed") + return + } + if !informer.IsStarted() { + t.Errorf("informer reports not to be started after it has been started and stopped") + return + } +} + +func TestAddOnStoppedSharedInformer(t *testing.T) { + source := fcache.NewFakeControllerSource() + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) + + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) + listener := newTestListener("listener", 0, "pod1") + stop := make(chan struct{}) + go informer.Run(stop) + close(stop) + fmt.Println("sleeping") + time.Sleep(1 * time.Second) + if !informer.IsStopped() { + t.Errorf("informer reports not to be stopped although stop channel closed") + return + } + + handle, err := informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod) + if err == nil { + t.Errorf("stopped informer did not reject add handler") + return + } + if !strings.HasSuffix(err.Error(), "is not added to shared informer because it has stopped already") { + t.Errorf("adding handler to a stopped informer yields unexpected error: %s", err) + return + } + if handle != nil { + t.Errorf("got handle for added handler on stopped informer") + return + } +} + +func TestRemoveOnStoppedSharedInformer(t *testing.T) { + source := fcache.NewFakeControllerSource() + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) + + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) + listener := newTestListener("listener", 0, "pod1") + handle, err := informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod) + if err != nil { + t.Errorf("informer did not add handler: %s", err) + return + } + stop := make(chan struct{}) + go informer.Run(stop) + close(stop) + fmt.Println("sleeping") + time.Sleep(1 * time.Second) + + if !informer.IsStopped() { + t.Errorf("informer reports not to be stopped although stop channel closed") + return + } + err = informer.RemoveEventHandlerByHandle(handle) + if err == nil { + t.Errorf("informer removes handler on stopped informer") + return + } + if !strings.HasSuffix(err.Error(), "is not removed from shared informer because it has stopped already") { + t.Errorf("removing handler from stopped informer yield unexpected error: %s", err) + return + } +}