diff --git a/pkg/watch/streamwatcher.go b/pkg/watch/streamwatcher.go index 99f6770b9..42dcac2b9 100644 --- a/pkg/watch/streamwatcher.go +++ b/pkg/watch/streamwatcher.go @@ -55,7 +55,7 @@ type StreamWatcher struct { source Decoder reporter Reporter result chan Event - stopped bool + done chan struct{} } // NewStreamWatcher creates a StreamWatcher from the given decoder. @@ -67,6 +67,11 @@ func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher { // goroutine/channel, but impossible for them to remove it, // so nonbuffered is better. result: make(chan Event), + // If the watcher is externally stopped there is no receiver anymore + // and the send operations on the result channel, especially the + // error reporting might block forever. + // Therefore a dedicated stop channel is used to resolve this blocking. + done: make(chan struct{}), } go sw.receive() return sw @@ -82,19 +87,15 @@ func (sw *StreamWatcher) Stop() { // Call Close() exactly once by locking and setting a flag. sw.Lock() defer sw.Unlock() - if !sw.stopped { - sw.stopped = true + // closing a closed channel always panics, therefore check before closing + select { + case <-sw.done: + default: + close(sw.done) sw.source.Close() } } -// stopping returns true if Stop() was called previously. -func (sw *StreamWatcher) stopping() bool { - sw.Lock() - defer sw.Unlock() - return sw.stopped -} - // receive reads result from the decoder in a loop and sends down the result channel. func (sw *StreamWatcher) receive() { defer utilruntime.HandleCrash() @@ -103,10 +104,6 @@ func (sw *StreamWatcher) receive() { for { action, obj, err := sw.source.Decode() if err != nil { - // Ignore expected error. - if sw.stopping() { - return - } switch err { case io.EOF: // watch closed normally @@ -116,17 +113,24 @@ func (sw *StreamWatcher) receive() { if net.IsProbableEOF(err) || net.IsTimeout(err) { klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err) } else { - sw.result <- Event{ + select { + case <-sw.done: + case sw.result <- Event{ Type: Error, Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)), + }: } } } return } - sw.result <- Event{ + select { + case <-sw.done: + return + case sw.result <- Event{ Type: action, Object: obj, + }: } } } diff --git a/pkg/watch/streamwatcher_test.go b/pkg/watch/streamwatcher_test.go index 685a0f13a..0b459c895 100644 --- a/pkg/watch/streamwatcher_test.go +++ b/pkg/watch/streamwatcher_test.go @@ -21,6 +21,7 @@ import ( "io" "reflect" "testing" + "time" "k8s.io/apimachinery/pkg/runtime" . "k8s.io/apimachinery/pkg/watch" @@ -105,3 +106,16 @@ func TestStreamWatcherError(t *testing.T) { t.Fatalf("unexpected open channel") } } + +func TestStreamWatcherRace(t *testing.T) { + fd := fakeDecoder{err: fmt.Errorf("test error")} + fr := &fakeReporter{} + sw := NewStreamWatcher(fd, fr) + time.Sleep(10 * time.Millisecond) + sw.Stop() + time.Sleep(10 * time.Millisecond) + _, ok := <-sw.ResultChan() + if ok { + t.Fatalf("unexpected pending send") + } +}