Skip to content

Conditional Controller with SporadicRunnable #4

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion pkg/builder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type ForInput struct {
predicates []predicate.Predicate
objectProjection objectProjection
err error
sporadic bool
}

// For defines the type of Object being *reconciled*, and configures the ControllerManagedBy to respond to create / delete /
Expand All @@ -97,6 +98,7 @@ type OwnsInput struct {
object client.Object
predicates []predicate.Predicate
objectProjection objectProjection
sporadic bool
}

// Owns defines types of Objects being *generated* by the ControllerManagedBy, and configures the ControllerManagedBy to respond to
Expand All @@ -118,6 +120,7 @@ type WatchesInput struct {
eventhandler handler.EventHandler
predicates []predicate.Predicate
objectProjection objectProjection
sporadic bool
}

// Watches exposes the lower-level ControllerManagedBy Watches functions through the builder. Consider using
Expand Down Expand Up @@ -222,7 +225,23 @@ func (blder *Builder) doWatch() error {
if err != nil {
return err
}
src := &source.Kind{Type: typeForSrc}
var src source.Source
if blder.forInput.sporadic {
gvk, err := getGvk(blder.forInput.object, blder.mgr.GetScheme())
if err != nil {
return err
}
existsInDiscovery := func() bool {
if _, err := blder.mgr.GetRESTMapper().RESTMapping(gvk.GroupKind(), gvk.Version); err != nil {
return false
}
return true
}

src = &source.SporadicKind{Kind: source.Kind{Type: typeForSrc}, DiscoveryCheck: existsInDiscovery}
} else {
src = &source.Kind{Type: typeForSrc}
}
hdler := &handler.EnqueueRequestForObject{}
allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
Expand Down
35 changes: 34 additions & 1 deletion pkg/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ type Controller struct {
CacheSyncTimeout time.Duration

// startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
startWatches []watchDescription
startWatches []watchDescription
sporadicWatches []sporadicWatchDescription

// Log is used to log messages to users during reconciliation, or for example when a watch is started.
Log logr.Logger
Expand All @@ -94,6 +95,12 @@ type watchDescription struct {
predicates []predicate.Predicate
}

type sporadicWatchDescription struct {
src source.SporadicSource
handler handler.EventHandler
predicates []predicate.Predicate
}

// Reconcile implements reconcile.Reconciler
func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
log := c.Log.WithValues("name", req.Name, "namespace", req.Namespace)
Expand All @@ -119,6 +126,11 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
}
}

if sporadicSource, ok := src.(source.SporadicSource); ok && !c.Started {
c.sporadicWatches = append(c.sporadicWatches, sporadicWatchDescription{src: sporadicSource, handler: evthdler, predicates: prct})
return nil
}

// Controller hasn't started yet, store the watches locally and return.
//
// These watches are going to be held on the controller struct until the manager or user calls Start(...).
Expand All @@ -131,6 +143,27 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
return src.Start(c.ctx, evthdler, c.Queue, prct...)
}

// Rea
func (c *Controller) ReadyToStart(ctx context.Context) <-chan struct{} {
ready := make(chan struct{})
if len(c.sporadicWatches) == 0 {
close(ready)
return ready
}

var wg sync.WaitGroup
for _, w := range c.sporadicWatches {
wg.Add(1)
go w.src.Ready(ctx, &wg)
}

go func() {
wg.Wait()
close(ready)
}()
return ready
}

// Start implements controller.Controller
func (c *Controller) Start(ctx context.Context) error {
// use an IIFE to get proper lock handling
Expand Down
25 changes: 25 additions & 0 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ type controllerManager struct {
// These Runnables will not be blocked by lead election.
nonLeaderElectionRunnables []Runnable

sporadicRunnables []SporadicRunnable

// recorderProvider is used to generate event recorders that will be injected into Controllers
// (and EventHandlers, Sources and Predicates).
recorderProvider *intrec.Provider
Expand Down Expand Up @@ -207,6 +209,10 @@ func (cm *controllerManager) Add(r Runnable) error {
cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r)
} else if hasCache, ok := r.(hasCache); ok {
cm.caches = append(cm.caches, hasCache)

} else if sporadicRunnable, ok := r.(SporadicRunnable); ok {
cm.sporadicRunnables = append(cm.sporadicRunnables, sporadicRunnable)

} else {
shouldStart = cm.startedLeader
cm.leaderElectionRunnables = append(cm.leaderElectionRunnables, r)
Expand Down Expand Up @@ -583,6 +589,25 @@ func (cm *controllerManager) waitForRunnableToEnd(shutdownCancel context.CancelF
return nil
}

func (cm *controllerManager) startSporadicRunnables() {
cm.mu.Lock()
cm.waitForCache(cm.internalCtx)
cm.mu.Unlock()

for _, sr := range cm.sporadicRunnables {
go func(sr SporadicRunnable) {
for {
select {
case <-cm.internalCtx.Done():
return
case <-sr.Ready(cm.internalCtx):
cm.startRunnable(sr)
}
}
}(sr)
}
}

func (cm *controllerManager) startNonLeaderElectionRunnables() {
cm.mu.Lock()
defer cm.mu.Unlock()
Expand Down
6 changes: 6 additions & 0 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,12 @@ func (r RunnableFunc) Start(ctx context.Context) error {
return r(ctx)
}

type SporadicRunnable interface {
Runnable
Ready(ctx context.Context) ReadySignal
}
type ReadySignal <-chan struct{}

// LeaderElectionRunnable knows if a Runnable needs to be run in the leader election mode.
type LeaderElectionRunnable interface {
// NeedLeaderElection returns true if the Runnable needs to be run in the leader election mode.
Expand Down
34 changes: 34 additions & 0 deletions pkg/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"sync"
"time"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -57,6 +58,11 @@ type Source interface {
Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
}

type SporadicSource interface {
Source
Ready(ctx context.Context, wg *sync.WaitGroup) <-chan struct{}
}

// SyncingSource is a source that needs syncing prior to being usable. The controller
// will call its WaitForSync prior to starting workers.
type SyncingSource interface {
Expand Down Expand Up @@ -98,6 +104,34 @@ type Kind struct {
startCancel func()
}

type SporadicKind struct {
Kind
DiscoveryCheck func() bool
}

func (sk *SporadicKind) Ready(ctx context.Context, wg *sync.WaitGroup) <-chan struct{} {
defer wg.Done()
ready := make(chan struct{})
go func() {
for {
select {
case <-ctx.Done():
close(ready)
return
default:
if sk.DiscoveryCheck() {
close(ready)
return
}
//TODO: parameterize this
time.Sleep(5 * time.Second)
}
}
}()

return ready
}

var _ SyncingSource = &Kind{}

// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
Expand Down