Skip to content

Commit 88805ca

Browse files
serathius authored and k8s-publishing-bot committed
Prevent watch cache starvation by moving its watch to a separate RPC, and add a SeparateCacheWatchRPC feature flag to disable this behavior
Kubernetes-commit: 4009acb0cf17a5500041bd40514a62256bef69d9
1 parent bd6de43 commit 88805ca

File tree

7 files changed

+53
-16
lines changed

7 files changed

+53
-16
lines changed

pkg/features/kube_features.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,13 @@ const (
163163
// Deprecates and removes SelfLink from ObjectMeta and ListMeta.
164164
RemoveSelfLink featuregate.Feature = "RemoveSelfLink"
165165

166+
// owner: @serathius
167+
// beta: v1.30
168+
//
169+
// Allow watch cache to create a watch on a dedicated RPC.
170+
// This prevents watch cache from being starved by other watches.
171+
SeparateCacheWatchRPC featuregate.Feature = "SeparateCacheWatchRPC"
172+
166173
// owner: @apelisse, @lavalamp
167174
// alpha: v1.14
168175
// beta: v1.16
@@ -303,6 +310,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
303310

304311
RemoveSelfLink: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
305312

313+
SeparateCacheWatchRPC: {Default: true, PreRelease: featuregate.Beta},
314+
306315
ServerSideApply: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.29
307316

308317
ServerSideFieldValidation: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.29

pkg/storage/cacher/cacher.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"time"
2626

2727
"go.opentelemetry.io/otel/attribute"
28+
"google.golang.org/grpc/metadata"
2829

2930
"k8s.io/apimachinery/pkg/api/errors"
3031
"k8s.io/apimachinery/pkg/api/meta"
@@ -397,10 +398,18 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
397398
// so that future reuse does not get a spurious timeout.
398399
<-cacher.timer.C
399400
}
400-
progressRequester := newConditionalProgressRequester(config.Storage.RequestWatchProgress, config.Clock)
401+
var contextMetadata metadata.MD
402+
if utilfeature.DefaultFeatureGate.Enabled(features.SeparateCacheWatchRPC) {
403+
// Add grpc context metadata to watch and progress notify requests done by cacher to:
404+
// * Prevent starvation of the watch opened by the cacher, by moving it to a separate Watch RPC from the watch requests that bypass the cacher.
405+
// * Ensure that progress notification requests are executed on the same Watch RPC as their watch, which is required for them to work.
406+
contextMetadata = metadata.New(map[string]string{"source": "cache"})
407+
}
408+
409+
progressRequester := newConditionalProgressRequester(config.Storage.RequestWatchProgress, config.Clock, contextMetadata)
401410
watchCache := newWatchCache(
402411
config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, config.GroupResource, progressRequester)
403-
listerWatcher := NewListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
412+
listerWatcher := NewListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc, contextMetadata)
404413
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
405414

406415
reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0)

pkg/storage/cacher/lister_watcher.go

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ package cacher
1919
import (
2020
"context"
2121

22+
"google.golang.org/grpc/metadata"
23+
2224
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2325
"k8s.io/apimachinery/pkg/fields"
2426
"k8s.io/apimachinery/pkg/labels"
@@ -30,17 +32,19 @@ import (
3032

3133
// listerWatcher wraps storage.Interface to expose it as a cache.ListerWatcher.
3234
type listerWatcher struct {
33-
storage storage.Interface
34-
resourcePrefix string
35-
newListFunc func() runtime.Object
35+
storage storage.Interface
36+
resourcePrefix string
37+
newListFunc func() runtime.Object
38+
contextMetadata metadata.MD
3639
}
3740

3841
// NewListerWatcher returns a storage.Interface backed ListerWatcher.
39-
func NewListerWatcher(storage storage.Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
42+
func NewListerWatcher(storage storage.Interface, resourcePrefix string, newListFunc func() runtime.Object, contextMetadata metadata.MD) cache.ListerWatcher {
4043
return &listerWatcher{
41-
storage: storage,
42-
resourcePrefix: resourcePrefix,
43-
newListFunc: newListFunc,
44+
storage: storage,
45+
resourcePrefix: resourcePrefix,
46+
newListFunc: newListFunc,
47+
contextMetadata: contextMetadata,
4448
}
4549
}
4650

@@ -59,7 +63,11 @@ func (lw *listerWatcher) List(options metav1.ListOptions) (runtime.Object, error
5963
Predicate: pred,
6064
Recursive: true,
6165
}
62-
if err := lw.storage.GetList(context.TODO(), lw.resourcePrefix, storageOpts, list); err != nil {
66+
ctx := context.Background()
67+
if lw.contextMetadata != nil {
68+
ctx = metadata.NewOutgoingContext(ctx, lw.contextMetadata)
69+
}
70+
if err := lw.storage.GetList(ctx, lw.resourcePrefix, storageOpts, list); err != nil {
6371
return nil, err
6472
}
6573
return list, nil
@@ -73,5 +81,9 @@ func (lw *listerWatcher) Watch(options metav1.ListOptions) (watch.Interface, err
7381
Recursive: true,
7482
ProgressNotify: true,
7583
}
76-
return lw.storage.Watch(context.TODO(), lw.resourcePrefix, opts)
84+
ctx := context.Background()
85+
if lw.contextMetadata != nil {
86+
ctx = metadata.NewOutgoingContext(ctx, lw.contextMetadata)
87+
}
88+
return lw.storage.Watch(ctx, lw.resourcePrefix, opts)
7789
}

pkg/storage/cacher/lister_watcher_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func TestCacherListerWatcher(t *testing.T) {
4444
}
4545
}
4646

47-
lw := NewListerWatcher(store, prefix, fn)
47+
lw := NewListerWatcher(store, prefix, fn, nil)
4848

4949
obj, err := lw.List(metav1.ListOptions{})
5050
if err != nil {
@@ -80,7 +80,7 @@ func TestCacherListerWatcherPagination(t *testing.T) {
8080
}
8181
}
8282

83-
lw := NewListerWatcher(store, prefix, fn)
83+
lw := NewListerWatcher(store, prefix, fn, nil)
8484

8585
obj1, err := lw.List(metav1.ListOptions{Limit: 2})
8686
if err != nil {

pkg/storage/cacher/watch_cache_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache {
122122
wc := &testWatchCache{}
123123
wc.bookmarkRevision = make(chan int64, 1)
124124
wc.stopCh = make(chan struct{})
125-
pr := newConditionalProgressRequester(wc.RequestWatchProgress, &immediateTickerFactory{})
125+
pr := newConditionalProgressRequester(wc.RequestWatchProgress, &immediateTickerFactory{}, nil)
126126
go pr.Run(wc.stopCh)
127127
wc.watchCache = newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), schema.GroupResource{Resource: "pods"}, pr)
128128
// To preserve behavior of tests that assume a given capacity,

pkg/storage/cacher/watch_progress.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"sync"
2222
"time"
2323

24+
"google.golang.org/grpc/metadata"
25+
2426
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2527
"k8s.io/apimachinery/pkg/util/wait"
2628

@@ -34,10 +36,11 @@ const (
3436
progressRequestPeriod = 100 * time.Millisecond
3537
)
3638

37-
func newConditionalProgressRequester(requestWatchProgress WatchProgressRequester, clock TickerFactory) *conditionalProgressRequester {
39+
func newConditionalProgressRequester(requestWatchProgress WatchProgressRequester, clock TickerFactory, contextMetadata metadata.MD) *conditionalProgressRequester {
3840
pr := &conditionalProgressRequester{
3941
clock: clock,
4042
requestWatchProgress: requestWatchProgress,
43+
contextMetadata: contextMetadata,
4144
}
4245
pr.cond = sync.NewCond(pr.mux.RLocker())
4346
return pr
@@ -54,6 +57,7 @@ type TickerFactory interface {
5457
type conditionalProgressRequester struct {
5558
clock TickerFactory
5659
requestWatchProgress WatchProgressRequester
60+
contextMetadata metadata.MD
5761

5862
mux sync.RWMutex
5963
cond *sync.Cond
@@ -63,6 +67,9 @@ type conditionalProgressRequester struct {
6367

6468
func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) {
6569
ctx := wait.ContextForChannel(stopCh)
70+
if pr.contextMetadata != nil {
71+
ctx = metadata.NewOutgoingContext(ctx, pr.contextMetadata)
72+
}
6673
go func() {
6774
defer utilruntime.HandleCrash()
6875
<-stopCh

pkg/storage/cacher/watch_progress_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ func TestConditionalProgressRequester(t *testing.T) {
115115

116116
func newTestConditionalProgressRequester(clock clock.WithTicker) *testConditionalProgressRequester {
117117
pr := &testConditionalProgressRequester{}
118-
pr.conditionalProgressRequester = newConditionalProgressRequester(pr.RequestWatchProgress, clock)
118+
pr.conditionalProgressRequester = newConditionalProgressRequester(pr.RequestWatchProgress, clock, nil)
119119
return pr
120120
}
121121

0 commit comments

Comments
 (0)