Skip to content

✨ Let users specify their own EventBroadcaster for the manager #591

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions pkg/internal/recorder/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,13 @@ type provider struct {
}

// NewProvider create a new Provider instance.
func NewProvider(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger) (recorder.Provider, error) {
func NewProvider(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, broadcaster record.EventBroadcaster) (recorder.Provider, error) {
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to init clientSet: %v", err)
}

p := &provider{scheme: scheme, logger: logger}
p.eventBroadcaster = record.NewBroadcaster()
p := &provider{scheme: scheme, logger: logger, eventBroadcaster: broadcaster}
p.eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientSet.CoreV1().Events("")})
p.eventBroadcaster.StartEventWatcher(
func(e *corev1.Event) {
Expand Down
7 changes: 4 additions & 3 deletions pkg/internal/recorder/recorder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/internal/recorder"
)

var _ = Describe("recorder.Provider", func() {
Describe("NewProvider", func() {
It("should return a provider instance and a nil error.", func() {
provider, err := recorder.NewProvider(cfg, scheme.Scheme, tlog.NullLogger{})
provider, err := recorder.NewProvider(cfg, scheme.Scheme, tlog.NullLogger{}, record.NewBroadcaster())
Expect(provider).NotTo(BeNil())
Expect(err).NotTo(HaveOccurred())
})
Expand All @@ -36,13 +37,13 @@ var _ = Describe("recorder.Provider", func() {
// Invalid the config
cfg1 := *cfg
cfg1.ContentType = "invalid-type"
_, err := recorder.NewProvider(&cfg1, scheme.Scheme, tlog.NullLogger{})
_, err := recorder.NewProvider(&cfg1, scheme.Scheme, tlog.NullLogger{}, record.NewBroadcaster())
Expect(err.Error()).To(ContainSubstring("failed to init clientSet"))
})
})
Describe("GetEventRecorder", func() {
It("should return a recorder instance.", func() {
provider, err := recorder.NewProvider(cfg, scheme.Scheme, tlog.NullLogger{})
provider, err := recorder.NewProvider(cfg, scheme.Scheme, tlog.NullLogger{}, record.NewBroadcaster())
Expect(err).NotTo(HaveOccurred())

recorder := provider.GetEventRecorderFor("test")
Expand Down
12 changes: 10 additions & 2 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,12 @@ type Options struct {
// use the cache for reads and the client for writes.
NewClient NewClientFunc

// EventBroadcaster records Events emitted by the manager and sends them to the Kubernetes API
// Use this to customize the event correlator and spam filter
EventBroadcaster record.EventBroadcaster

// Dependency injection for testing
newRecorderProvider func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger) (recorder.Provider, error)
newRecorderProvider func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, broadcaster record.EventBroadcaster) (recorder.Provider, error)
newResourceLock func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error)
newMetricsListener func(addr string) (net.Listener, error)
}
Expand Down Expand Up @@ -231,7 +235,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
// Create the recorder provider to inject event recorders for the components.
// TODO(directxman12): the log for the event provider should have a context (name, tags, etc) specific
// to the particular controller that it's being injected into, rather than a generic one like is here.
recorderProvider, err := options.newRecorderProvider(config, options.Scheme, log.WithName("events"))
recorderProvider, err := options.newRecorderProvider(config, options.Scheme, log.WithName("events"), options.EventBroadcaster)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -342,5 +346,9 @@ func setOptionsDefaults(options Options) Options {
options.RetryPeriod = &retryPeriod
}

if options.EventBroadcaster == nil {
options.EventBroadcaster = record.NewBroadcaster()
}

return options
}
3 changes: 2 additions & 1 deletion pkg/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -111,7 +112,7 @@ var _ = Describe("manger.Manager", func() {

It("should return an error it can't create a recorder.Provider", func(done Done) {
m, err := New(cfg, Options{
newRecorderProvider: func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger) (recorder.Provider, error) {
newRecorderProvider: func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, broadcaster record.EventBroadcaster) (recorder.Provider, error) {
return nil, fmt.Errorf("expected error")
},
})
Expand Down