From 675c4f7a80a159b78d4ee4edfb269d0ea1bb526d Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Fri, 20 Dec 2024 13:55:47 +0100 Subject: [PATCH] client-go + apimachinery watch: context support The Lister and Watcher interfaces only supported methods without context, but were typically implemented with client-go API calls which need a context. New interfaces get added using the same approach as in https://github.com/kubernetes/kubernetes/pull/129109. Kubernetes-commit: 6688adae142e37114d9dfa8d94cd1d8a91fbcc13 --- pkg/watch/streamwatcher.go | 15 +++++++++++--- pkg/watch/streamwatcher_test.go | 3 +++ pkg/watch/watch.go | 35 +++++++++++++++++++++++++++------ 3 files changed, 44 insertions(+), 9 deletions(-) diff --git a/pkg/watch/streamwatcher.go b/pkg/watch/streamwatcher.go index 42dcac2b9..b422ca9f5 100644 --- a/pkg/watch/streamwatcher.go +++ b/pkg/watch/streamwatcher.go @@ -51,6 +51,7 @@ type Reporter interface { // StreamWatcher turns any stream for which you can write a Decoder interface // into a watch.Interface. type StreamWatcher struct { + logger klog.Logger sync.Mutex source Decoder reporter Reporter @@ -59,8 +60,16 @@ type StreamWatcher struct { } // NewStreamWatcher creates a StreamWatcher from the given decoder. +// +// Contextual logging: NewStreamWatcherWithLogger should be used instead of NewStreamWatcher in code which supports contextual logging. func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher { + return NewStreamWatcherWithLogger(klog.Background(), d, r) +} + +// NewStreamWatcherWithLogger creates a StreamWatcher from the given decoder and logger. +func NewStreamWatcherWithLogger(logger klog.Logger, d Decoder, r Reporter) *StreamWatcher { sw := &StreamWatcher{ + logger: logger, source: d, reporter: r, // It's easy for a consumer to add buffering via an extra @@ -98,7 +107,7 @@ func (sw *StreamWatcher) Stop() { // receive reads result from the decoder in a loop and sends down the result channel. func (sw *StreamWatcher) receive() { - defer utilruntime.HandleCrash() + defer utilruntime.HandleCrashWithLogger(sw.logger) defer close(sw.result) defer sw.Stop() for { @@ -108,10 +117,10 @@ func (sw *StreamWatcher) receive() { case io.EOF: // watch closed normally case io.ErrUnexpectedEOF: - klog.V(1).Infof("Unexpected EOF during watch stream event decoding: %v", err) + sw.logger.V(1).Info("Unexpected EOF during watch stream event decoding", "err", err) default: if net.IsProbableEOF(err) || net.IsTimeout(err) { - klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err) + sw.logger.V(5).Info("Unable to decode an event from the watch stream", "err", err) } else { select { case <-sw.done: diff --git a/pkg/watch/streamwatcher_test.go b/pkg/watch/streamwatcher_test.go index 0b459c895..e6f68f667 100644 --- a/pkg/watch/streamwatcher_test.go +++ b/pkg/watch/streamwatcher_test.go @@ -64,6 +64,7 @@ func TestStreamWatcher(t *testing.T) { } fd := fakeDecoder{items: make(chan Event, 5)} + //nolint:logcheck // Intentionally uses the old API. sw := NewStreamWatcher(fd, nil) for _, item := range table { @@ -87,6 +88,7 @@ func TestStreamWatcher(t *testing.T) { func TestStreamWatcherError(t *testing.T) { fd := fakeDecoder{err: fmt.Errorf("test error")} fr := &fakeReporter{} + //nolint:logcheck // Intentionally uses the old API. sw := NewStreamWatcher(fd, fr) evt, ok := <-sw.ResultChan() if !ok { @@ -110,6 +112,7 @@ func TestStreamWatcherError(t *testing.T) { func TestStreamWatcherRace(t *testing.T) { fd := fakeDecoder{err: fmt.Errorf("test error")} fr := &fakeReporter{} + //nolint:logcheck // Intentionally uses the old API. sw := NewStreamWatcher(fd, fr) time.Sleep(10 * time.Millisecond) sw.Stop() diff --git a/pkg/watch/watch.go b/pkg/watch/watch.go index ce37fd8c1..251459834 100644 --- a/pkg/watch/watch.go +++ b/pkg/watch/watch.go @@ -23,6 +23,7 @@ import ( "k8s.io/klog/v2" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/utils/ptr" ) // Interface can be implemented by anything that knows how to watch and report changes. @@ -103,29 +104,42 @@ func (w emptyWatch) ResultChan() <-chan Event { // FakeWatcher lets you test anything that consumes a watch.Interface; threadsafe. type FakeWatcher struct { + logger klog.Logger result chan Event stopped bool sync.Mutex } +var _ Interface = &FakeWatcher{} + +// Contextual logging: NewFakeWithOptions and a logger in the FakeOptions should be used instead in code which supports contextual logging. func NewFake() *FakeWatcher { - return &FakeWatcher{ - result: make(chan Event), - } + return NewFakeWithOptions(FakeOptions{}) } +// Contextual logging: NewFakeWithOptions and a logger in the FakeOptions should be used instead in code which supports contextual logging. func NewFakeWithChanSize(size int, blocking bool) *FakeWatcher { + return NewFakeWithOptions(FakeOptions{ChannelSize: size}) +} + +func NewFakeWithOptions(options FakeOptions) *FakeWatcher { return &FakeWatcher{ - result: make(chan Event, size), + logger: ptr.Deref(options.Logger, klog.Background()), + result: make(chan Event, options.ChannelSize), } } +type FakeOptions struct { + Logger *klog.Logger + ChannelSize int +} + // Stop implements Interface.Stop(). func (f *FakeWatcher) Stop() { f.Lock() defer f.Unlock() if !f.stopped { - klog.V(4).Infof("Stopping fake watcher.") + f.logger.V(4).Info("Stopping fake watcher") close(f.result) f.stopped = true } @@ -176,13 +190,22 @@ func (f *FakeWatcher) Action(action EventType, obj runtime.Object) { // RaceFreeFakeWatcher lets you test anything that consumes a watch.Interface; threadsafe. type RaceFreeFakeWatcher struct { + logger klog.Logger result chan Event Stopped bool sync.Mutex } +var _ Interface = &RaceFreeFakeWatcher{} + +// Contextual logging: RaceFreeFakeWatcherWithLogger should be used instead of NewRaceFreeFake in code which supports contextual logging. func NewRaceFreeFake() *RaceFreeFakeWatcher { + return NewRaceFreeFakeWithLogger(klog.Background()) +} + +func NewRaceFreeFakeWithLogger(logger klog.Logger) *RaceFreeFakeWatcher { return &RaceFreeFakeWatcher{ + logger: logger, result: make(chan Event, DefaultChanSize), } } @@ -192,7 +215,7 @@ func (f *RaceFreeFakeWatcher) Stop() { f.Lock() defer f.Unlock() if !f.Stopped { - klog.V(4).Infof("Stopping fake watcher.") + f.logger.V(4).Info("Stopping fake watcher") close(f.result) f.Stopped = true }