Skip to content

Commit 4c39e74

Browse files
wojtek-t authored and
k8s-publishing-bot committed
Fix unnecessary too-old-errors from watch cache
Kubernetes-commit: 12021725922efc3a80c8a0673b28826a524eb0a0
1 parent ef442a7 commit 4c39e74

File tree

2 files changed

+69
-14
lines changed

2 files changed

+69
-14
lines changed

pkg/storage/cacher/watch_cache.go

Lines changed: 33 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -127,6 +127,9 @@ type watchCache struct {
127127
// ResourceVersion up to which the watchCache is propagated.
128128
resourceVersion uint64
129129

130+
// ResourceVersion of the last list result (populated via Replace() method).
131+
listResourceVersion uint64
132+
130133
// This handler is run at the end of every successful Replace() method.
131134
onReplace func()
132135

@@ -147,16 +150,17 @@ func newWatchCache(
147150
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, bool, error),
148151
versioner storage.Versioner) *watchCache {
149152
wc := &watchCache{
150-
capacity: capacity,
151-
keyFunc: keyFunc,
152-
getAttrsFunc: getAttrsFunc,
153-
cache: make([]watchCacheElement, capacity),
154-
startIndex: 0,
155-
endIndex: 0,
156-
store: cache.NewStore(storeElementKey),
157-
resourceVersion: 0,
158-
clock: clock.RealClock{},
159-
versioner: versioner,
153+
capacity: capacity,
154+
keyFunc: keyFunc,
155+
getAttrsFunc: getAttrsFunc,
156+
cache: make([]watchCacheElement, capacity),
157+
startIndex: 0,
158+
endIndex: 0,
159+
store: cache.NewStore(storeElementKey),
160+
resourceVersion: 0,
161+
listResourceVersion: 0,
162+
clock: clock.RealClock{},
163+
versioner: versioner,
160164
}
161165
wc.cond = sync.NewCond(wc.RLocker())
162166
return wc
@@ -390,6 +394,7 @@ func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
390394
if err := w.store.Replace(toReplace, resourceVersion); err != nil {
391395
return err
392396
}
397+
w.listResourceVersion = version
393398
w.resourceVersion = version
394399
if w.onReplace != nil {
395400
w.onReplace()
@@ -412,12 +417,26 @@ func (w *watchCache) SetOnEvent(onEvent func(*watchCacheEvent)) {
412417

413418
func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*watchCacheEvent, error) {
414419
size := w.endIndex - w.startIndex
415-
// if we have no watch events in our cache, the oldest one we can successfully deliver to a watcher
416-
// is the *next* event we'll receive, which will be at least one greater than our current resourceVersion
417-
oldest := w.resourceVersion + 1
418-
if size > 0 {
420+
var oldest uint64
421+
switch {
422+
case size >= w.capacity:
423+
// Once the watch event buffer is full, the oldest watch event we can deliver
424+
// is the first one in the buffer.
425+
oldest = w.cache[w.startIndex%w.capacity].resourceVersion
426+
case w.listResourceVersion > 0:
427+
// If the watch event buffer isn't full, the oldest watch event we can deliver
428+
// is one greater than the resource version of the last full list.
429+
oldest = w.listResourceVersion + 1
430+
case size > 0:
431+
// If we've never completed a list, use the resourceVersion of the oldest event
432+
// in the buffer.
433+
// This should only happen in unit tests that populate the buffer without
434+
// performing list/replace operations.
419435
oldest = w.cache[w.startIndex%w.capacity].resourceVersion
436+
default:
437+
return nil, fmt.Errorf("watch cache isn't correctly initialized")
420438
}
439+
421440
if resourceVersion == 0 {
422441
// resourceVersion = 0 means that we don't require any specific starting point
423442
// and we would like to start watching from ~now.

pkg/storage/cacher/watch_cache_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package cacher
1919
import (
2020
"fmt"
2121
"strconv"
22+
"strings"
2223
"testing"
2324
"time"
2425

@@ -278,6 +279,41 @@ func TestEvents(t *testing.T) {
278279
}
279280
}
280281

282+
func TestMarker(t *testing.T) {
283+
store := newTestWatchCache(3)
284+
285+
// First thing that is called when propagated from storage is Replace.
286+
store.Replace([]interface{}{
287+
makeTestPod("pod1", 5),
288+
makeTestPod("pod2", 9),
289+
}, "9")
290+
291+
_, err := store.GetAllEventsSince(8)
292+
if err == nil || !strings.Contains(err.Error(), "too old resource version") {
293+
t.Errorf("unexpected error: %v", err)
294+
}
295+
// Getting events from 9 (the list resourceVersion) should return no events,
296+
// even though there is a marker there.
297+
result, err := store.GetAllEventsSince(9)
298+
if err != nil {
299+
t.Fatalf("unexpected error: %v", err)
300+
}
301+
if len(result) != 0 {
302+
t.Errorf("unexpected result: %#v, expected no events", result)
303+
}
304+
305+
pod := makeTestPod("pods", 12)
306+
store.Add(pod)
307+
// Getting events from 9 should still work and return one event.
308+
result, err = store.GetAllEventsSince(9)
309+
if err != nil {
310+
t.Fatalf("unexpected error: %v", err)
311+
}
312+
if len(result) != 1 || !apiequality.Semantic.DeepEqual(result[0].Object, pod) {
313+
t.Errorf("unexpected result: %#v, expected %v", result, pod)
314+
}
315+
}
316+
281317
func TestWaitUntilFreshAndList(t *testing.T) {
282318
store := newTestWatchCache(3)
283319

0 commit comments

Comments
 (0)