Skip to content

Commit

Permalink
Support shutdown watches dynamically
Browse files Browse the repository at this point in the history
Co-authored-by: FillZpp <FillZpp.pub@gmail.com>
Signed-off-by: Tim Ramlot <42113979+inteon@users.noreply.github.com>
  • Loading branch information
inteon and FillZpp committed Dec 27, 2023
1 parent 1b80b96 commit eb6f3e2
Show file tree
Hide file tree
Showing 12 changed files with 1,157 additions and 266 deletions.
2 changes: 1 addition & 1 deletion pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ var _ = Describe("controller.Controller", func() {

ctx, cancel := context.WithCancel(context.Background())
watchChan := make(chan event.GenericEvent, 1)
watch := &source.Channel{Source: watchChan}
watch := &source.Channel{Broadcaster: source.NewChannelBroadcaster(watchChan)}
watchChan <- event.GenericEvent{Object: &corev1.Pod{}}

reconcileStarted := make(chan struct{})
Expand Down
46 changes: 40 additions & 6 deletions pkg/controller/controllertest/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package controllertest

import (
"fmt"
"sync"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -33,7 +35,8 @@ type FakeInformer struct {
// RunCount is incremented each time RunInformersAndControllers is called
RunCount int

handlers []eventHandlerWrapper
mu sync.RWMutex
handlers []*eventHandlerWrapper
}

type modernResourceEventHandler interface {
Expand All @@ -51,7 +54,8 @@ type legacyResourceEventHandler interface {
// eventHandlerWrapper wraps a ResourceEventHandler in a manner that is compatible with client-go 1.27+ and older.
// The interface was changed in these versions.
type eventHandlerWrapper struct {
handler any
handler any
hasSynced bool
}

func (e eventHandlerWrapper) OnAdd(obj interface{}) {
Expand All @@ -78,6 +82,10 @@ func (e eventHandlerWrapper) OnDelete(obj interface{}) {
e.handler.(legacyResourceEventHandler).OnDelete(obj)
}

func (e eventHandlerWrapper) HasSynced() bool {
return e.hasSynced
}

// AddIndexers does nothing. TODO(community): Implement this.
func (f *FakeInformer) AddIndexers(indexers cache.Indexers) error {
return nil
Expand All @@ -98,10 +106,13 @@ func (f *FakeInformer) HasSynced() bool {
return f.Synced
}

// AddEventHandler implements the Informer interface. Adds an EventHandler to the fake Informers. TODO(community): Implement Registration.
// AddEventHandler implements the Informer interface. Adds an EventHandler to the fake Informers.
func (f *FakeInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
f.handlers = append(f.handlers, eventHandlerWrapper{handler})
return nil, nil
f.mu.Lock()
defer f.mu.Unlock()
eh := &eventHandlerWrapper{handler, true}
f.handlers = append(f.handlers, eh)
return eh, nil
}

// Run implements the Informer interface. Increments f.RunCount.
Expand All @@ -111,20 +122,26 @@ func (f *FakeInformer) Run(<-chan struct{}) {

// Add fakes an Add event for obj.
func (f *FakeInformer) Add(obj metav1.Object) {
f.mu.RLock()
defer f.mu.RUnlock()
for _, h := range f.handlers {
h.OnAdd(obj)
}
}

// Update fakes an Update event for obj.
func (f *FakeInformer) Update(oldObj, newObj metav1.Object) {
f.mu.RLock()
defer f.mu.RUnlock()
for _, h := range f.handlers {
h.OnUpdate(oldObj, newObj)
}
}

// Delete fakes an Delete event for obj.
func (f *FakeInformer) Delete(obj metav1.Object) {
f.mu.RLock()
defer f.mu.RUnlock()
for _, h := range f.handlers {
h.OnDelete(obj)
}
Expand All @@ -135,8 +152,25 @@ func (f *FakeInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEve
return nil, nil
}

// RemoveEventHandler does nothing. TODO(community): Implement this.
// RemoveEventHandler removes an EventHandler to the fake Informers.
func (f *FakeInformer) RemoveEventHandler(handle cache.ResourceEventHandlerRegistration) error {
eh, ok := handle.(*eventHandlerWrapper)
if !ok {
return fmt.Errorf("invalid registration type %t", handle)
}

f.mu.Lock()
defer f.mu.Unlock()

handlers := make([]*eventHandlerWrapper, 0, len(f.handlers))
for _, h := range f.handlers {
if h == eh {
continue
}
handlers = append(handlers, h)
}
f.handlers = handlers

return nil
}

Expand Down
50 changes: 42 additions & 8 deletions pkg/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,18 @@ type Controller struct {
// the Queue for processing
Queue workqueue.RateLimitingInterface

// startedSources maintains a list of sources that have already started.
startedSources []source.Source

// mu is used to synchronize Controller setup
mu sync.Mutex

// Started is true if the Controller has been Started
Started bool

// Stopped is true if the Controller has been Stopped
Stopped bool

// ctx is the context that was passed to Start() and used when starting watches.
//
// According to the docs, contexts should not be stored in a struct: https://golang.org/pkg/context,
Expand Down Expand Up @@ -124,6 +130,10 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
c.mu.Lock()
defer c.mu.Unlock()

if c.Stopped {
return fmt.Errorf("can not start watch in a stopped controller")
}

// Controller hasn't started yet, store the watches locally and return.
//
// These watches are going to be held on the controller struct until the manager or user calls Start(...).
Expand All @@ -133,7 +143,11 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
}

c.LogConstructor(nil).Info("Starting EventSource", "source", src)
return src.Start(c.ctx, evthdler, c.Queue, prct...)
err := src.Start(c.ctx, evthdler, c.Queue, prct...)
if err == nil {
c.startedSources = append(c.startedSources, src)
}
return err
}

// NeedLeaderElection implements the manager.LeaderElectionRunnable interface.
Expand All @@ -149,22 +163,43 @@ func (c *Controller) Start(ctx context.Context) error {
// use an IIFE to get proper lock handling
// but lock outside to get proper handling of the queue shutdown
c.mu.Lock()
if c.Stopped {
return fmt.Errorf("can not restart a stopped controller, you should create a new one")
}
if c.Started {
return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times")
}

c.initMetrics()

// Set the internal context.
c.ctx = ctx
var cancel context.CancelFunc
c.ctx, cancel = context.WithCancel(ctx)

wg := &sync.WaitGroup{}

c.Queue = c.MakeQueue()
go func() {
<-ctx.Done()
c.Queue.ShutDown()
defer func() {
var startedSources []source.Source
c.mu.Lock()
c.Stopped = true
startedSources = c.startedSources
c.mu.Unlock()

c.Queue.ShutDown() // Stop receiving new items in the queue

cancel() // cancel the context to stop all the sources
for _, src := range startedSources {
if err := src.Shutdown(); err != nil {
c.LogConstructor(nil).Error(err, "Failed to stop watch source when controller stopping", "source", src)
}
}
c.LogConstructor(nil).Info("All watch sources finished")

wg.Wait() // Wait for workers to finish
c.LogConstructor(nil).Info("All workers finished")
}()

wg := &sync.WaitGroup{}
err := func() error {
defer c.mu.Unlock()

Expand All @@ -180,6 +215,7 @@ func (c *Controller) Start(ctx context.Context) error {
if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
return err
}
c.startedSources = append(c.startedSources, watch.src)
}

// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
Expand Down Expand Up @@ -238,8 +274,6 @@ func (c *Controller) Start(ctx context.Context) error {

<-ctx.Done()
c.LogConstructor(nil).Info("Shutdown signal received, waiting for all workers to finish")
wg.Wait()
c.LogConstructor(nil).Info("All workers finished")
return nil
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ var _ = Describe("controller", func() {

err = ctrl.Start(context.TODO())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("failed to wait for testcontroller caches to sync: timed out waiting for cache to be synced"))
Expect(err.Error()).To(ContainSubstring("timed out trying to get an informer from cache and waiting for cache to be synced for Kind *v1.Deployment"))
})

It("should not error when context cancelled", func() {
Expand Down Expand Up @@ -226,7 +226,7 @@ var _ = Describe("controller", func() {
Object: p,
}

ins := &source.Channel{Source: ch}
ins := &source.Channel{Broadcaster: source.NewChannelBroadcaster(ch)}
ins.DestBufferSize = 1

// send the event to the channel
Expand Down Expand Up @@ -260,7 +260,7 @@ var _ = Describe("controller", func() {

e := ctrl.Start(ctx)
Expect(e).To(HaveOccurred())
Expect(e.Error()).To(ContainSubstring("must specify Channel.Source"))
Expect(e.Error()).To(ContainSubstring("must create Channel with a non-nil Broadcaster"))
})

It("should call Start on sources with the appropriate EventHandler, Queue, and Predicates", func() {
Expand Down Expand Up @@ -308,7 +308,7 @@ var _ = Describe("controller", func() {
Expect(ctrl.Start(ctx)).To(Succeed())
err := ctrl.Start(ctx)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("controller was started more than once. This is likely to be caused by being added to a manager multiple times"))
Expect(err.Error()).To(Equal("can not restart a stopped controller, you should create a new one"))
})

})
Expand Down
Loading

0 comments on commit eb6f3e2

Please sign in to comment.