Skip to content

Commit

Permalink
client-go + apimachinery watch: context support
Browse files Browse the repository at this point in the history
The Lister and Watcher interfaces only supported methods without context, but
were typically implemented with client-go API calls which need a context. New
interfaces get added using the same approach as in
kubernetes/kubernetes#129109.

Kubernetes-commit: 6688adae142e37114d9dfa8d94cd1d8a91fbcc13
  • Loading branch information
pohly authored and k8s-publishing-bot committed Dec 20, 2024
1 parent a2cb7d3 commit 675c4f7
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 9 deletions.
15 changes: 12 additions & 3 deletions pkg/watch/streamwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type Reporter interface {
// StreamWatcher turns any stream for which you can write a Decoder interface
// into a watch.Interface.
type StreamWatcher struct {
logger klog.Logger
sync.Mutex
source Decoder
reporter Reporter
Expand All @@ -59,8 +60,16 @@ type StreamWatcher struct {
}

// NewStreamWatcher creates a StreamWatcher from the given decoder.
//
// Contextual logging: NewStreamWatcherWithLogger should be used instead of NewStreamWatcher in code which supports contextual logging.
func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {
return NewStreamWatcherWithLogger(klog.Background(), d, r)
}

// NewStreamWatcherWithLogger creates a StreamWatcher from the given decoder and logger.
func NewStreamWatcherWithLogger(logger klog.Logger, d Decoder, r Reporter) *StreamWatcher {
sw := &StreamWatcher{
logger: logger,
source: d,
reporter: r,
// It's easy for a consumer to add buffering via an extra
Expand Down Expand Up @@ -98,7 +107,7 @@ func (sw *StreamWatcher) Stop() {

// receive reads result from the decoder in a loop and sends down the result channel.
func (sw *StreamWatcher) receive() {
defer utilruntime.HandleCrash()
defer utilruntime.HandleCrashWithLogger(sw.logger)
defer close(sw.result)
defer sw.Stop()
for {
Expand All @@ -108,10 +117,10 @@ func (sw *StreamWatcher) receive() {
case io.EOF:
// watch closed normally
case io.ErrUnexpectedEOF:
klog.V(1).Infof("Unexpected EOF during watch stream event decoding: %v", err)
sw.logger.V(1).Info("Unexpected EOF during watch stream event decoding", "err", err)
default:
if net.IsProbableEOF(err) || net.IsTimeout(err) {
klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err)
sw.logger.V(5).Info("Unable to decode an event from the watch stream", "err", err)
} else {
select {
case <-sw.done:
Expand Down
3 changes: 3 additions & 0 deletions pkg/watch/streamwatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func TestStreamWatcher(t *testing.T) {
}

fd := fakeDecoder{items: make(chan Event, 5)}
//nolint:logcheck // Intentionally uses the old API.
sw := NewStreamWatcher(fd, nil)

for _, item := range table {
Expand All @@ -87,6 +88,7 @@ func TestStreamWatcher(t *testing.T) {
func TestStreamWatcherError(t *testing.T) {
fd := fakeDecoder{err: fmt.Errorf("test error")}
fr := &fakeReporter{}
//nolint:logcheck // Intentionally uses the old API.
sw := NewStreamWatcher(fd, fr)
evt, ok := <-sw.ResultChan()
if !ok {
Expand All @@ -110,6 +112,7 @@ func TestStreamWatcherError(t *testing.T) {
func TestStreamWatcherRace(t *testing.T) {
fd := fakeDecoder{err: fmt.Errorf("test error")}
fr := &fakeReporter{}
//nolint:logcheck // Intentionally uses the old API.
sw := NewStreamWatcher(fd, fr)
time.Sleep(10 * time.Millisecond)
sw.Stop()
Expand Down
35 changes: 29 additions & 6 deletions pkg/watch/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"k8s.io/klog/v2"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/utils/ptr"
)

// Interface can be implemented by anything that knows how to watch and report changes.
Expand Down Expand Up @@ -103,29 +104,42 @@ func (w emptyWatch) ResultChan() <-chan Event {

// FakeWatcher lets you test anything that consumes a watch.Interface; threadsafe.
type FakeWatcher struct {
logger klog.Logger
result chan Event
stopped bool
sync.Mutex
}

var _ Interface = &FakeWatcher{}

// Contextual logging: NewFakeWithOptions and a logger in the FakeOptions should be used instead in code which supports contextual logging.
func NewFake() *FakeWatcher {
return &FakeWatcher{
result: make(chan Event),
}
return NewFakeWithOptions(FakeOptions{})
}

// Contextual logging: NewFakeWithOptions and a logger in the FakeOptions should be used instead in code which supports contextual logging.
func NewFakeWithChanSize(size int, blocking bool) *FakeWatcher {
return NewFakeWithOptions(FakeOptions{ChannelSize: size})
}

func NewFakeWithOptions(options FakeOptions) *FakeWatcher {
return &FakeWatcher{
result: make(chan Event, size),
logger: ptr.Deref(options.Logger, klog.Background()),
result: make(chan Event, options.ChannelSize),
}
}

type FakeOptions struct {
Logger *klog.Logger
ChannelSize int
}

// Stop implements Interface.Stop().
func (f *FakeWatcher) Stop() {
f.Lock()
defer f.Unlock()
if !f.stopped {
klog.V(4).Infof("Stopping fake watcher.")
f.logger.V(4).Info("Stopping fake watcher")
close(f.result)
f.stopped = true
}
Expand Down Expand Up @@ -176,13 +190,22 @@ func (f *FakeWatcher) Action(action EventType, obj runtime.Object) {

// RaceFreeFakeWatcher lets you test anything that consumes a watch.Interface; threadsafe.
type RaceFreeFakeWatcher struct {
logger klog.Logger
result chan Event
Stopped bool
sync.Mutex
}

var _ Interface = &RaceFreeFakeWatcher{}

// Contextual logging: RaceFreeFakeWatcherWithLogger should be used instead of NewRaceFreeFake in code which supports contextual logging.
func NewRaceFreeFake() *RaceFreeFakeWatcher {
return NewRaceFreeFakeWithLogger(klog.Background())
}

func NewRaceFreeFakeWithLogger(logger klog.Logger) *RaceFreeFakeWatcher {
return &RaceFreeFakeWatcher{
logger: logger,
result: make(chan Event, DefaultChanSize),
}
}
Expand All @@ -192,7 +215,7 @@ func (f *RaceFreeFakeWatcher) Stop() {
f.Lock()
defer f.Unlock()
if !f.Stopped {
klog.V(4).Infof("Stopping fake watcher.")
f.logger.V(4).Info("Stopping fake watcher")
close(f.result)
f.Stopped = true
}
Expand Down

0 comments on commit 675c4f7

Please sign in to comment.