Skip to content

Pr/stop options self contained plus one client go commit #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,36 @@ module sigs.k8s.io/controller-runtime
go 1.15

require (
github.com/coreos/go-etcd v2.0.0+incompatible // indirect
github.com/cpuguy83/go-md2man v1.0.10 // indirect
github.com/docker/docker v0.7.3-0.20190327010347-be7ac8be2ae0 // indirect
github.com/evanphx/json-patch v4.9.0+incompatible
github.com/fsnotify/fsnotify v1.4.9
github.com/go-logr/logr v0.2.1
github.com/go-logr/zapr v0.2.0
github.com/google/go-cmp v0.5.2 // indirect
github.com/googleapis/gnostic v0.5.1 // indirect
github.com/gophercloud/gophercloud v0.1.0 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/imdario/mergo v0.3.10 // indirect
github.com/onsi/ginkgo v1.14.1
github.com/onsi/gomega v1.10.2
github.com/prometheus/client_golang v1.7.1
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.10.0
github.com/robfig/cron v1.2.0
github.com/spf13/pflag v1.0.5
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8 // indirect
go.uber.org/goleak v1.1.10
go.uber.org/zap v1.15.0
golang.org/x/text v0.3.3 // indirect
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e
gomodules.xyz/jsonpatch/v2 v2.1.0
google.golang.org/appengine v1.6.6 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
k8s.io/api v0.19.2
k8s.io/apiextensions-apiserver v0.19.2
k8s.io/apimachinery v0.19.2
k8s.io/client-go v0.19.2
k8s.io/utils v0.0.0-20200912215256-4140de9c8800
sigs.k8s.io/yaml v1.2.0
)
68 changes: 54 additions & 14 deletions go.sum

Large diffs are not rendered by default.

43 changes: 37 additions & 6 deletions pkg/builder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package builder
import (
"fmt"
"strings"
"time"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
Expand Down Expand Up @@ -57,9 +59,11 @@ func ControllerManagedBy(m manager.Manager) *Builder {

// ForInput represents the information set by For method.
type ForInput struct {
object client.Object
predicates []predicate.Predicate
err error
object client.Object
predicates []predicate.Predicate
err error
conditionallyRun bool
waitTime time.Duration
}

// For defines the type of Object being *reconciled*, and configures the ControllerManagedBy to respond to create / delete /
Expand Down Expand Up @@ -256,7 +260,34 @@ func (blder *Builder) doController(r reconcile.Reconciler) error {
}
ctrlOptions.Log = ctrlOptions.Log.WithValues("reconcilerGroup", gvk.Group, "reconcilerKind", gvk.Kind)

// Build the controller and return.
blder.ctrl, err = newController(blder.getControllerName(gvk), blder.mgr, ctrlOptions)
return err
// Build the base controller
baseController, err := controller.NewUnmanaged(blder.getControllerName(gvk), blder.mgr, ctrlOptions)
if err != nil {
return err
}

// Set the builder controller to either the base controller or wrapped as a ConditionalController.
var ctrl controller.Controller
if blder.forInput.conditionallyRun {
dc, err := discovery.NewDiscoveryClientForConfig(blder.mgr.GetConfig())
if err != nil {
return err
}
sc := baseController.(controller.StoppableController)
sc.SaveWatches()
ctrl = &controller.ConditionalController{
Cache: blder.mgr.GetCache(),
ConditionalOn: blder.forInput.object,
Controller: sc,
DiscoveryClient: dc,
Scheme: blder.mgr.GetScheme(),
WaitTime: blder.forInput.waitTime,
}

} else {
ctrl = baseController
}
blder.ctrl = ctrl

return blder.mgr.Add(ctrl)
}
23 changes: 23 additions & 0 deletions pkg/builder/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ limitations under the License.
package builder

import (
"time"

"sigs.k8s.io/controller-runtime/pkg/predicate"
)

const defaultWaitTime = time.Minute

// {{{ "Functional" Option Interfaces

// ForOption is some configuration that modifies options for a For request.
Expand Down Expand Up @@ -75,4 +79,23 @@ var _ ForOption = &Predicates{}
var _ OwnsOption = &Predicates{}
var _ WatchesOption = &Predicates{}

// ConditionallyRun runs the controller
// condtionally on the existence of the forInput object
// in the cluster's discovery doc, letting you start a
// controller manager for a CRD not yet installed on the cluster.
type ConditionallyRun struct {
WaitTime time.Duration
}

// ApplyToFor applies this configuration to the give forInput options,
// setting the waitTime to the default wait time if it is unset.
func (w ConditionallyRun) ApplyToFor(opts *ForInput) {
opts.conditionallyRun = true
if w.WaitTime == time.Duration(0) {
opts.waitTime = defaultWaitTime
} else {
opts.waitTime = w.WaitTime
}
}

// }}}
17 changes: 16 additions & 1 deletion pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ type Informers interface {
// of the underlying object.
GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error)

// Remove the informer for the given object.
Remove(ctx context.Context, obj runtime.Object) error

// Start runs all the informers known to this cache until the context is closed.
// It blocks.
Start(ctx context.Context) error
Expand All @@ -84,6 +87,18 @@ type Informer interface {
AddIndexers(indexers toolscache.Indexers) error
//HasSynced return true if the informers underlying store has synced
HasSynced() bool

// RemoveEventHandler currently just decrements a the count of event handlers
// The goals it to have SharedInformer support RemoveEventHandler (and actually remove
// the handler instead of just decrementing a count).
RemoveEventHandler(id int) error

// CountEventHandlers returns the number of event handlers added to an informer.
CountEventHandlers() int

// RunWithStopOptions runs the informer and provides options to be checked that
// would indicate under what conditions the informer should stop.
RunWithStopOptions(stopOptions toolscache.StopOptions) toolscache.StopReason
}

// Options are the optional arguments for creating a new InformersMap object
Expand Down Expand Up @@ -126,7 +141,7 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) {
// Construct a new Mapper if unset
if opts.Mapper == nil {
var err error
opts.Mapper, err = apiutil.NewDiscoveryRESTMapper(config)
opts.Mapper, err = apiutil.NewDynamicRESTMapper(config)
if err != nil {
log.WithName("setup").Error(err, "Failed to get API Group-Resources")
return opts, fmt.Errorf("could not create RESTMapper from config")
Expand Down
29 changes: 29 additions & 0 deletions pkg/cache/informer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"reflect"
"strings"

apierrors "k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -159,6 +160,24 @@ func (ip *informerCache) GetInformer(ctx context.Context, obj client.Object) (In
return i.Informer, err
}

// GetInformerNonBlocking returns the informer for the obj without waiting for its cache to sync.
func (ip *informerCache) GetInformerNonBlocking(obj runtime.Object) (Informer, error) {
gvk, err := apiutil.GVKForObject(obj, ip.Scheme)
if err != nil {
return nil, err
}

// Use a cancelled context to signal non-blocking
ctx, cancel := context.WithCancel(context.Background())
cancel()

_, i, err := ip.InformersMap.Get(ctx, gvk, obj)
if err != nil && !apierrors.IsTimeout(err) {
return nil, err
}
return i.Informer, nil
}

// NeedLeaderElection implements the LeaderElectionRunnable interface
// to indicate that this can be started without requiring the leader lock
func (ip *informerCache) NeedLeaderElection() bool {
Expand Down Expand Up @@ -216,3 +235,13 @@ func indexByField(indexer Informer, field string, extractor client.IndexerFunc)

return indexer.AddIndexers(cache.Indexers{internal.FieldIndexName(field): indexFunc})
}

// Remove removes an informer specified by the obj argument from the cache and stops it if it existed.
func (ip *informerCache) Remove(ctx context.Context, obj runtime.Object) error {
gvk, err := apiutil.GVKForObject(obj, ip.Scheme)
if err != nil {
return err
}

return ip.InformersMap.Remove(gvk, obj)
}
5 changes: 5 additions & 0 deletions pkg/cache/informertest/fake_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ func (c *FakeInformers) Start(ctx context.Context) error {
return c.Error
}

// Remove implements Cache
func (c *FakeInformers) Remove(ctx context.Context, obj runtime.Object) error {
return c.Error
}

// IndexField implements Cache
func (c *FakeInformers) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
return nil
Expand Down
106 changes: 106 additions & 0 deletions pkg/cache/internal/counting_informer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package internal

import (
"fmt"
"time"

"k8s.io/client-go/tools/cache"
)

// CountingInformer exposes a way to track the number of EventHandlers
// registered on an Informer.
type CountingInformer interface {
cache.SharedIndexInformer
CountEventHandlers() int
RemoveEventHandler(id int) error
RunWithStopOptions(stopOptions cache.StopOptions) cache.StopReason
}

// HandlerCountingInformer implements the CountingInformer.
// It increments the count every time AddEventHandler is called,
// and decrements the count every time RemoveEventHandler is called.
//
// It doesn't actually RemoveEventHandlers because that feature is not implemented
// in client-go, but we're are naming it this way to suggest what the interface would look
// like if/when it does get added to client-go.
//
// We can get rid of this if apimachinery adds the ability to retrieve this from the SharedIndexInformer
// but until then, we have to track it ourselves
type HandlerCountingInformer struct {
// Informer is the cached informer
informer cache.SharedIndexInformer

// count indicates the number of EventHandlers registered on the informer
count int
}

func (i *HandlerCountingInformer) RemoveEventHandler(id int) error {
i.count--
fmt.Printf("decrement, count is %+v\n", i.count)
return nil
}

func (i *HandlerCountingInformer) AddEventHandler(handler cache.ResourceEventHandler) {
i.count++
fmt.Printf("increment, count is %+v\n", i.count)
i.informer.AddEventHandler(handler)
}

func (i *HandlerCountingInformer) CountEventHandlers() int {
return i.count
}

func (i *HandlerCountingInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, resyncPeriod time.Duration) {
i.count++
i.informer.AddEventHandlerWithResyncPeriod(handler, resyncPeriod)
}
func (i *HandlerCountingInformer) AddIndexers(indexers cache.Indexers) error {
return i.informer.AddIndexers(indexers)
}

func (i *HandlerCountingInformer) HasSynced() bool {
return i.informer.HasSynced()
}

func (i *HandlerCountingInformer) GetStore() cache.Store {
return i.informer.GetStore()
}

func (i *HandlerCountingInformer) GetController() cache.Controller {
return i.informer.GetController()
}

func (i *HandlerCountingInformer) LastSyncResourceVersion() string {
return i.informer.LastSyncResourceVersion()
}

func (i *HandlerCountingInformer) SetWatchErrorHandler(handler cache.WatchErrorHandler) error {
return i.informer.SetWatchErrorHandler(handler)
}

func (i *HandlerCountingInformer) GetIndexer() cache.Indexer {
return i.informer.GetIndexer()
}

func (i *HandlerCountingInformer) Run(stopCh <-chan struct{}) {
i.informer.Run(stopCh)
}

func (i *HandlerCountingInformer) RunWithStopOptions(stopOptions cache.StopOptions) cache.StopReason {
return i.informer.RunWithStopOptions(stopOptions)
}

//// StopOptions let the caller specify which conditions to stop the informer.
//type StopOptions struct {
// // StopChannel stops the Informer when it receives a close signal.
// StopChannel <-chan struct{}
//
// Cancel context.CancelFunc
//
// // OnListError inspects errors returned from the infromer's underlying refloector,
// // and based on the error determines whether or not to stop the informer.
// OnListError func(error) bool
//}
//
//// StopReason is a custom typed error that indicates how the informer was stopped.
//type StopReason error
14 changes: 14 additions & 0 deletions pkg/cache/internal/deleg_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,20 @@ func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj
return m.structured.Get(ctx, gvk, obj)
}

// Remove will remove an new Informer from the InformersMap and stop it if it exists.
func (m *InformersMap) Remove(gvk schema.GroupVersionKind, obj runtime.Object) error {
_, isUnstructured := obj.(*unstructured.Unstructured)
_, isUnstructuredList := obj.(*unstructured.UnstructuredList)
isUnstructured = isUnstructured || isUnstructuredList

switch {
case isUnstructured:
return m.unstructured.Remove(gvk)
default:
return m.structured.Remove(gvk)
}
}

// newStructuredInformersMap creates a new InformersMap for structured objects.
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createStructuredListWatch)
Expand Down
Loading