Skip to content
This repository has been archived by the owner on Oct 7, 2023. It is now read-only.

Commit

Permalink
zketcd: prefetch STM keys
Browse files Browse the repository at this point in the history
Fixes #39
  • Loading branch information
Anthony Romano committed Sep 1, 2017
1 parent 2df207a commit f8daacc
Showing 1 changed file with 33 additions and 14 deletions.
47 changes: 33 additions & 14 deletions zketcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ type zkEtcd struct {
}

type opBundle struct {
apply func(v3sync.STM) error
reply func(Xid, ZXid) ZKResponse
apply func(v3sync.STM) error
reply func(Xid, ZXid) ZKResponse
prefetch []string
}

// PerfectZXid is enabled to insert err writes to match zookeeper's zxids
Expand All @@ -45,7 +46,7 @@ func NewZKEtcd(c *etcd.Client, s Session) ZK { return &zkEtcd{c, s} }

func (z *zkEtcd) Create(xid Xid, op *CreateRequest) ZKResponse {
b := z.mkCreateTxnOp(op)
resp, zkErr := z.doWrappedSTM(xid, b.apply)
resp, zkErr := z.doWrappedSTM(xid, b.apply, b.prefetch...)
if resp == nil {
return zkErr
}
Expand Down Expand Up @@ -128,7 +129,11 @@ func (z *zkEtcd) mkCreateTxnOp(op *CreateRequest) opBundle {
z.s.Wait(zxid, p, EventNodeCreated)
return mkZKResp(xid, zxid, &CreateResponse{respPath})
}
return opBundle{apply, reply}
return opBundle{
apply,
reply,
[]string{mkPathCTime(pp), mkPathCount(pp), mkPathCTime(mkPath(op.Path))},
}
}

func (z *zkEtcd) GetChildren2(xid Xid, op *GetChildren2Request) ZKResponse {
Expand Down Expand Up @@ -176,7 +181,7 @@ func (z *zkEtcd) Ping(xid Xid, op *PingRequest) ZKResponse {

func (z *zkEtcd) Delete(xid Xid, op *DeleteRequest) ZKResponse {
b := z.mkDeleteTxnOp(op)
resp, zkErr := z.doWrappedSTM(xid, b.apply)
resp, zkErr := z.doWrappedSTM(xid, b.apply, b.prefetch...)
if resp == nil {
return zkErr
}
Expand Down Expand Up @@ -245,7 +250,11 @@ func (z *zkEtcd) mkDeleteTxnOp(op *DeleteRequest) opBundle {
return mkZKResp(xid, zxid, &DeleteResponse{})
}

return opBundle{apply, reply}
return opBundle{
apply,
reply,
[]string{mkPathCTime(pp), mkPathCTime(p), mkPathCVer(p)},
}
}

func (z *zkEtcd) Exists(xid Xid, op *ExistsRequest) ZKResponse {
Expand Down Expand Up @@ -323,7 +332,7 @@ func (z *zkEtcd) GetData(xid Xid, op *GetDataRequest) ZKResponse {

func (z *zkEtcd) SetData(xid Xid, op *SetDataRequest) ZKResponse {
b := z.mkSetDataTxnOp(op)
resp, zkErr := z.doWrappedSTM(xid, b.apply)
resp, zkErr := z.doWrappedSTM(xid, b.apply, b.prefetch...)
if resp == nil {
return zkErr
}
Expand Down Expand Up @@ -364,7 +373,7 @@ func (z *zkEtcd) mkSetDataTxnOp(op *SetDataRequest) opBundle {
return mkZKResp(xid, zxid, &SetDataResponse{Stat: st})
}

return opBundle{apply, reply}
return opBundle{apply, reply, nil}
}

func (z *zkEtcd) GetAcl(xid Xid, op *GetAclRequest) ZKResponse {
Expand Down Expand Up @@ -470,6 +479,11 @@ func (z *zkEtcd) Multi(xid Xid, mreq *MultiRequest) ZKResponse {
}
}

prefetch := []string{}
for _, b := range bs {
prefetch = append(prefetch, b.prefetch...)
}

apply := func(s v3sync.STM) error {
for _, b := range bs {
if err := b.apply(s); err != nil {
Expand Down Expand Up @@ -506,7 +520,7 @@ func (z *zkEtcd) Multi(xid Xid, mreq *MultiRequest) ZKResponse {
return mkZKResp(xid, zxid, mresp)
}

resp, err := z.doSTM(apply)
resp, err := z.doSTM(apply, prefetch...)
if resp == nil {
// txn aborted, possibly due to any API error
if _, ok := errorToErrCode[err]; !ok {
Expand Down Expand Up @@ -550,7 +564,7 @@ func (z *zkEtcd) mkCheckVersionPathTxnOp(op *CheckVersionRequest) opBundle {
reply := func(xid Xid, zxid ZXid) ZKResponse {
return mkZKResp(xid, zxid, &struct{}{})
}
return opBundle{apply, reply}
return opBundle{apply, reply, nil}
}

func (z *zkEtcd) Close(xid Xid, op *CloseRequest) ZKResponse {
Expand Down Expand Up @@ -634,9 +648,9 @@ func (z *zkEtcd) SetWatches(xid Xid, op *SetWatchesRequest) ZKResponse {
return mkZKResp(xid, curZXid, swresp)
}

func (z *zkEtcd) doWrappedSTM(xid Xid, applyf func(s v3sync.STM) error) (*etcd.TxnResponse, ZKResponse) {
func (z *zkEtcd) doWrappedSTM(xid Xid, applyf func(s v3sync.STM) error, prefetch ...string) (*etcd.TxnResponse, ZKResponse) {
var apiErr error
resp, err := z.doSTM(wrapErr(&apiErr, applyf))
resp, err := z.doSTM(wrapErr(&apiErr, applyf), prefetch...)
if err != nil {
return nil, mkErr(err)
}
Expand All @@ -654,8 +668,13 @@ func apiErrToZKErr(xid Xid, zxid ZXid, apiErr error) ZKResponse {
return mkZKErr(xid, zxid, errCode)
}

func (z *zkEtcd) doSTM(applyf func(s v3sync.STM) error) (*etcd.TxnResponse, error) {
return v3sync.NewSTMSerializable(z.c.Ctx(), z.c, applyf)
func (z *zkEtcd) doSTM(applyf func(s v3sync.STM) error, prefetch ...string) (*etcd.TxnResponse, error) {
return v3sync.NewSTM(
z.c,
applyf,
v3sync.WithIsolation(v3sync.Serializable),
v3sync.WithPrefetch(prefetch...),
)
}

// incrementAndGetZxid forces a write to the err-node to increment the Zxid
Expand Down

0 comments on commit f8daacc

Please sign in to comment.