From e51562ebd9a2902ef5f1f120ee3fe94d7ed026c9 Mon Sep 17 00:00:00 2001 From: Uwe Krueger Date: Sun, 31 Jan 2021 17:19:09 +0100 Subject: [PATCH 1/3] fix sync problem in streamwatcher The streamwatcher has a synchronization problem that may lead to a go routine blocking forever when closing a stream watch. This occasionally happens, when informers are cancelled together with the watch request using the stop channel, which leads to an increaing number of blocked go routines, if imformers are dynamicaly created and deleted again. The function `receive` checks under a lock whether the watch has been stopped, before an error is reported to the result channel. The problem here is, that in between the watcher might be stopped by calling the `Stop` method. In the actual code this is done by the `cache.Reflector` using the streamwatcher by a defer which is executed after the caller already stopped reading from the result channel. As a result the stopping flag might be set after the check and trying to send the error event blocks this send operation forever, because there will never be a receiver again. The fix introduces a dedicated local stop channel that is closed by the `Stop` method and used in a select statement together with the send operation to finally abort the loop. Kubernetes-commit: d8ee8e427e4daf8406ee94e919c94014600bb477 --- pkg/watch/streamwatcher.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/pkg/watch/streamwatcher.go b/pkg/watch/streamwatcher.go index 99f6770b9..4960dbf1c 100644 --- a/pkg/watch/streamwatcher.go +++ b/pkg/watch/streamwatcher.go @@ -55,6 +55,7 @@ type StreamWatcher struct { source Decoder reporter Reporter result chan Event + done chan struct{} stopped bool } @@ -67,6 +68,11 @@ func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher { // goroutine/channel, but impossible for them to remove it, // so nonbuffered is better. result: make(chan Event), + // If the watcher is externally stopped there is no receiver anymore + // and the send operations on the result channel, especially the + // error reporting might block forever. + // Therefore a dedicated stop channel is used to resolve this blocking. + done: make(chan struct{}), } go sw.receive() return sw @@ -84,6 +90,7 @@ func (sw *StreamWatcher) Stop() { defer sw.Unlock() if !sw.stopped { sw.stopped = true + close(sw.done) sw.source.Close() } } @@ -116,17 +123,25 @@ func (sw *StreamWatcher) receive() { if net.IsProbableEOF(err) || net.IsTimeout(err) { klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err) } else { - sw.result <- Event{ + select { + case <-sw.done: + return + case sw.result <- Event{ Type: Error, Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)), + }: } } } return } - sw.result <- Event{ + select { + case <-sw.done: + return + case sw.result <- Event{ Type: action, Object: obj, + }: } } } From 193cd4a1da53c5e8f925e7aa7c4b1699d5f4e1e8 Mon Sep 17 00:00:00 2001 From: Uwe Krueger Date: Fri, 5 Mar 2021 17:14:19 +0100 Subject: [PATCH 2/3] add comment describing the race condition + TODO for appropriate test Kubernetes-commit: 932f98acafbfb4b0cb08d30738dbdc3fa2fcf6b2 --- pkg/watch/streamwatcher.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/pkg/watch/streamwatcher.go b/pkg/watch/streamwatcher.go index 4960dbf1c..48a5f0f57 100644 --- a/pkg/watch/streamwatcher.go +++ b/pkg/watch/streamwatcher.go @@ -124,6 +124,18 @@ func (sw *StreamWatcher) receive() { klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err) } else { select { + // TODO: figure out how to test that sending the error after an externally stopped watcher + // cannot block anymore because of the introduced done channel. + // The function receive checks under a lock whether the watch has been stopped, + // before an error from the watch stream is reported to the result channel. + // The problem here is, that in between the watcher might be stopped by + // calling the Stop method. In the actual code this is done by the + // cache.Reflector using the streamwatcher by a defer (method watchHandler) + // which is executed after the caller already stopped reading from the result channel. + // As a result the stopping flag might be set after the check + // and trying to send the error event blocks this send operation forever, + // because there will never be a receiver again. + // This results in dead go routines trying to send on the result channel, forever. case <-sw.done: return case sw.result <- Event{ From 963dac0cf2bc4d8732dd2d9895fc2ee9b5d58555 Mon Sep 17 00:00:00 2001 From: Uwe Krueger Date: Sat, 6 Mar 2021 14:42:59 +0100 Subject: [PATCH 3/3] simplier fix + test for race condition Kubernetes-commit: 2355ceb79a2019850d640d00be40b97cc133fd4d --- pkg/watch/streamwatcher.go | 31 ++++--------------------------- pkg/watch/streamwatcher_test.go | 14 ++++++++++++++ 2 files changed, 18 insertions(+), 27 deletions(-) diff --git a/pkg/watch/streamwatcher.go b/pkg/watch/streamwatcher.go index 48a5f0f57..42dcac2b9 100644 --- a/pkg/watch/streamwatcher.go +++ b/pkg/watch/streamwatcher.go @@ -56,7 +56,6 @@ type StreamWatcher struct { reporter Reporter result chan Event done chan struct{} - stopped bool } // NewStreamWatcher creates a StreamWatcher from the given decoder. @@ -88,20 +87,15 @@ func (sw *StreamWatcher) Stop() { // Call Close() exactly once by locking and setting a flag. sw.Lock() defer sw.Unlock() - if !sw.stopped { - sw.stopped = true + // closing a closed channel always panics, therefore check before closing + select { + case <-sw.done: + default: close(sw.done) sw.source.Close() } } -// stopping returns true if Stop() was called previously. -func (sw *StreamWatcher) stopping() bool { - sw.Lock() - defer sw.Unlock() - return sw.stopped -} - // receive reads result from the decoder in a loop and sends down the result channel. func (sw *StreamWatcher) receive() { defer utilruntime.HandleCrash() @@ -110,10 +104,6 @@ func (sw *StreamWatcher) receive() { for { action, obj, err := sw.source.Decode() if err != nil { - // Ignore expected error. - if sw.stopping() { - return - } switch err { case io.EOF: // watch closed normally @@ -124,20 +114,7 @@ func (sw *StreamWatcher) receive() { klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err) } else { select { - // TODO: figure out how to test that sending the error after an externally stopped watcher - // cannot block anymore because of the introduced done channel. - // The function receive checks under a lock whether the watch has been stopped, - // before an error from the watch stream is reported to the result channel. - // The problem here is, that in between the watcher might be stopped by - // calling the Stop method. In the actual code this is done by the - // cache.Reflector using the streamwatcher by a defer (method watchHandler) - // which is executed after the caller already stopped reading from the result channel. - // As a result the stopping flag might be set after the check - // and trying to send the error event blocks this send operation forever, - // because there will never be a receiver again. - // This results in dead go routines trying to send on the result channel, forever. case <-sw.done: - return case sw.result <- Event{ Type: Error, Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)), diff --git a/pkg/watch/streamwatcher_test.go b/pkg/watch/streamwatcher_test.go index 685a0f13a..0b459c895 100644 --- a/pkg/watch/streamwatcher_test.go +++ b/pkg/watch/streamwatcher_test.go @@ -21,6 +21,7 @@ import ( "io" "reflect" "testing" + "time" "k8s.io/apimachinery/pkg/runtime" . "k8s.io/apimachinery/pkg/watch" @@ -105,3 +106,16 @@ func TestStreamWatcherError(t *testing.T) { t.Fatalf("unexpected open channel") } } + +func TestStreamWatcherRace(t *testing.T) { + fd := fakeDecoder{err: fmt.Errorf("test error")} + fr := &fakeReporter{} + sw := NewStreamWatcher(fd, fr) + time.Sleep(10 * time.Millisecond) + sw.Stop() + time.Sleep(10 * time.Millisecond) + _, ok := <-sw.ResultChan() + if ok { + t.Fatalf("unexpected pending send") + } +}