Skip to content

Commit

Permalink
storage: add watch ID to identify watchings
Browse files Browse the repository at this point in the history
One watcher includes multiple watchings, and their events are
sent out through one channel. For the received event, user would like to
know which watching it belongs to.

Introduce a watch ID. When watching on some key, user will get a watch
ID. The watch ID is attached to all events that is observed by this
watch.
  • Loading branch information
yichengq committed Nov 21, 2015
1 parent 2de9a5b commit deb1da5
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 16 deletions.
11 changes: 9 additions & 2 deletions storage/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ func TestWatchableKVWatch(t *testing.T) {

w := s.NewWatcher()

cancel := w.Watch([]byte("foo"), true, 0)
wid, cancel := w.Watch([]byte("foo"), true, 0)
defer cancel()

s.Put([]byte("foo"), []byte("bar"))
Expand All @@ -750,6 +750,7 @@ func TestWatchableKVWatch(t *testing.T) {
ModRevision: 1,
Version: 1,
},
WatchID: wid,
}
if !reflect.DeepEqual(ev, wev) {
t.Errorf("watched event = %+v, want %+v", ev, wev)
Expand All @@ -770,6 +771,7 @@ func TestWatchableKVWatch(t *testing.T) {
ModRevision: 2,
Version: 1,
},
WatchID: wid,
}
if !reflect.DeepEqual(ev, wev) {
t.Errorf("watched event = %+v, want %+v", ev, wev)
Expand All @@ -778,7 +780,10 @@ func TestWatchableKVWatch(t *testing.T) {
t.Fatalf("failed to watch the event")
}

cancel = w.Watch([]byte("foo1"), false, 1)
w.Close()

w = s.NewWatcher()
wid, cancel = w.Watch([]byte("foo1"), false, 1)
defer cancel()

select {
Expand All @@ -792,6 +797,7 @@ func TestWatchableKVWatch(t *testing.T) {
ModRevision: 2,
Version: 1,
},
WatchID: wid,
}
if !reflect.DeepEqual(ev, wev) {
t.Errorf("watched event = %+v, want %+v", ev, wev)
Expand All @@ -812,6 +818,7 @@ func TestWatchableKVWatch(t *testing.T) {
ModRevision: 3,
Version: 2,
},
WatchID: wid,
}
if !reflect.DeepEqual(ev, wev) {
t.Errorf("watched event = %+v, want %+v", ev, wev)
Expand Down
27 changes: 26 additions & 1 deletion storage/storagepb/kv.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion storage/storagepb/kv.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,6 @@ message Event {
// a delete/expire event contains the previous
// key-value
KeyValue kv = 2;
// watchID is the ID of watching this event is sent to.
int64 watchID = 3;
}

12 changes: 8 additions & 4 deletions storage/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (
)

type watchable interface {
watch(key []byte, prefix bool, startRev int64, ch chan<- storagepb.Event) (*watching, CancelFunc)
watch(key []byte, prefix bool, startRev, id int64, ch chan<- storagepb.Event) (*watching, CancelFunc)
}

type watchableStore struct {
Expand Down Expand Up @@ -173,14 +173,15 @@ func (s *watchableStore) NewWatcher() Watcher {
}
}

func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, ch chan<- storagepb.Event) (*watching, CancelFunc) {
func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch chan<- storagepb.Event) (*watching, CancelFunc) {
s.mu.Lock()
defer s.mu.Unlock()

wa := &watching{
key: key,
prefix: prefix,
cur: startRev,
id: id,
ch: ch,
}

Expand Down Expand Up @@ -273,8 +274,9 @@ func (s *watchableStore) syncWatchings() {
}

w.ch <- storagepb.Event{
Type: evt,
Kv: &kv,
Type: evt,
Kv: &kv,
WatchID: w.id,
}
pendingEventsGauge.Inc()
}
Expand Down Expand Up @@ -311,6 +313,7 @@ func (s *watchableStore) notify(rev int64, ev storagepb.Event) {
if !w.prefix && i != len(ev.Kv.Key) {
continue
}
ev.WatchID = w.id
select {
case w.ch <- ev:
pendingEventsGauge.Inc()
Expand Down Expand Up @@ -362,6 +365,7 @@ type watching struct {
// If cur is behind the current revision of the KV,
// watching is unsynced and needs to catch up.
cur int64
id int64

// a chan to send out the watched events.
// The chan might be shared with other watchings.
Expand Down
4 changes: 2 additions & 2 deletions storage/watchable_store_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
cancels := make([]CancelFunc, watcherSize)
for i := 0; i < watcherSize; i++ {
// non-0 value to keep watchers in unsynced
cancel := w.Watch(testKey, true, 1)
_, cancel := w.Watch(testKey, true, 1)
cancels[i] = cancel
}

Expand Down Expand Up @@ -102,7 +102,7 @@ func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
cancels := make([]CancelFunc, watcherSize)
for i := 0; i < watcherSize; i++ {
// 0 for startRev to keep watchers in synced
cancel := w.Watch(testKey, true, 0)
_, cancel := w.Watch(testKey, true, 0)
cancels[i] = cancel
}

Expand Down
2 changes: 1 addition & 1 deletion storage/watchable_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestNewWatcherCancel(t *testing.T) {
s.Put(testKey, testValue)

w := s.NewWatcher()
cancel := w.Watch(testKey, true, 0)
_, cancel := w.Watch(testKey, true, 0)

cancel()

Expand Down
18 changes: 13 additions & 5 deletions storage/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ type Watcher interface {
// The whole event history can be watched unless compacted.
// If `prefix` is true, watch observes all events whose key prefix could be the given `key`.
// If `startRev` <=0, watch observes events after currentRev.
Watch(key []byte, prefix bool, startRev int64) CancelFunc
// The returned `id` is the ID of this watching. It appears as WatchID
// in events that are sent to this watching.
Watch(key []byte, prefix bool, startRev int64) (id int64, cancel CancelFunc)

// Chan returns a chan. All watched events will be sent to the returned chan.
Chan() <-chan storagepb.Event
Expand All @@ -42,21 +44,27 @@ type watcher struct {
ch chan storagepb.Event

mu sync.Mutex // guards fields below it
nextID int64 // nextID is the ID allocated for next new watching
closed bool
cancels []CancelFunc
}

// TODO: return error if ws is closed?
func (ws *watcher) Watch(key []byte, prefix bool, startRev int64) CancelFunc {
_, c := ws.watchable.watch(key, prefix, startRev, ws.ch)
func (ws *watcher) Watch(key []byte, prefix bool, startRev int64) (id int64, cancel CancelFunc) {
ws.mu.Lock()
defer ws.mu.Unlock()
if ws.closed {
return nil
return -1, nil
}

id = ws.nextID
ws.nextID++

_, c := ws.watchable.watch(key, prefix, startRev, id, ws.ch)

// TODO: cancelFunc needs to be removed from the cancels when it is called.
ws.cancels = append(ws.cancels, c)
return c
return id, c
}

func (ws *watcher) Chan() <-chan storagepb.Event {
Expand Down
64 changes: 64 additions & 0 deletions storage/watcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storage

import "testing"

// TestWatcherWatchID tests that each watcher provides unique watch ID,
// and the watched event attaches the correct watch ID.
func TestWatcherWatchID(t *testing.T) {
s := WatchableKV(newWatchableStore(tmpPath))
defer cleanup(s, tmpPath)

w := s.NewWatcher()
defer w.Close()

idm := make(map[int64]struct{})

// synced watchings
for i := 0; i < 10; i++ {
id, cancel := w.Watch([]byte("foo"), false, 0)
if _, ok := idm[id]; ok {
t.Errorf("#%d: id %d exists", i, id)
}
idm[id] = struct{}{}

s.Put([]byte("foo"), []byte("bar"))

ev := <-w.Chan()
if ev.WatchID != id {
t.Errorf("#%d: watch id in event = %d, want %d", i, ev.WatchID, id)
}

cancel()
}

s.Put([]byte("foo2"), []byte("bar"))
// unsynced watchings
for i := 10; i < 20; i++ {
id, cancel := w.Watch([]byte("foo2"), false, 1)
if _, ok := idm[id]; ok {
t.Errorf("#%d: id %d exists", i, id)
}
idm[id] = struct{}{}

ev := <-w.Chan()
if ev.WatchID != id {
t.Errorf("#%d: watch id in event = %d, want %d", i, ev.WatchID, id)
}

cancel()
}
}

0 comments on commit deb1da5

Please sign in to comment.