Skip to content

Commit

Permalink
Corrected the logic for watches. Must simpler now
Browse files Browse the repository at this point in the history
  • Loading branch information
samuel committed May 30, 2013
1 parent 1eab530 commit 00fe8c3
Showing 1 changed file with 59 additions and 86 deletions.
145 changes: 59 additions & 86 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ type request struct {
pkt interface{}
recvStruct interface{}
recvChan chan response

// Because sending and receiving happen in separate go routines, there's
// a possible race condition when creating watches from outside the read
// loop. We must ensure that a watcher gets added to the list synchronously
// with the response from the server on any request that creates a watch.
// In order to not hard code the watch logic for each opcode in the recv
// loop the caller can use recvFunc to insert some synchronously code
// after a response.
recvFunc func(*request, *responseHeader, error)
}

type response struct {
Expand Down Expand Up @@ -131,7 +140,7 @@ func (c *Conn) Close() {
close(c.shouldQuit)

select {
case <-c.queueRequest(opClose, &closeRequest{}, &closeResponse{}):
case <-c.queueRequest(opClose, &closeRequest{}, &closeResponse{}, nil):
case <-time.After(time.Second):
}
}
Expand Down Expand Up @@ -276,28 +285,28 @@ func (c *Conn) sendSetWatches() {
ExistWatches: make([]string, 0),
ChildWatches: make([]string, 0),
}
pathLen := 0
n := 0
for path, watchers := range c.watchers {
if len(watchers.dataWatchers) != 0 {
req.DataWatches = append(req.DataWatches, path)
pathLen += len(path)
n++
}
if len(watchers.existWatchers) != 0 {
req.ExistWatches = append(req.ExistWatches, path)
pathLen += len(path)
n++
}
if len(watchers.childWatchers) != 0 {
req.ChildWatches = append(req.ChildWatches, path)
pathLen += len(path)
n++
}
}
if pathLen == 0 {
if n == 0 {
return
}

go func() {
res := &setWatchesResponse{}
_, err := c.request(opSetWatches, req, res)
_, err := c.request(opSetWatches, req, res, nil)
if err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -532,6 +541,9 @@ func (c *Conn) recvLoop(conn net.Conn) error {
_, err = decodePacket(buf[16:16+blen], req.recvStruct)
}
req.recvChan <- response{res.Zxid, err}
if req.recvFunc != nil {
req.recvFunc(req, &res, err)
}
if req.opcode == opClose {
return io.EOF
}
Expand All @@ -545,7 +557,7 @@ func (c *Conn) nextXid() int32 {
return atomic.AddInt32(&c.xid, 1)
}

func (c *Conn) addWatcher(path string, watcherType watcherType, zxid int64) *watcher {
func (c *Conn) addWatcher(path string, watcherType watcherType, zxid int64) <-chan Event {
c.watchersLock.Lock()
defer c.watchersLock.Unlock()

Expand All @@ -568,113 +580,81 @@ func (c *Conn) addWatcher(path string, watcherType watcherType, zxid int64) *wat
case watcherTypeExist:
wat.existWatchers = append(wat.existWatchers, w)
}
return w
return w.ch
}

func removeWatcherFromList(watchers []*watcher, w *watcher) ([]*watcher, bool) {
for i := 0; i < len(watchers); i++ {
if watchers[i] == w {
watchers[i] = watchers[len(watchers)-1]
return watchers[:len(watchers)-1], true
}
}
return watchers, false
}

func (c *Conn) removeWatcher(path string, watcherType watcherType, w *watcher) bool {
c.watchersLock.Lock()
defer c.watchersLock.Unlock()

wat := c.watchers[path]
if wat == nil {
return false
}

found := false
switch watcherType {
case watcherTypeChild:
wat.childWatchers, found = removeWatcherFromList(wat.childWatchers, w)
case watcherTypeData:
wat.dataWatchers, found = removeWatcherFromList(wat.dataWatchers, w)
case watcherTypeExist:
wat.existWatchers, found = removeWatcherFromList(wat.existWatchers, w)
}
return found
}

func (c *Conn) updateWatcherZxid(w *watcher, zxid int64) {
c.watchersLock.Lock()
w.zxid = zxid
c.watchersLock.Unlock()
}

func (c *Conn) queueRequest(opcode int32, req interface{}, res interface{}) <-chan response {
func (c *Conn) queueRequest(opcode int32, req interface{}, res interface{}, recvFunc func(*request, *responseHeader, error)) <-chan response {
rq := &request{
xid: c.nextXid(),
opcode: opcode,
pkt: req,
recvStruct: res,
recvChan: make(chan response, 1),
recvFunc: recvFunc,
}
c.sendChan <- rq
return rq.recvChan
}

func (c *Conn) request(opcode int32, req interface{}, res interface{}) (int64, error) {
r := <-c.queueRequest(opcode, req, res)
func (c *Conn) request(opcode int32, req interface{}, res interface{}, recvFunc func(*request, *responseHeader, error)) (int64, error) {
r := <-c.queueRequest(opcode, req, res, recvFunc)
return r.zxid, r.err
}

func (c *Conn) AddAuth(scheme string, auth []byte) error {
_, err := c.request(opSetAuth, &setAuthRequest{Type: 0, Scheme: scheme, Auth: auth}, &setAuthResponse{})
_, err := c.request(opSetAuth, &setAuthRequest{Type: 0, Scheme: scheme, Auth: auth}, &setAuthResponse{}, nil)
return err
}

func (c *Conn) Children(path string) ([]string, *Stat, error) {
res := &getChildren2Response{}
_, err := c.request(opGetChildren2, &getChildren2Request{Path: path, Watch: false}, res)
_, err := c.request(opGetChildren2, &getChildren2Request{Path: path, Watch: false}, res, nil)
return res.Children, &res.Stat, err
}

func (c *Conn) ChildrenW(path string) ([]string, *Stat, <-chan Event, error) {
w := c.addWatcher(path, watcherTypeChild, -1)
var ech <-chan Event
res := &getChildren2Response{}
zxid, err := c.request(opGetChildren2, &getChildren2Request{Path: path, Watch: true}, res)
_, err := c.request(opGetChildren2, &getChildren2Request{Path: path, Watch: true}, res, func(req *request, res *responseHeader, err error) {
if err == nil {
ech = c.addWatcher(path, watcherTypeChild, res.Zxid)
}
})
if err != nil {
c.removeWatcher(path, watcherTypeChild, w)
return nil, nil, nil, err
}
c.updateWatcherZxid(w, zxid)
return res.Children, &res.Stat, w.ch, err
return res.Children, &res.Stat, ech, err
}

func (c *Conn) Get(path string) ([]byte, *Stat, error) {
res := &getDataResponse{}
_, err := c.request(opGetData, &getDataRequest{Path: path, Watch: false}, res)
_, err := c.request(opGetData, &getDataRequest{Path: path, Watch: false}, res, nil)
return res.Data, &res.Stat, err
}

func (c *Conn) GetW(path string) ([]byte, *Stat, <-chan Event, error) {
w := c.addWatcher(path, watcherTypeData, -1)
var ech <-chan Event
res := &getDataResponse{}
zxid, err := c.request(opGetData, &getDataRequest{Path: path, Watch: true}, res)
_, err := c.request(opGetData, &getDataRequest{Path: path, Watch: true}, res, func(req *request, res *responseHeader, err error) {
if err == nil {
ech = c.addWatcher(path, watcherTypeData, res.Zxid)
}
})
if err != nil {
c.removeWatcher(path, watcherTypeData, w)
return nil, nil, nil, err
}
c.updateWatcherZxid(w, zxid)
return res.Data, &res.Stat, w.ch, err
return res.Data, &res.Stat, ech, err
}

func (c *Conn) Set(path string, data []byte, version int32) (*Stat, error) {
res := &setDataResponse{}
_, err := c.request(opSetData, &SetDataRequest{path, data, version}, res)
_, err := c.request(opSetData, &SetDataRequest{path, data, version}, res, nil)
return &res.Stat, err
}

func (c *Conn) Create(path string, data []byte, flags int32, acl []ACL) (string, error) {
res := &createResponse{}
_, err := c.request(opCreate, &CreateRequest{path, data, acl, flags}, res)
_, err := c.request(opCreate, &CreateRequest{path, data, acl, flags}, res, nil)
return res.Path, err
}

Expand Down Expand Up @@ -723,13 +703,13 @@ func (c *Conn) CreateProtectedEphemeralSequential(path string, data []byte, acl
}

func (c *Conn) Delete(path string, version int32) error {
_, err := c.request(opDelete, &DeleteRequest{path, version}, &deleteResponse{})
_, err := c.request(opDelete, &DeleteRequest{path, version}, &deleteResponse{}, nil)
return err
}

func (c *Conn) Exists(path string) (bool, *Stat, error) {
res := &existsResponse{}
_, err := c.request(opExists, &existsRequest{Path: path, Watch: false}, res)
_, err := c.request(opExists, &existsRequest{Path: path, Watch: false}, res, nil)
exists := true
if err == ErrNoNode {
exists = false
Expand All @@ -739,48 +719,41 @@ func (c *Conn) Exists(path string) (bool, *Stat, error) {
}

func (c *Conn) ExistsW(path string) (bool, *Stat, <-chan Event, error) {
w1 := c.addWatcher(path, watcherTypeData, -1)
w2 := c.addWatcher(path, watcherTypeExist, -1)
var ech <-chan Event
res := &existsResponse{}
zxid, err := c.request(opExists, &existsRequest{Path: path, Watch: true}, res)
_, err := c.request(opExists, &existsRequest{Path: path, Watch: true}, res, func(req *request, res *responseHeader, err error) {
if err == nil {
ech = c.addWatcher(path, watcherTypeData, res.Zxid)
} else if err == ErrNoNode {
ech = c.addWatcher(path, watcherTypeExist, res.Zxid)
}
})
exists := true
if err == ErrNoNode {
exists = false
err = nil
}
if err != nil {
c.removeWatcher(path, watcherTypeData, w1)
c.removeWatcher(path, watcherTypeExist, w2)
return false, nil, nil, err
}
var w *watcher
if exists {
c.removeWatcher(path, watcherTypeExist, w2)
c.updateWatcherZxid(w1, zxid)
w = w1
} else {
c.removeWatcher(path, watcherTypeExist, w1)
c.updateWatcherZxid(w2, zxid)
w = w2
}
return exists, &res.Stat, w.ch, err
return exists, &res.Stat, ech, err
}

func (c *Conn) GetACL(path string) ([]ACL, *Stat, error) {
res := &getAclResponse{}
_, err := c.request(opGetAcl, &getAclRequest{Path: path}, res)
_, err := c.request(opGetAcl, &getAclRequest{Path: path}, res, nil)
return res.Acl, &res.Stat, err
}

func (c *Conn) SetACL(path string, acl []ACL, version int32) (*Stat, error) {
res := &setAclResponse{}
_, err := c.request(opSetAcl, &setAclRequest{Path: path, Acl: acl, Version: version}, res)
_, err := c.request(opSetAcl, &setAclRequest{Path: path, Acl: acl, Version: version}, res, nil)
return &res.Stat, err
}

func (c *Conn) Sync(path string) (string, error) {
res := &syncResponse{}
_, err := c.request(opSync, &syncRequest{Path: path}, res)
_, err := c.request(opSync, &syncRequest{Path: path}, res, nil)
return res.Path, err
}

Expand Down Expand Up @@ -809,6 +782,6 @@ func (c *Conn) Multi(ops MultiOps) error {
req.Ops = append(req.Ops, multiRequestOp{multiHeader{opCheck, false, -1}, r})
}
res := &multiResponse{}
_, err := c.request(opMulti, req, res)
_, err := c.request(opMulti, req, res, nil)
return err
}

0 comments on commit 00fe8c3

Please sign in to comment.