Skip to content

Commit

Permalink
Merge pull request samuel#134 from nomis52/multi-fix
Browse files Browse the repository at this point in the history
Handle Multi() errors correctly.
  • Loading branch information
samuel authored Aug 26, 2016
2 parents 2a8f028 + 7ecf500 commit a138416
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 2 deletions.
3 changes: 2 additions & 1 deletion zk/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,7 @@ func (c *Conn) Sync(path string) (string, error) {
type MultiResponse struct {
Stat *Stat
String string
Error error
}

// Multi executes multiple ZooKeeper operations or none of them. The provided
Expand Down Expand Up @@ -915,7 +916,7 @@ func (c *Conn) Multi(ops ...interface{}) ([]MultiResponse, error) {
_, err := c.request(opMulti, req, res, nil)
mr := make([]MultiResponse, len(res.Ops))
for i, op := range res.Ops {
mr[i] = MultiResponse{Stat: op.Stat, String: op.String}
mr[i] = MultiResponse{Stat: op.Stat, String: op.String, Error: op.Err.toError()}
}
return mr, err
}
Expand Down
1 change: 1 addition & 0 deletions zk/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
opClose = -11
opSetAuth = 100
opSetWatches = 101
opError = -1
// Not in protocol, used internally
opWatcherEvent = -2
)
Expand Down
11 changes: 10 additions & 1 deletion zk/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ type multiResponseOp struct {
Header multiHeader
String string
Stat *Stat
Err ErrCode
}
type multiResponse struct {
Ops []multiResponseOp
Expand Down Expand Up @@ -327,6 +328,8 @@ func (r *multiRequest) Decode(buf []byte) (int, error) {
}

func (r *multiResponse) Decode(buf []byte) (int, error) {
var multiErr error

r.Ops = make([]multiResponseOp, 0)
r.DoneHeader = multiHeader{-1, true, -1}
total := 0
Expand All @@ -347,6 +350,8 @@ func (r *multiResponse) Decode(buf []byte) (int, error) {
switch header.Type {
default:
return total, ErrAPIError
case opError:
w = reflect.ValueOf(&res.Err)
case opCreate:
w = reflect.ValueOf(&res.String)
case opSetData:
Expand All @@ -362,8 +367,12 @@ func (r *multiResponse) Decode(buf []byte) (int, error) {
total += n
}
r.Ops = append(r.Ops, res)
if multiErr == nil && res.Err != errOk {
// Use the first error as the error returned from Multi().
multiErr = res.Err.toError()
}
}
return total, nil
return total, multiErr
}

type watcherEvent struct {
Expand Down
48 changes: 48 additions & 0 deletions zk/zk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,54 @@ func TestMulti(t *testing.T) {
}
}

func TestMultiFailures(t *testing.T) {
// This test case ensures that we return the errors associated with each
// opeThis in the event a call to Multi() fails.
const firstPath = "/gozk-test-first"
const secondPath = "/gozk-test-second"

ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()

// Ensure firstPath doesn't exist and secondPath does. This will cause the
// 2nd operation in the Multi() to fail.
if err := zk.Delete(firstPath, -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
if _, err := zk.Create(secondPath, nil /* data */, 0, WorldACL(PermAll)); err != nil {
t.Fatalf("Create returned error: %+v", err)
}

ops := []interface{}{
&CreateRequest{Path: firstPath, Data: []byte{1, 2}, Acl: WorldACL(PermAll)},
&CreateRequest{Path: secondPath, Data: []byte{3, 4}, Acl: WorldACL(PermAll)},
}
res, err := zk.Multi(ops...)
if err != ErrNodeExists {
t.Fatalf("Multi() didn't return correct error: %+v", err)
}
if len(res) != 2 {
t.Fatalf("Expected 2 responses received %d", len(res))
}
if res[0].Error != nil {
t.Fatalf("First operation returned an unexpected error %+v", res[0].Error)
}
if res[1].Error != ErrNodeExists {
t.Fatalf("Second operation returned incorrect error %+v", res[1].Error)
}
if _, _, err := zk.Get(firstPath); err != ErrNoNode {
t.Fatalf("Node %s was incorrectly created: %+v", firstPath, err)
}
}

func TestGetSetACL(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
Expand Down

0 comments on commit a138416

Please sign in to comment.