Skip to content

Commit 7a148fe

Browse files
authored
Merge pull request #9401 from jpbetz/automated-cherry-pick-of-#9297-origin-release-3.1
Automated cherry pick of #9297
2 parents 4178b75 + 087b9aa commit 7a148fe

File tree

2 files changed

+38
-29
lines changed

2 files changed

+38
-29
lines changed

mvcc/watchable_store.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ func (s *watchableStore) Restore(b backend.Backend) error {
267267
}
268268

269269
for wa := range s.synced.watchers {
270-
s.unsynced.watchers.add(wa)
270+
s.unsynced.add(wa)
271271
}
272272
s.synced = newWatcherGroup()
273273
return nil

mvcc/watchable_store_test.go

+37-28
Original file line numberDiff line numberDiff line change
@@ -295,36 +295,45 @@ func TestWatchFutureRev(t *testing.T) {
295295
}
296296

297297
func TestWatchRestore(t *testing.T) {
298-
b, tmpPath := backend.NewDefaultTmpBackend()
299-
s := newWatchableStore(b, &lease.FakeLessor{}, nil)
300-
defer cleanup(s, b, tmpPath)
301-
302-
testKey := []byte("foo")
303-
testValue := []byte("bar")
304-
rev := s.Put(testKey, testValue, lease.NoLease)
305-
306-
newBackend, newPath := backend.NewDefaultTmpBackend()
307-
newStore := newWatchableStore(newBackend, &lease.FakeLessor{}, nil)
308-
defer cleanup(newStore, newBackend, newPath)
309-
310-
w := newStore.NewWatchStream()
311-
w.Watch(testKey, nil, rev-1)
312-
313-
newStore.Restore(b)
314-
select {
315-
case resp := <-w.Chan():
316-
if resp.Revision != rev {
317-
t.Fatalf("rev = %d, want %d", resp.Revision, rev)
318-
}
319-
if len(resp.Events) != 1 {
320-
t.Fatalf("failed to get events from the response")
321-
}
322-
if resp.Events[0].Kv.ModRevision != rev {
323-
t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev)
298+
test := func(delay time.Duration) func(t *testing.T) {
299+
return func(t *testing.T) {
300+
b, tmpPath := backend.NewDefaultTmpBackend()
301+
s := newWatchableStore(b, &lease.FakeLessor{}, nil)
302+
defer cleanup(s, b, tmpPath)
303+
304+
testKey := []byte("foo")
305+
testValue := []byte("bar")
306+
rev := s.Put(testKey, testValue, lease.NoLease)
307+
308+
newBackend, newPath := backend.NewDefaultTmpBackend()
309+
newStore := newWatchableStore(newBackend, &lease.FakeLessor{}, nil)
310+
defer cleanup(newStore, newBackend, newPath)
311+
312+
w := newStore.NewWatchStream()
313+
w.Watch(testKey, nil, rev-1)
314+
315+
time.Sleep(delay)
316+
317+
newStore.Restore(b)
318+
select {
319+
case resp := <-w.Chan():
320+
if resp.Revision != rev {
321+
t.Fatalf("rev = %d, want %d", resp.Revision, rev)
322+
}
323+
if len(resp.Events) != 1 {
324+
t.Fatalf("failed to get events from the response")
325+
}
326+
if resp.Events[0].Kv.ModRevision != rev {
327+
t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev)
328+
}
329+
case <-time.After(time.Second):
330+
t.Fatal("failed to receive event in 1 second.")
331+
}
324332
}
325-
case <-time.After(time.Second):
326-
t.Fatal("failed to receive event in 1 second.")
327333
}
334+
335+
t.Run("Normal", test(0))
336+
t.Run("RunSyncWatchLoopBeforeRestore", test(time.Millisecond*120)) // longer than default waitDuration
328337
}
329338

330339
// TestWatchBatchUnsynced tests batching on unsynced watchers

0 commit comments

Comments
 (0)