Skip to content

Commit

Permalink
Add removewatch command
Browse files Browse the repository at this point in the history
  • Loading branch information
zeikar authored and jeffbean committed Apr 13, 2024
1 parent 2480e3d commit eb16181
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@ zookeeper-*/
zookeeper-*.tar.gz
apache-zookeeper-*/
apache-zookeeper-*.tar.gz
.idea/
*.iml
26 changes: 26 additions & 0 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1227,6 +1227,32 @@ func (c *Conn) SetACL(path string, acl []ACL, version int32) (*Stat, error) {
return &res.Stat, err
}

// RemoveWatch remove all watches on znode
func (c *Conn) RemoveWatch(path string) error {
_, err := c.request(opRemoveWatches, &removeWatchRequest{Path: path, WatcherType: watcherTypeAny}, &removeWatchResponse{}, nil)
if err != nil {
return err
}

wTypes := []watchType{watchTypeExist, watchTypeData, watchTypeChild}

c.watchersLock.Lock()
defer c.watchersLock.Unlock()
for _, t := range wTypes {
wpt := watchPathType{path, t}
ev := Event{Type: EventNotWatching, State: StateDisconnected, Path: path, Err: err}

if watchers := c.watchers[wpt]; len(watchers) > 0 {
for _, ch := range watchers {
ch <- ev
close(ch)
}
delete(c.watchers, wpt)
}
}
return nil
}

// Sync flushes the channel between process and the leader of a given znode,
// you may need it if you want identical views of ZooKeeper data for 2 client instances.
// Please refer to the "Consistency Guarantees" section of ZK document for more details.
Expand Down
15 changes: 15 additions & 0 deletions constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
opCheck = 13
opMulti = 14
opReconfig = 16
opRemoveWatches = 18
opCreateContainer = 19
opCreateTTL = 21
opClose = -11
Expand Down Expand Up @@ -129,6 +130,7 @@ var (
ErrSessionMoved = errors.New("zk: session moved to another server, so operation is ignored")
ErrReconfigDisabled = errors.New("attempts to perform a reconfiguration operation when reconfiguration feature is disabled")
ErrBadArguments = errors.New("invalid arguments")
ErrNoWatcher = errors.New("zk: no watchers found")
// ErrInvalidCallback = errors.New("zk: invalid callback specified")

errCodeToError = map[ErrCode]error{
Expand All @@ -149,6 +151,7 @@ var (
errSessionMoved: ErrSessionMoved,
errZReconfigDisabled: ErrReconfigDisabled,
errBadArguments: ErrBadArguments,
errNoWatcher: ErrNoWatcher,
}
)

Expand Down Expand Up @@ -186,6 +189,7 @@ const (
errClosing ErrCode = -116
errNothing ErrCode = -117
errSessionMoved ErrCode = -118
errNoWatcher ErrCode = -121
// Attempts to perform a reconfiguration operation when reconfiguration feature is disabled
errZReconfigDisabled ErrCode = -123
)
Expand Down Expand Up @@ -221,6 +225,7 @@ var (
opCheck: "check",
opMulti: "multi",
opReconfig: "reconfig",
opRemoveWatches: "removeWatches",
opClose: "close",
opSetAuth: "setAuth",
opSetWatches: "setWatches",
Expand Down Expand Up @@ -263,3 +268,13 @@ var (
ModeStandalone: "standalone",
}
)

// RemoveWatcher
// https://github.com/apache/zookeeper/blob/master/zookeeper-server/src/main/java/org/apache/zookeeper/Watcher.java#L191
type WatcherType int32

const (
watcherTypeChildren WatcherType = iota + 1
watcherTypeData
watcherTypeAny
)
8 changes: 8 additions & 0 deletions structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,12 @@ type multiResponse struct {
DoneHeader multiHeader
}

type removeWatchRequest struct {
Path string
WatcherType WatcherType
}
type removeWatchResponse struct{}

// zk version 3.5 reconfig API
type reconfigRequest struct {
JoiningServers []byte
Expand Down Expand Up @@ -634,6 +640,8 @@ func requestStructForOp(op int32) interface{} {
return &multiRequest{}
case opReconfig:
return &reconfigRequest{}
case opRemoveWatches:
return &removeWatchRequest{}
}
return nil
}
49 changes: 49 additions & 0 deletions zk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1109,6 +1109,55 @@ func TestIntegration_MaxBufferSize(t *testing.T) {
}
}

func TestRemoveWatch(t *testing.T) {
ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()

path := "/remove-watch-test"

if err := zk.Delete(path, -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
if p, err := zk.Create(path, []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil {
t.Fatalf("Create returned error: %+v", err)
} else if p != path {
t.Fatalf("Create returned different path '%s' != '%s'", p, path)
}

ex, stat, ch, err := zk.ExistsW(path)
if err != nil {
t.Fatalf("ExistW returned error: %+v", err)
} else if !ex {
t.Fatal("ExistW returned not exist")
} else if stat == nil {
t.Fatal("ExistW returned nil stat")
}

go func() {
data := <-ch
if data.Type != EventNotWatching {
t.Error("Unexpected watch state", data)
return
}
t.Log("Watch channel data", data)
}()

if err := zk.RemoveWatch(path); err != nil {
t.Fatalf("RemoveWatch returned error: %+v", err)
}
if err := zk.Delete(path, -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
}

func startSlowProxy(t *testing.T, up, down Rate, upstream string, adj func(ln *Listener)) (string, chan bool, error) {
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
Expand Down

0 comments on commit eb16181

Please sign in to comment.