Skip to content

client/v3: warn when watcher stream buffer is backlogged#21704

Open
xigang wants to merge 1 commit into
etcd-io:mainfrom
xigang:watchstream_buf
Open

client/v3: warn when watcher stream buffer is backlogged#21704
xigang wants to merge 1 commit into
etcd-io:mainfrom
xigang:watchstream_buf

Conversation

@xigang
Copy link
Copy Markdown

@xigang xigang commented May 4, 2026

@k8s-ci-robot
Copy link
Copy Markdown

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by: xigang
Once this PR has been reviewed and has the lgtm label, please assign serathius for approval. For more information see the Code Review Process.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@k8s-ci-robot
Copy link
Copy Markdown

Hi @xigang. Thanks for your PR.

I'm waiting for a etcd-io member to verify that this patch is reasonable to test. If it is, they should reply with /ok-to-test on its own line. Until that is done, I will not automatically test new commits in this PR, but the usual testing commands by org members will still work.

Regular contributors should join the org to skip this step.

Once the patch is verified, the new status will be reflected by the ok-to-test label.

I understand the commands that are listed here.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

@serathius
Copy link
Copy Markdown
Member

/cc @richabanker

@k8s-ci-robot
Copy link
Copy Markdown

@serathius: GitHub didn't allow me to request PR reviews from the following users: richabanker.

Note that only etcd-io members and repo collaborators can review this PR, and authors cannot review their own PRs.

Details

In response to this:

/cc @richabanker

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

@serathius
Copy link
Copy Markdown
Member

This looks similar to K8s logic for detecting buffer growing. Let's take learning from it kubernetes/kubernetes#138126

@serathius
Copy link
Copy Markdown
Member

/cc @mborsz

@k8s-ci-robot
Copy link
Copy Markdown

@serathius: GitHub didn't allow me to request PR reviews from the following users: mborsz.

Note that only etcd-io members and repo collaborators can review this PR, and authors cannot review their own PRs.

Details

In response to this:

/cc @mborsz

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

Comment thread client/v3/watch.go Outdated
@serathius
Copy link
Copy Markdown
Member

/ok-to-test

@codecov
Copy link
Copy Markdown

codecov Bot commented May 5, 2026

Codecov Report

❌ Patch coverage is 69.23077% with 16 lines in your changes missing coverage. Please review.
✅ Project coverage is 70.26%. Comparing base (a3ad218) to head (0bb9da1).
⚠️ Report is 26 commits behind head on main.

Files with missing lines Patch % Lines
client/v3/watch.go 50.00% 12 Missing and 2 partials ⚠️
client/v3/block_logger.go 91.66% 1 Missing and 1 partial ⚠️
Additional details and impacted files
Files with missing lines Coverage Δ
client/v3/block_logger.go 91.66% <91.66%> (ø)
client/v3/watch.go 92.23% <50.00%> (-1.97%) ⬇️

... and 45 files with indirect coverage changes

@@            Coverage Diff             @@
##             main   #21704      +/-   ##
==========================================
+ Coverage   70.22%   70.26%   +0.03%     
==========================================
  Files         426      427       +1     
  Lines       35211    35280      +69     
==========================================
+ Hits        24727    24789      +62     
- Misses       9090     9097       +7     
  Partials     1394     1394              

Continue to review full report in Codecov by Sentry.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update a3ad218...0bb9da1. Read the comment docs.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@xigang xigang force-pushed the watchstream_buf branch from 2bc61ec to e2661b1 Compare May 5, 2026 11:01
@k8s-ci-robot k8s-ci-robot added size/L and removed size/M labels May 5, 2026
@xigang
Copy link
Copy Markdown
Author

xigang commented May 6, 2026

@richabanker PTAL. Thanks.

Comment thread client/v3/watch.go Outdated
@xigang xigang force-pushed the watchstream_buf branch 3 times, most recently from 768cfb5 to be24a63 Compare May 12, 2026 06:48
@serathius
Copy link
Copy Markdown
Member

/retest

@serathius
Copy link
Copy Markdown
Member

@mborsz could you take a look if this implementation matches same requirements as you proposed in kubernetes/kubernetes#138126 ?

@xigang xigang force-pushed the watchstream_buf branch from be24a63 to 53df4fb Compare May 16, 2026 03:00
@xigang
Copy link
Copy Markdown
Author

xigang commented May 16, 2026

/test pull-etcd-e2e-386

Comment thread client/v3/watch.go Outdated
Comment thread client/v3/watch.go Outdated
zap.Int("buffered-responses", len(ws.buf)),
zap.Int("response-count", responseCount),
zap.Duration("time-waiting", timeWaiting),
zap.Duration("window", window),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What window?

Copy link
Copy Markdown
Author

@xigang xigang May 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed this field to "log-interval".

In block_logger, this represents the elapsed accumulation window used for rate-limited logging. eventCount and timeWaiting are accumulated over this window, and a warning is emitted only when the window reaches the configured interval and the accumulated waiting time exceeds the threshold.

Comment thread client/v3/watch.go Outdated
zap.String("range-end", ws.initReq.end),
zap.Int("buffered-responses", len(ws.buf)),
zap.Int("response-count", responseCount),
zap.Duration("time-waiting", timeWaiting),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

time waiting for what?

Copy link
Copy Markdown
Author

@xigang xigang May 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The field name is too vague. It should be renamed to something like "time-blocked" to indicate the time the buffer spent blocked while waiting for the consumer to drain it.

Comment thread client/v3/watch.go Outdated
Comment thread client/v3/watch.go Outdated
case <-ws.initReq.ctx.Done():
return
case <-resumec:
ws.bufWaitStartTime = time.Time{}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not touch private fields of struct. Maybe add a reset function?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Added resetBufWait so the resume path no longer resets bufWaitStartTime directly.

@serathius
Copy link
Copy Markdown
Member

serathius commented May 16, 2026

ping @richabanker @mborsz As we are planning to release etcd v3.7.0, please prioritize this review.

Signed-off-by: xigang <wangxigang2014@gmail.com>
@xigang xigang force-pushed the watchstream_buf branch from 53df4fb to 0bb9da1 Compare May 16, 2026 11:55
Comment thread client/v3/watch.go
}
select {
case outc <- *curWr:
w.recordBufWait(ws)
Copy link
Copy Markdown

@richabanker richabanker May 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to record wait times only upon a successful dequeue? Or would we want visibility into forever stalled consumers as well (by capturing that in some sort of a ticker based log)?

cc @serathius ?

Comment thread client/v3/watch.go
ws.bufWaitStartTime = ws.bufLogger.now()
}

func (w *watchGRPCStream) recordBufWait(ws *watcherStream) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should ideally add a test for the new behavior in watch.go too...

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A quick test (written with the help of Gemini):

diff --git a/client/v3/watch_test.go b/client/v3/watch_test.go
index ca0b86d26..8b6bbe756 100644
--- a/client/v3/watch_test.go
+++ b/client/v3/watch_test.go
@@ -16,10 +16,14 @@ package clientv3
 
 import (
        "context"
+       "sync"
        "testing"
+       "time"
 
+       "go.uber.org/zap"
        "google.golang.org/grpc/metadata"
 
+       pb "go.etcd.io/etcd/api/v3/etcdserverpb"
        "go.etcd.io/etcd/api/v3/mvccpb"
 )
 
@@ -106,3 +110,108 @@ func TestStreamKeyFromCtx(t *testing.T) {
                })
        }
 }
+
+func TestServeSubstreamLogsSlowConsumer(t *testing.T) {
+       const (
+               interval   = 5 * time.Second
+               threshold = 100 * time.Millisecond
+       )
+
+       ctx, cancel := context.WithCancel(t.Context())
+       defer cancel()
+
+       // nowMu protects the virtual clock time `now` against concurrent access between
+       // the test runner thread and the background substream thread.
+       var nowMu sync.Mutex
+       now := time.Unix(0, 0)
+       startedWaiting := make(chan struct{})
+       nowCalls := 0
+
+       // logMu protects logCalls against concurrent access during assertions.
+       var logMu sync.Mutex
+       logCalls := 0
+       logged := make(chan struct{}, 1)
+
+       // Set up watcherStream with a mock blockLogger utilizing virtual time and custom log assertion callback.
+       ws := &watcherStream{
+               initReq: watchRequest{ctx: ctx},
+               outc:    make(chan WatchResponse),
+               recvc:   make(chan *WatchResponse, 1),
+               donec:   make(chan struct{}),
+       }
+       ws.bufLogger = newBlockLogger(interval, threshold, func() time.Time {
+               nowCalls++
+               // Call 1: newBlockLogger (sets lastLogTime to Unix(0,0))
+               // Call 2: startBufWait (sets bufWaitStartTime to Unix(0,0))
+               // Closing startedWaiting when nowCalls reaches 2 signals that serveSubstream
+               // has received the event and recorded its initial buffering wait start time.
+               if nowCalls == 2 {
+                       close(startedWaiting)
+               }
+               nowMu.Lock()
+               defer nowMu.Unlock()
+               return now
+       }, func(eventCount int, timeWaiting time.Duration, window time.Duration) {
+               logMu.Lock()
+               defer logMu.Unlock()
+               logCalls++
+               select {
+               case logged <- struct{}{}:
+               default:
+               }
+       })
+
+       w := &watchGRPCStream{
+               ctx:      t.Context(),
+               closingc: make(chan *watcherStream, 1),
+               lg:       zap.NewNop(), // Use Nop logger since we intercept warning logs via the callback
+       }
+       w.wg.Add(1)
+       // Start the serveSubstream event loop in the background.
+       go w.serveSubstream(ws, make(chan struct{}))
+
+       // Send a watch response to recvc. It will be received by serveSubstream,
+       // appended to the event buffer, and trigger startBufWait.
+       ws.recvc <- &WatchResponse{Header: &pb.ResponseHeader{Revision: 1}}
+
+       // Wait for serveSubstream to process the event, buffer it, and record the wait start time.
+       select {
+       case <-startedWaiting:
+       case <-time.After(10 * time.Second):
+               t.Fatal("timed out waiting for serveSubstream to start buffering")
+       }
+
+       // Advance virtual time past the threshold and interval to simulate a slow consumer.
+       nowMu.Lock()
+       now = now.Add(threshold + interval)
+       nowMu.Unlock()
+
+       // Consume from outc. This unblocks the outc select branch in serveSubstream, triggering recordBufWait.
+       select {
+       case <-ws.outc:
+       case <-time.After(time.Second):
+               t.Fatal("timed out waiting for buffered response delivery")
+       }
+
+       // Verify the backlog warning log callback was triggered.
+       select {
+       case <-logged:
+       case <-time.After(time.Second):
+               t.Fatal("timed out waiting for backlog warning callback")
+       }
+
+       logMu.Lock()
+       if logCalls != 1 {
+               logMu.Unlock()
+               t.Fatalf("expected one backlog warning, got %d", logCalls)
+       }
+       logMu.Unlock()
+
+       // Cleanup context and ensure the serveSubstream goroutine terminates cleanly.
+       cancel()
+       select {
+       case <-ws.donec:
+       case <-time.After(time.Second):
+               t.Fatal("timed out waiting for serveSubstream shutdown")
+       }
+}

@richabanker
Copy link
Copy Markdown

/lgtm

with one question about the cases we want to capture logs for, and one suggestion for adding a unit test in watch_test.go

Thanks!

@k8s-ci-robot
Copy link
Copy Markdown

@richabanker: changing LGTM is restricted to collaborators

Details

In response to this:

/lgtm

with one question about the cases we want to capture logs for, and one suggestion for adding a unit test in watch_test.go

Thanks!

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Development

Successfully merging this pull request may close these issues.

watcherStream.buf in apiserver can grow indefinitely

4 participants