Skip to content

Commit 1e72db0

Browse files
Merge pull request #68065 from wojtek-t/fix_unnecessary_too_old_rv_errors
Automatic merge from submit-queue (batch tested with PRs 68051, 68130, 67211, 68065, 68117). If you want to cherry-pick this change to another branch, please follow the instructions here: https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md.

Fix unnecessary too-old-errors from watch cache

When initializing watch cache via LIST, we set its resource version to the RV of the list request. However, before this PR, the first incoming watch event (updating the watch cache) was moving the "smallest oldest known version" to RV of that watch event. So watch requests passing rv equal to the RV returned from the initial list were failing with "too old resource version". That is not needed, because we know that in the meantime there weren't any other watch events. This PR is addressing that issue.

/assign @liggitt

Kubernetes-commit: 3966b8bbcc072cb48508d151c284241a26c45ede
2 parents ef442a7 + 4c39e74 commit 1e72db0

File tree

2 files changed

+69
-14
lines changed

2 files changed

+69
-14
lines changed

pkg/storage/cacher/watch_cache.go

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,9 @@ type watchCache struct {
127127
// ResourceVersion up to which the watchCache is propagated.
128128
resourceVersion uint64
129129

130+
// ResourceVersion of the last list result (populated via Replace() method).
131+
listResourceVersion uint64
132+
130133
// This handler is run at the end of every successful Replace() method.
131134
onReplace func()
132135

@@ -147,16 +150,17 @@ func newWatchCache(
147150
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, bool, error),
148151
versioner storage.Versioner) *watchCache {
149152
wc := &watchCache{
150-
capacity: capacity,
151-
keyFunc: keyFunc,
152-
getAttrsFunc: getAttrsFunc,
153-
cache: make([]watchCacheElement, capacity),
154-
startIndex: 0,
155-
endIndex: 0,
156-
store: cache.NewStore(storeElementKey),
157-
resourceVersion: 0,
158-
clock: clock.RealClock{},
159-
versioner: versioner,
153+
capacity: capacity,
154+
keyFunc: keyFunc,
155+
getAttrsFunc: getAttrsFunc,
156+
cache: make([]watchCacheElement, capacity),
157+
startIndex: 0,
158+
endIndex: 0,
159+
store: cache.NewStore(storeElementKey),
160+
resourceVersion: 0,
161+
listResourceVersion: 0,
162+
clock: clock.RealClock{},
163+
versioner: versioner,
160164
}
161165
wc.cond = sync.NewCond(wc.RLocker())
162166
return wc
@@ -390,6 +394,7 @@ func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
390394
if err := w.store.Replace(toReplace, resourceVersion); err != nil {
391395
return err
392396
}
397+
w.listResourceVersion = version
393398
w.resourceVersion = version
394399
if w.onReplace != nil {
395400
w.onReplace()
@@ -412,12 +417,26 @@ func (w *watchCache) SetOnEvent(onEvent func(*watchCacheEvent)) {
412417

413418
func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*watchCacheEvent, error) {
414419
size := w.endIndex - w.startIndex
415-
// if we have no watch events in our cache, the oldest one we can successfully deliver to a watcher
416-
// is the *next* event we'll receive, which will be at least one greater than our current resourceVersion
417-
oldest := w.resourceVersion + 1
418-
if size > 0 {
420+
var oldest uint64
421+
switch {
422+
case size >= w.capacity:
423+
// Once the watch event buffer is full, the oldest watch event we can deliver
424+
// is the first one in the buffer.
425+
oldest = w.cache[w.startIndex%w.capacity].resourceVersion
426+
case w.listResourceVersion > 0:
427+
// If the watch event buffer isn't full, the oldest watch event we can deliver
428+
// is one greater than the resource version of the last full list.
429+
oldest = w.listResourceVersion + 1
430+
case size > 0:
431+
// If we've never completed a list, use the resourceVersion of the oldest event
432+
// in the buffer.
433+
// This should only happen in unit tests that populate the buffer without
434+
// performing list/replace operations.
419435
oldest = w.cache[w.startIndex%w.capacity].resourceVersion
436+
default:
437+
return nil, fmt.Errorf("watch cache isn't correctly initialized")
420438
}
439+
421440
if resourceVersion == 0 {
422441
// resourceVersion = 0 means that we don't require any specific starting point
423442
// and we would like to start watching from ~now.

pkg/storage/cacher/watch_cache_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package cacher
1919
import (
2020
"fmt"
2121
"strconv"
22+
"strings"
2223
"testing"
2324
"time"
2425

@@ -278,6 +279,41 @@ func TestEvents(t *testing.T) {
278279
}
279280
}
280281

282+
func TestMarker(t *testing.T) {
283+
store := newTestWatchCache(3)
284+
285+
// First thing that is called when propagated from storage is Replace.
286+
store.Replace([]interface{}{
287+
makeTestPod("pod1", 5),
288+
makeTestPod("pod2", 9),
289+
}, "9")
290+
291+
_, err := store.GetAllEventsSince(8)
292+
if err == nil || !strings.Contains(err.Error(), "too old resource version") {
293+
t.Errorf("unexpected error: %v", err)
294+
}
295+
// Getting events from 9 (the list resourceVersion) should return no events,
296+
// even though there is a marker there.
297+
result, err := store.GetAllEventsSince(9)
298+
if err != nil {
299+
t.Fatalf("unexpected error: %v", err)
300+
}
301+
if len(result) != 0 {
302+
t.Errorf("unexpected result: %#v, expected no events", result)
303+
}
304+
305+
pod := makeTestPod("pods", 12)
306+
store.Add(pod)
307+
// Getting events from 9 should still work and return one event.
308+
result, err = store.GetAllEventsSince(9)
309+
if err != nil {
310+
t.Fatalf("unexpected error: %v", err)
311+
}
312+
if len(result) != 1 || !apiequality.Semantic.DeepEqual(result[0].Object, pod) {
313+
t.Errorf("unexpected result: %#v, expected %v", result, pod)
314+
}
315+
}
316+
281317
func TestWaitUntilFreshAndList(t *testing.T) {
282318
store := newTestWatchCache(3)
283319

0 commit comments

Comments
 (0)