client/v3: warn when watcher stream buffer is backlogged#21704
Conversation
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: xigang The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
|
Hi @xigang. Thanks for your PR. I'm waiting for a etcd-io member to verify that this patch is reasonable to test. If it is, they should reply with Regular contributors should join the org to skip this step. Once the patch is verified, the new status will be reflected by the I understand the commands that are listed here. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
|
/cc @richabanker |
|
@serathius: GitHub didn't allow me to request PR reviews from the following users: richabanker. Note that only etcd-io members and repo collaborators can review this PR, and authors cannot review their own PRs. DetailsIn response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
|
This looks similar to K8s logic for detecting buffer growing. Let's take learning from it kubernetes/kubernetes#138126 |
|
/cc @mborsz |
|
@serathius: GitHub didn't allow me to request PR reviews from the following users: mborsz. Note that only etcd-io members and repo collaborators can review this PR, and authors cannot review their own PRs. DetailsIn response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
|
/ok-to-test |
Codecov Report❌ Patch coverage is
Additional details and impacted files
... and 45 files with indirect coverage changes @@ Coverage Diff @@
## main #21704 +/- ##
==========================================
+ Coverage 70.22% 70.26% +0.03%
==========================================
Files 426 427 +1
Lines 35211 35280 +69
==========================================
+ Hits 24727 24789 +62
- Misses 9090 9097 +7
Partials 1394 1394 Continue to review full report in Codecov by Sentry.
🚀 New features to boost your workflow:
|
|
@richabanker PTAL. Thanks. |
768cfb5 to
be24a63
Compare
|
/retest |
|
@mborsz could you take a look if this implementation matches same requirements as you proposed in kubernetes/kubernetes#138126 ? |
|
/test pull-etcd-e2e-386 |
| zap.Int("buffered-responses", len(ws.buf)), | ||
| zap.Int("response-count", responseCount), | ||
| zap.Duration("time-waiting", timeWaiting), | ||
| zap.Duration("window", window), |
There was a problem hiding this comment.
I renamed this field to "log-interval".
In block_logger, this represents the elapsed accumulation window used for rate-limited logging. eventCount and timeWaiting are accumulated over this window, and a warning is emitted only when the window reaches the configured interval and the accumulated waiting time exceeds the threshold.
| zap.String("range-end", ws.initReq.end), | ||
| zap.Int("buffered-responses", len(ws.buf)), | ||
| zap.Int("response-count", responseCount), | ||
| zap.Duration("time-waiting", timeWaiting), |
There was a problem hiding this comment.
The field name is too vague. It should be renamed to something like "time-blocked" to indicate the time the buffer spent blocked while waiting for the consumer to drain it.
| case <-ws.initReq.ctx.Done(): | ||
| return | ||
| case <-resumec: | ||
| ws.bufWaitStartTime = time.Time{} |
There was a problem hiding this comment.
Let's not touch private fields of struct. Maybe add a reset function?
There was a problem hiding this comment.
Done. Added resetBufWait so the resume path no longer resets bufWaitStartTime directly.
|
ping @richabanker @mborsz As we are planning to release etcd v3.7.0, please prioritize this review. |
Signed-off-by: xigang <wangxigang2014@gmail.com>
| } | ||
| select { | ||
| case outc <- *curWr: | ||
| w.recordBufWait(ws) |
There was a problem hiding this comment.
Do we want to record wait times only upon a successful dequeue? Or would we want visibility into forever stalled consumers as well (by capturing that in some sort of a ticker based log)?
cc @serathius ?
| ws.bufWaitStartTime = ws.bufLogger.now() | ||
| } | ||
|
|
||
| func (w *watchGRPCStream) recordBufWait(ws *watcherStream) { |
There was a problem hiding this comment.
Should ideally add a test for the new behavior in watch.go too...
There was a problem hiding this comment.
A quick test (written with the help of Gemini):
diff --git a/client/v3/watch_test.go b/client/v3/watch_test.go
index ca0b86d26..8b6bbe756 100644
--- a/client/v3/watch_test.go
+++ b/client/v3/watch_test.go
@@ -16,10 +16,14 @@ package clientv3
import (
"context"
+ "sync"
"testing"
+ "time"
+ "go.uber.org/zap"
"google.golang.org/grpc/metadata"
+ pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
)
@@ -106,3 +110,108 @@ func TestStreamKeyFromCtx(t *testing.T) {
})
}
}
+
+func TestServeSubstreamLogsSlowConsumer(t *testing.T) {
+ const (
+ interval = 5 * time.Second
+ threshold = 100 * time.Millisecond
+ )
+
+ ctx, cancel := context.WithCancel(t.Context())
+ defer cancel()
+
+ // nowMu protects the virtual clock time `now` against concurrent access between
+ // the test runner thread and the background substream thread.
+ var nowMu sync.Mutex
+ now := time.Unix(0, 0)
+ startedWaiting := make(chan struct{})
+ nowCalls := 0
+
+ // logMu protects logCalls against concurrent access during assertions.
+ var logMu sync.Mutex
+ logCalls := 0
+ logged := make(chan struct{}, 1)
+
+ // Set up watcherStream with a mock blockLogger utilizing virtual time and custom log assertion callback.
+ ws := &watcherStream{
+ initReq: watchRequest{ctx: ctx},
+ outc: make(chan WatchResponse),
+ recvc: make(chan *WatchResponse, 1),
+ donec: make(chan struct{}),
+ }
+ ws.bufLogger = newBlockLogger(interval, threshold, func() time.Time {
+ nowCalls++
+ // Call 1: newBlockLogger (sets lastLogTime to Unix(0,0))
+ // Call 2: startBufWait (sets bufWaitStartTime to Unix(0,0))
+ // Closing startedWaiting when nowCalls reaches 2 signals that serveSubstream
+ // has received the event and recorded its initial buffering wait start time.
+ if nowCalls == 2 {
+ close(startedWaiting)
+ }
+ nowMu.Lock()
+ defer nowMu.Unlock()
+ return now
+ }, func(eventCount int, timeWaiting time.Duration, window time.Duration) {
+ logMu.Lock()
+ defer logMu.Unlock()
+ logCalls++
+ select {
+ case logged <- struct{}{}:
+ default:
+ }
+ })
+
+ w := &watchGRPCStream{
+ ctx: t.Context(),
+ closingc: make(chan *watcherStream, 1),
+ lg: zap.NewNop(), // Use Nop logger since we intercept warning logs via the callback
+ }
+ w.wg.Add(1)
+ // Start the serveSubstream event loop in the background.
+ go w.serveSubstream(ws, make(chan struct{}))
+
+ // Send a watch response to recvc. It will be received by serveSubstream,
+ // appended to the event buffer, and trigger startBufWait.
+ ws.recvc <- &WatchResponse{Header: &pb.ResponseHeader{Revision: 1}}
+
+ // Wait for serveSubstream to process the event, buffer it, and record the wait start time.
+ select {
+ case <-startedWaiting:
+ case <-time.After(10 * time.Second):
+ t.Fatal("timed out waiting for serveSubstream to start buffering")
+ }
+
+ // Advance virtual time past the threshold and interval to simulate a slow consumer.
+ nowMu.Lock()
+ now = now.Add(threshold + interval)
+ nowMu.Unlock()
+
+ // Consume from outc. This unblocks the outc select branch in serveSubstream, triggering recordBufWait.
+ select {
+ case <-ws.outc:
+ case <-time.After(time.Second):
+ t.Fatal("timed out waiting for buffered response delivery")
+ }
+
+ // Verify the backlog warning log callback was triggered.
+ select {
+ case <-logged:
+ case <-time.After(time.Second):
+ t.Fatal("timed out waiting for backlog warning callback")
+ }
+
+ logMu.Lock()
+ if logCalls != 1 {
+ logMu.Unlock()
+ t.Fatalf("expected one backlog warning, got %d", logCalls)
+ }
+ logMu.Unlock()
+
+ // Cleanup context and ensure the serveSubstream goroutine terminates cleanly.
+ cancel()
+ select {
+ case <-ws.donec:
+ case <-time.After(time.Second):
+ t.Fatal("timed out waiting for serveSubstream shutdown")
+ }
+}|
/lgtm with one question about the cases we want to capture logs for, and one suggestion for adding a unit test in watch_test.go Thanks! |
|
@richabanker: changing LGTM is restricted to collaborators DetailsIn response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
Fixes: kubernetes/kubernetes#138217