Skip to content

Commit

Permalink
Adding ZK 3.5 reconfig APIs (samuel#206)
Browse files Browse the repository at this point in the history
* Setting up travis and builds for more than one ZK version to test against
* Reorg tests to support 3.4 and 3.5 clusters
* travis up to go 1.11
* Adding more go version to the matrix and allow failures for go tip and 3.5 zk
* Change to always using zookeeper server scripts. Adding wait for start in test scripts
* Adding waitForStop on testing helpers. Also update comments
* Light test cleanup - more makefile to start to have a common set of build instructions
* ZK 3.5 part 2 - Adding reconfig and incremental reconfig apis
* adjust test config from 3.5 option - dont ignore 3.5 test matrix
* test cases don't care if we get a connection close, as we might have been connected to a node that is no longer accepting connections
  • Loading branch information
jeffbean authored and samuel committed Aug 1, 2019
1 parent c4fab1a commit 758ce21
Show file tree
Hide file tree
Showing 14 changed files with 540 additions and 218 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
.vscode/
.DS_Store
profile.cov
zookeeper
zookeeper-*/
zookeeper-*.tar.gz
27 changes: 16 additions & 11 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
language: go
go:
- 1.11
- 1.9
- tip

go_import_path: github.com/samuel/go-zookeeper

jdk:
- oraclejdk9
Expand All @@ -12,22 +16,23 @@ branches:
- master

before_install:
- wget http://apache.cs.utah.edu/zookeeper/zookeeper-${zk_version}/zookeeper-${zk_version}.tar.gz
- tar -zxvf zookeeper*tar.gz && zip -d zookeeper-${zk_version}/contrib/fatjar/zookeeper-${zk_version}-fatjar.jar 'META-INF/*.SF' 'META-INF/*.DSA'
- go get github.com/mattn/goveralls
- go get golang.org/x/tools/cmd/cover
- make setup ZK_VERSION=${zk_version}

before_script:
- make lint

script:
- jdk_switcher use oraclejdk9
- go build ./...
- go fmt ./...
- go vet ./...
- go test -i -race ./...
- go test -race -covermode atomic -coverprofile=profile.cov ./zk
- goveralls -coverprofile=profile.cov -service=travis-ci
- make

matrix:
allow_failures:
- go: tip
fast_finish: true

env:
global:
secure: Coha3DDcXmsekrHCZlKvRAc+pMBaQU1QS/3++3YCCUXVDBWgVsC1ZIc9df4RLdZ/ncGd86eoRq/S+zyn1XbnqK5+ePqwJoUnJ59BE8ZyHLWI9ajVn3fND1MTduu/ksGsS79+IYbdVI5wgjSgjD3Ktp6Y5uPl+BPosjYBGdNcHS4=
matrix:
- zk_version=3.4.10
- zk_version=3.5.4-beta
- zk_version=3.4.12
39 changes: 39 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# make file to hold the logic of build and test setup
ZK_VERSION ?= 3.4.12

ZK = zookeeper-$(ZK_VERSION)
ZK_URL = "https://archive.apache.org/dist/zookeeper/$(ZK)/$(ZK).tar.gz"

PACKAGES := $(shell go list ./... | grep -v examples)

.DEFAULT_GOAL := test

$(ZK):
wget $(ZK_URL)
tar -zxf $(ZK).tar.gz
# we link to a standard directory path so that the tests don't need to locate the
# install by version in the test code. this allows backward-compatible testing.
ln -s $(ZK) zookeeper

.PHONY: install-covertools
install-covertools:
go get github.com/mattn/goveralls
go get golang.org/x/tools/cmd/cover

.PHONY: setup
setup: $(ZK) install-covertools

.PHONY: lint
lint:
go fmt ./...
go vet ./...

.PHONY: build
build:
go build ./...

.PHONY: test
test: build
go test -timeout 500s -v -race -covermode atomic -coverprofile=profile.cov $(PACKAGES)
# ignore if we fail to publish coverage
-goveralls -coverprofile=profile.cov -service=travis-ci
17 changes: 9 additions & 8 deletions zk/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@ func (lw logWriter) Write(b []byte) (int, error) {
}

func TestBasicCluster(t *testing.T) {
ts, err := StartTestCluster(3, nil, logWriter{t: t, p: "[ZKERR] "})
ts, err := StartTestCluster(t, 3, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk1, err := ts.Connect(0)
zk1, _, err := ts.Connect(0)
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk1.Close()
zk2, err := ts.Connect(1)
zk2, _, err := ts.Connect(1)
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
Expand All @@ -38,6 +38,7 @@ func TestBasicCluster(t *testing.T) {
if _, err := zk1.Create("/gozk-test", []byte("foo-cluster"), 0, WorldACL(PermAll)); err != nil {
t.Fatalf("Create failed on node 1: %+v", err)
}

if by, _, err := zk2.Get("/gozk-test"); err != nil {
t.Fatalf("Get failed on node 2: %+v", err)
} else if string(by) != "foo-cluster" {
Expand All @@ -47,7 +48,7 @@ func TestBasicCluster(t *testing.T) {

// If the current leader dies, then the session is reestablished with the new one.
func TestClientClusterFailover(t *testing.T) {
tc, err := StartTestCluster(3, nil, logWriter{t: t, p: "[ZKERR] "})
tc, err := StartTestCluster(t, 3, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -89,7 +90,7 @@ func TestClientClusterFailover(t *testing.T) {
// If a ZooKeeper cluster loses quorum then a session is reconnected as soon
// as the quorum is restored.
func TestNoQuorum(t *testing.T) {
tc, err := StartTestCluster(3, nil, logWriter{t: t, p: "[ZKERR] "})
tc, err := StartTestCluster(t, 3, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -185,12 +186,12 @@ func TestNoQuorum(t *testing.T) {
}

func TestWaitForClose(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, err := ts.Connect(0)
zk, _, err := ts.Connect(0)
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
Expand Down Expand Up @@ -221,7 +222,7 @@ CONNECTED:
}

func TestBadSession(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
Expand Down
60 changes: 46 additions & 14 deletions zk/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,13 +409,11 @@ func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) {
defer close(reauthReadyChan)

if c.logInfo {
c.logger.Printf("Re-submitting `%d` credentials after reconnect",
len(c.creds))
c.logger.Printf("re-submitting `%d` credentials after reconnect", len(c.creds))
}

for _, cred := range c.creds {
if shouldCancel() {
c.logger.Printf("Cancel rer-submitting credentials")
return
}
resChan, err := c.sendRequest(
Expand All @@ -428,7 +426,7 @@ func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) {
nil)

if err != nil {
c.logger.Printf("Call to sendRequest failed during credential resubmit: %s", err)
c.logger.Printf("call to sendRequest failed during credential resubmit: %s", err)
// FIXME(prozlach): lets ignore errors for now
continue
}
Expand All @@ -437,14 +435,14 @@ func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) {
select {
case res = <-resChan:
case <-c.closeChan:
c.logger.Printf("Recv closed, cancel re-submitting credentials")
c.logger.Printf("recv closed, cancel re-submitting credentials")
return
case <-c.shouldQuit:
c.logger.Printf("Should quit, cancel re-submitting credentials")
c.logger.Printf("should quit, cancel re-submitting credentials")
return
}
if res.err != nil {
c.logger.Printf("Credential re-submit failed: %s", res.err)
c.logger.Printf("credential re-submit failed: %s", res.err)
// FIXME(prozlach): lets ignore errors for now
continue
}
Expand Down Expand Up @@ -486,14 +484,14 @@ func (c *Conn) loop() {
err := c.authenticate()
switch {
case err == ErrSessionExpired:
c.logger.Printf("Authentication failed: %s", err)
c.logger.Printf("authentication failed: %s", err)
c.invalidateWatches(err)
case err != nil && c.conn != nil:
c.logger.Printf("Authentication failed: %s", err)
c.logger.Printf("authentication failed: %s", err)
c.conn.Close()
case err == nil:
if c.logInfo {
c.logger.Printf("Authenticated: id=%d, timeout=%d", c.SessionID(), c.sessionTimeoutMs)
c.logger.Printf("authenticated: id=%d, timeout=%d", c.SessionID(), c.sessionTimeoutMs)
}
c.hostProvider.Connected() // mark success
c.closeChan = make(chan struct{}) // channel to tell send loop stop
Expand All @@ -508,7 +506,7 @@ func (c *Conn) loop() {
}
err := c.sendLoop()
if err != nil || c.logInfo {
c.logger.Printf("Send loop terminated: err=%v", err)
c.logger.Printf("send loop terminated: err=%v", err)
}
c.conn.Close() // causes recv loop to EOF/exit
wg.Done()
Expand All @@ -523,7 +521,7 @@ func (c *Conn) loop() {
err = c.recvLoop(c.conn)
}
if err != io.EOF || c.logInfo {
c.logger.Printf("Recv loop terminated: err=%v", err)
c.logger.Printf("recv loop terminated: err=%v", err)
}
if err == nil {
panic("zk: recvLoop should never return nil error")
Expand Down Expand Up @@ -823,10 +821,12 @@ func (c *Conn) recvLoop(conn net.Conn) error {
buf := make([]byte, sz)
for {
// package length
conn.SetReadDeadline(time.Now().Add(c.recvTimeout))
if err := conn.SetReadDeadline(time.Now().Add(c.recvTimeout)); err != nil {
c.logger.Printf("failed to set connection deadline: %v", err)
}
_, err := io.ReadFull(conn, buf[:4])
if err != nil {
return err
return fmt.Errorf("failed to read from connection: %v", err)
}

blen := int(binary.BigEndian.Uint32(buf[:4]))
Expand Down Expand Up @@ -1220,6 +1220,38 @@ func (c *Conn) Multi(ops ...interface{}) ([]MultiResponse, error) {
return mr, err
}

// IncrementalReconfig issues a ZooKeeper reconfiguration request that adds the
// servers listed in joining and removes the servers listed in leaving.
// Each list is sent as a single comma-separated string, and version is sent as
// the request's CurConfigId.
// It returns the Stat carried in the reconfig response.
func (c *Conn) IncrementalReconfig(joining, leaving []string, version int64) (*Stat, error) {
	// TODO: validate the shape of the member string to give early feedback.
	joinList := strings.Join(joining, ",")
	leaveList := strings.Join(leaving, ",")

	req := new(reconfigRequest)
	req.JoiningServers = []byte(joinList)
	req.LeavingServers = []byte(leaveList)
	req.CurConfigId = version

	return c.internalReconfig(req)
}

// Reconfig is the non-incremental reconfiguration call: the list provided is
// the entire new member list, sent as a single comma-separated string.
// The version makes the reconfiguration conditional; -1 ignores the condition.
// It returns the Stat carried in the reconfig response.
func (c *Conn) Reconfig(members []string, version int64) (*Stat, error) {
	memberList := strings.Join(members, ",")

	req := new(reconfigRequest)
	req.NewMembers = []byte(memberList)
	req.CurConfigId = version

	return c.internalReconfig(req)
}

// internalReconfig sends request to the server as an opReconfig operation and
// decodes the reply into a reconfigReponse. The returned Stat pointer is always
// non-nil, even when err is set, so callers must check err before using it.
func (c *Conn) internalReconfig(request *reconfigRequest) (*Stat, error) {
	var reply reconfigReponse
	_, err := c.request(opReconfig, request, &reply, nil)
	return &reply.Stat, err
}

// Server returns the current or last-connected server name.
func (c *Conn) Server() string {
c.serverMu.Lock()
Expand Down
2 changes: 1 addition & 1 deletion zk/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestRecurringReAuthHang(t *testing.T) {
}
}()

zkC, err := StartTestCluster(2, ioutil.Discard, ioutil.Discard)
zkC, err := StartTestCluster(t, 2, ioutil.Discard, ioutil.Discard)
if err != nil {
panic(err)
}
Expand Down
25 changes: 17 additions & 8 deletions zk/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package zk

import (
"errors"
"fmt"
)

const (
Expand All @@ -25,6 +26,7 @@ const (
opGetChildren2 = 12
opCheck = 13
opMulti = 14
opReconfig = 16
opClose = -11
opSetAuth = 100
opSetWatches = 101
Expand Down Expand Up @@ -92,7 +94,7 @@ func (s State) String() string {
if name := stateNames[s]; name != "" {
return name
}
return "Unknown"
return "unknown state"
}

type ErrCode int32
Expand All @@ -113,8 +115,10 @@ var (
ErrClosing = errors.New("zk: zookeeper is closing")
ErrNothing = errors.New("zk: no server responsees to process")
ErrSessionMoved = errors.New("zk: session moved to another server, so operation is ignored")

ErrReconfigDisabled = errors.New("attempts to perform a reconfiguration operation when reconfiguration feature is disabled")
ErrBadArguments = errors.New("invalid arguments")
// ErrInvalidCallback = errors.New("zk: invalid callback specified")

errCodeToError = map[ErrCode]error{
0: nil,
errAPIError: ErrAPIError,
Expand All @@ -126,19 +130,21 @@ var (
errNotEmpty: ErrNotEmpty,
errSessionExpired: ErrSessionExpired,
// errInvalidCallback: ErrInvalidCallback,
errInvalidAcl: ErrInvalidACL,
errAuthFailed: ErrAuthFailed,
errClosing: ErrClosing,
errNothing: ErrNothing,
errSessionMoved: ErrSessionMoved,
errInvalidAcl: ErrInvalidACL,
errAuthFailed: ErrAuthFailed,
errClosing: ErrClosing,
errNothing: ErrNothing,
errSessionMoved: ErrSessionMoved,
errZReconfigDisabled: ErrReconfigDisabled,
errBadArguments: ErrBadArguments,
}
)

// toError converts an ErrCode into the Go error registered for it in
// errCodeToError. Codes that are not present in that table fall through to the
// fallback return below.
// NOTE(review): this span comes from a rendered diff and shows two fallback
// returns back to back. As written only the first one (ErrUnknown) is
// reachable; the second builds a new error that embeds the numeric code.
// Confirm which of the two the committed file actually keeps.
func (e ErrCode) toError() error {
if err, ok := errCodeToError[e]; ok {
return err
}
return ErrUnknown
return errors.New(fmt.Sprintf("unknown error: %v", e))
}

const (
Expand Down Expand Up @@ -168,6 +174,8 @@ const (
errClosing ErrCode = -116
errNothing ErrCode = -117
errSessionMoved ErrCode = -118
// Attempts to perform a reconfiguration operation when reconfiguration feature is disabled
errZReconfigDisabled ErrCode = -123
)

// Constants for ACL permissions
Expand Down Expand Up @@ -197,6 +205,7 @@ var (
opGetChildren2: "getChildren2",
opCheck: "check",
opMulti: "multi",
opReconfig: "reconfig",
opClose: "close",
opSetAuth: "setAuth",
opSetWatches: "setWatches",
Expand Down
Loading

0 comments on commit 758ce21

Please sign in to comment.