Skip to content

Commit

Permalink
support for ttl nodes and container nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
einxie authored and pmazzini committed Jun 11, 2020
1 parent b2a9da9 commit 1ea4474
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 1 deletion.
31 changes: 31 additions & 0 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,33 @@ func (c *Conn) Create(path string, data []byte, flags int32, acl []ACL) (string,
return res.Path, err
}

func (c *Conn) CreateContainer(path string, data []byte, flags int32, acl []ACL) (string, error) {
if err := validatePath(path, flags&FlagSequence == FlagSequence); err != nil {
return "", err
}
if flags != FlagContainer {
return "", errors.New("flags not support container node")
}

res := &createResponse{}
_, err := c.request(opCreateContainer, &CreateContainerRequest{path, data, acl, flags}, res, nil)
return res.Path, err
}

// ttl: ms
func (c *Conn) CreateTTL(path string, data []byte, flags int32, acl []ACL, ttl int64) (string, error) {
if err := validatePath(path, flags&FlagSequence == FlagSequence); err != nil {
return "", err
}
if flags != FlagWithTTL && flags != FlagSequenceWithTTL {
return "", errors.New("flags not support ttl node")
}

res := &createResponse{}
_, err := c.request(opCreateTTL, &CreateTTLRequest{path, data, acl, flags, ttl}, res, nil)
return res.Path, err
}

// CreateProtectedEphemeralSequential fixes a race condition if the server crashes
// after it creates the node. On reconnect the session may still be valid so the
// ephemeral node still exists. Therefore, on reconnect we need to check if a node
Expand Down Expand Up @@ -1213,6 +1240,10 @@ func (c *Conn) Multi(ops ...interface{}) ([]MultiResponse, error) {
switch op.(type) {
case *CreateRequest:
opCode = opCreate
case *CreateContainerRequest:
opCode = opCreateContainer
case *CreateTTLRequest:
opCode = opCreateTTL
case *SetDataRequest:
opCode = opSetData
case *DeleteRequest:
Expand Down
8 changes: 8 additions & 0 deletions constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ const (
opCheck = 13
opMulti = 14
opReconfig = 16
opCreateContainer = 19
opCreateTTL = 21
opClose = -11
opSetAuth = 100
opSetWatches = 101
Expand Down Expand Up @@ -72,6 +74,10 @@ const (
const (
FlagEphemeral = 1
FlagSequence = 2
FlagEphemeralSequence = 3
FlagContainer = 4
FlagWithTTL = 5
FlagSequenceWithTTL = 6
)

var (
Expand Down Expand Up @@ -193,6 +199,8 @@ var (
opNames = map[int32]string{
opNotify: "notify",
opCreate: "create",
opCreateContainer: "createContainer",
opCreateTTL: "createTTL",
opDelete: "delete",
opExists: "exists",
opGetData: "getData",
Expand Down
16 changes: 15 additions & 1 deletion structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,16 @@ type CreateRequest struct {
Flags int32
}

type CreateContainerRequest CreateRequest

type CreateTTLRequest struct {
Path string
Data []byte
Acl []ACL
Flags int32
Ttl int64
}

type createResponse pathResponse
type DeleteRequest PathVersionRequest
type deleteResponse struct{}
Expand Down Expand Up @@ -365,7 +375,7 @@ func (r *multiResponse) Decode(buf []byte) (int, error) {
return total, ErrAPIError
case opError:
w = reflect.ValueOf(&res.Err)
case opCreate:
case opCreate, opCreateContainer, opCreateTTL:
w = reflect.ValueOf(&res.String)
case opSetData:
res.Stat = new(Stat)
Expand Down Expand Up @@ -589,6 +599,10 @@ func requestStructForOp(op int32) interface{} {
return &closeRequest{}
case opCreate:
return &CreateRequest{}
case opCreateContainer:
return &CreateContainerRequest{}
case opCreateTTL:
return &CreateTTLRequest{}
case opDelete:
return &DeleteRequest{}
case opExists:
Expand Down
78 changes: 78 additions & 0 deletions zk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,84 @@ func TestCreate(t *testing.T) {
}
}

func TestCreateTTL(t *testing.T) {
ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()

path := "/gozk-test-ttl"

if err := zk.Delete(path, -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
if p, err := zk.CreateTTL(path, []byte{1, 2, 3, 4}, FlagWithTTL, WorldACL(PermAll), 60*1000); err != nil {
t.Fatalf("Create returned error: %+v", err)
} else if p != path {
t.Fatalf("Create returned different path '%s' != '%s'", p, path)
}
if data, stat, err := zk.Get(path); err != nil {
t.Fatalf("Get returned error: %+v", err)
} else if stat == nil {
t.Fatal("Get returned nil stat")
} else if len(data) < 4 {
t.Fatal("Get returned wrong size data")
}

if err := zk.Delete(path, -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
if p, err := zk.CreateTTL(path, []byte{1, 2, 3, 4}, FlagSequenceWithTTL, WorldACL(PermAll), 60*1000); err != nil {
t.Fatalf("Create returned error: %+v", err)
} else if p != path {
t.Fatalf("Create returned different path '%s' != '%s'", p, path)
}
if data, stat, err := zk.Get(path); err != nil {
t.Fatalf("Get returned error: %+v", err)
} else if stat == nil {
t.Fatal("Get returned nil stat")
} else if len(data) < 4 {
t.Fatal("Get returned wrong size data")
}
}

func TestCreateContainer(t *testing.T) {
ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()

path := "/gozk-test-container"

if err := zk.Delete(path, -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
if p, err := zk.CreateContainer(path, []byte{1, 2, 3, 4}, FlagContainer, WorldACL(PermAll)); err != nil {
t.Fatalf("Create returned error: %+v", err)
} else if p != path {
t.Fatalf("Create returned different path '%s' != '%s'", p, path)
}
if data, stat, err := zk.Get(path); err != nil {
t.Fatalf("Get returned error: %+v", err)
} else if stat == nil {
t.Fatal("Get returned nil stat")
} else if len(data) < 4 {
t.Fatal("Get returned wrong size data")
}
}

func TestIncrementalReconfig(t *testing.T) {
if val, ok := os.LookupEnv("zk_version"); ok {
if !strings.HasPrefix(val, "3.5") {
Expand Down

0 comments on commit 1ea4474

Please sign in to comment.