break up set-watches on reconnect into multiple packets (samuel#167)
jhump authored and samuel committed Jul 26, 2017
1 parent 789e9fa commit 6f3354f
Showing 3 changed files with 135 additions and 37 deletions.
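
Context for the change, with a self-contained size estimate (my arithmetic, not from the commit, using the per-packet accounting the new code introduces: 28 bytes of fixed overhead, then 4 bytes plus the path bytes per watch): a ZooKeeper server rejects packets larger than its buffer limit (jute.maxbuffer, 1mb by default), so a client reconnecting with enough outstanding watches previously built a single set-watches packet the server would refuse.

package main

import "fmt"

func main() {
	// Estimate the encoded size of one set-watches packet carrying 50,000
	// data watches (hypothetical paths; accounting per this commit:
	// 28 bytes fixed overhead, then 4 + len(path) bytes per watch).
	size := 28
	for i := 0; i < 50000; i++ {
		size += 4 + len(fmt.Sprintf("/some/watched/path-%d", i))
	}
	// Prints roughly 1.4mb and true: comfortably past the 1mb default.
	fmt.Println(size, size > 1024*1024)
}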
zk/cluster_test.go: 3 changes (1 addition, 2 deletions)
@@ -75,8 +75,7 @@ func TestClientClusterFailover(t *testing.T) {
 	tc.StopServer(hasSessionEvent1.Server)
 
 	// Wait for the session to be reconnected with the new leader.
-	hasSessionWatcher2.Wait(8 * time.Second)
-	if hasSessionWatcher2 == nil {
+	if hasSessionWatcher2.Wait(8*time.Second) == nil {
 		t.Fatalf("Failover failed")
 	}
 
zk/conn.go: 65 changes (51 additions, 14 deletions)
@@ -98,7 +98,9 @@ type Conn struct {
 	closeChan    chan struct{} // channel to tell send loop stop
 
 	// Debug (used by unit tests)
-	reconnectDelay   time.Duration
+	reconnectLatch   chan struct{}
+	setWatchLimit    int
+	setWatchCallback func([]*setWatchesRequest)
 
 	logger Logger
 
@@ -199,9 +201,6 @@ func Connect(servers []string, sessionTimeout time.Duration, options ...connOption) (*Conn, <-chan Event, error) {
 		passwd:         emptyPassword,
 		logger:         DefaultLogger,
 		buf:            make([]byte, bufferSize),
-
-		// Debug
-		reconnectDelay: 0,
 	}
 
 	// Set provided options.
@@ -481,11 +480,11 @@ func (c *Conn) loop() {
 		}
 		c.flushRequests(err)
 
-		if c.reconnectDelay > 0 {
+		if c.reconnectLatch != nil {
 			select {
 			case <-c.shouldQuit:
 				return
-			case <-time.After(c.reconnectDelay):
+			case <-c.reconnectLatch:
 			}
 		}
 	}
@@ -537,17 +536,41 @@ func (c *Conn) sendSetWatches() {
 		return
 	}
 
-	req := &setWatchesRequest{
-		RelativeZxid: c.lastZxid,
-		DataWatches:  make([]string, 0),
-		ExistWatches: make([]string, 0),
-		ChildWatches: make([]string, 0),
-	}
+	// NB: A ZK server, by default, rejects packets >1mb. So, if we have too
+	// many watches to reset, we need to break this up into multiple packets
+	// to avoid hitting that limit. Mirroring the Java client behavior: we are
+	// conservative in that we limit requests to 128kb (since the server limit
+	// is actually configurable and could conceivably be configured smaller
+	// than the default of 1mb).
+	limit := 128 * 1024
+	if c.setWatchLimit > 0 {
+		limit = c.setWatchLimit
+	}
+
+	var reqs []*setWatchesRequest
+	var req *setWatchesRequest
+	var sizeSoFar int
+
 	n := 0
 	for pathType, watchers := range c.watchers {
 		if len(watchers) == 0 {
 			continue
 		}
+		addlLen := 4 + len(pathType.path)
+		if req == nil || sizeSoFar+addlLen > limit {
+			if req != nil {
+				// add to the set of requests that we'll send
+				reqs = append(reqs, req)
+			}
+			sizeSoFar = 28 // fixed overhead of a set-watches packet
+			req = &setWatchesRequest{
+				RelativeZxid: c.lastZxid,
+				DataWatches:  make([]string, 0),
+				ExistWatches: make([]string, 0),
+				ChildWatches: make([]string, 0),
+			}
+		}
+		sizeSoFar += addlLen
 		switch pathType.wType {
 		case watchTypeData:
 			req.DataWatches = append(req.DataWatches, pathType.path)
@@ -561,12 +584,26 @@ func (c *Conn) sendSetWatches() {
 	if n == 0 {
 		return
 	}
+	if req != nil { // don't forget any trailing packet we were building
+		reqs = append(reqs, req)
+	}
 
+	if c.setWatchCallback != nil {
+		c.setWatchCallback(reqs)
+	}
+
 	go func() {
 		res := &setWatchesResponse{}
-		_, err := c.request(opSetWatches, req, res, nil)
-		if err != nil {
-			c.logger.Printf("Failed to set previous watches: %s", err.Error())
-		}
+		// TODO: Pipeline these so we queue all of them up before waiting on any
+		// response. That will require some investigation to make sure there
+		// aren't failure modes where a blocking write to the channel of requests
+		// could hang indefinitely and cause this goroutine to leak...
+		for _, req := range reqs {
+			_, err := c.request(opSetWatches, req, res, nil)
+			if err != nil {
+				c.logger.Printf("Failed to set previous watches: %s", err.Error())
+				break
+			}
+		}
 	}()
 }
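For readers skimming the diff, the chunking policy above can be read in isolation. A minimal standalone sketch (my restatement, not code from the commit; the 28-byte overhead and the 4 + len(path) cost per watch come from the hunk above):

package main

import "fmt"

// chunkPaths mirrors the accounting in the new sendSetWatches: each batch
// starts with a fixed 28-byte overhead, each path costs a 4-byte length
// prefix plus the path bytes, and a new batch is started whenever adding a
// path would push the running size past limit.
func chunkPaths(paths []string, limit int) [][]string {
	const overhead = 28
	var batches [][]string
	var cur []string
	size := 0
	for _, p := range paths {
		addl := 4 + len(p)
		if cur == nil || size+addl > limit {
			if cur != nil {
				batches = append(batches, cur)
			}
			cur = []string{}
			size = overhead
		}
		cur = append(cur, p)
		size += addl
	}
	if cur != nil {
		batches = append(batches, cur) // don't forget the trailing batch
	}
	return batches
}

func main() {
	paths := make([]string, 1000)
	for i := range paths {
		paths[i] = fmt.Sprintf("/gozk-test-%d", i)
	}
	for i, b := range chunkPaths(paths, 1024) {
		fmt.Printf("batch %d: %d paths\n", i, len(b))
	}
}

The real sendSetWatches does the same accounting across the three watch types, builds one setWatchesRequest per batch, and sends the batches sequentially from a goroutine, stopping at the first error.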
zk/zk_test.go: 104 changes (83 additions, 21 deletions)
@@ -11,6 +11,7 @@ import (
 	"sort"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 )
@@ -468,7 +469,12 @@ func TestSetWatchers(t *testing.T) {
 	}
 	defer zk.Close()
 
-	zk.reconnectDelay = time.Second
+	zk.reconnectLatch = make(chan struct{})
+	zk.setWatchLimit = 1024 // break up set-watch step into 1k requests
+	var setWatchReqs atomic.Value
+	zk.setWatchCallback = func(reqs []*setWatchesRequest) {
+		setWatchReqs.Store(reqs)
+	}
 
 	zk2, _, err := ts.ConnectAll()
 	if err != nil {
@@ -480,14 +486,27 @@ func TestSetWatchers(t *testing.T) {
 		t.Fatalf("Delete returned error: %+v", err)
 	}
 
-	testPath, err := zk.Create("/gozk-test-2", []byte{}, 0, WorldACL(PermAll))
-	if err != nil {
-		t.Fatalf("Create returned: %+v", err)
-	}
+	testPaths := map[string]<-chan Event{}
+	defer func() {
+		// clean up all of the test paths we create
+		for p := range testPaths {
+			zk2.Delete(p, -1)
+		}
+	}()
 
-	_, _, testEvCh, err := zk.GetW(testPath)
-	if err != nil {
-		t.Fatalf("GetW returned: %+v", err)
-	}
+	// we create lots of paths to watch, to make sure a "set watches" request
+	// on re-create will be too big and be required to span multiple packets
+	for i := 0; i < 1000; i++ {
+		testPath, err := zk.Create(fmt.Sprintf("/gozk-test-%d", i), []byte{}, 0, WorldACL(PermAll))
+		if err != nil {
+			t.Fatalf("Create returned: %+v", err)
+		}
+		testPaths[testPath] = nil
+		_, _, testEvCh, err := zk.GetW(testPath)
+		if err != nil {
+			t.Fatalf("GetW returned: %+v", err)
+		}
+		testPaths[testPath] = testEvCh
+	}
 
 	children, stat, childCh, err := zk.ChildrenW("/")
@@ -501,28 +520,48 @@ func TestSetWatchers(t *testing.T) {
 
 	// Simulate network error by brutally closing the network connection.
 	zk.conn.Close()
-	if err := zk2.Delete(testPath, -1); err != nil && err != ErrNoNode {
-		t.Fatalf("Delete returned error: %+v", err)
+	for p := range testPaths {
+		if err := zk2.Delete(p, -1); err != nil && err != ErrNoNode {
+			t.Fatalf("Delete returned error: %+v", err)
+		}
 	}
-	// Allow some time for the `zk` session to reconnect and set watches.
-	time.Sleep(time.Millisecond * 100)
 
 	if path, err := zk2.Create("/gozk-test", []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil {
 		t.Fatalf("Create returned error: %+v", err)
 	} else if path != "/gozk-test" {
 		t.Fatalf("Create returned different path '%s' != '/gozk-test'", path)
 	}
 
-	select {
-	case ev := <-testEvCh:
-		if ev.Err != nil {
-			t.Fatalf("GetW watcher error %+v", ev.Err)
-		}
-		if ev.Path != testPath {
-			t.Fatalf("GetW watcher wrong path %s instead of %s", ev.Path, testPath)
-		}
-	case <-time.After(2 * time.Second):
-		t.Fatal("GetW watcher timed out")
-	}
+	time.Sleep(100 * time.Millisecond)
+
+	// zk should still be waiting to reconnect, so none of the watches should have been triggered
+	for p, ch := range testPaths {
+		select {
+		case <-ch:
+			t.Fatalf("GetW watcher for %q should not have triggered yet", p)
+		default:
+		}
+	}
+	select {
+	case <-childCh:
+		t.Fatalf("ChildrenW watcher should not have triggered yet")
+	default:
+	}
+
+	// now we let the reconnect occur and make sure it resets watches
+	close(zk.reconnectLatch)
+
+	for p, ch := range testPaths {
+		select {
+		case ev := <-ch:
+			if ev.Err != nil {
+				t.Fatalf("GetW watcher error %+v", ev.Err)
+			}
+			if ev.Path != p {
+				t.Fatalf("GetW watcher wrong path %s instead of %s", ev.Path, p)
+			}
+		case <-time.After(2 * time.Second):
+			t.Fatal("GetW watcher timed out")
+		}
+	}
 
 	select {
@@ ... @@ func TestSetWatchers(t *testing.T) {
 	case <-time.After(2 * time.Second):
 		t.Fatal("Child watcher timed out")
 	}
+
+	// Yay! All watches fired correctly. Now we also inspect the actual set-watch
+	// request objects to ensure they didn't exceed the expected packet size.
+	buf := make([]byte, bufferSize)
+	totalWatches := 0
+	actualReqs := setWatchReqs.Load().([]*setWatchesRequest)
+	if len(actualReqs) < 12 {
+		// sanity check: we should have generated *at least* 12 requests to reset watches
+		t.Fatalf("too few setWatchesRequest messages: %d", len(actualReqs))
+	}
+	for _, r := range actualReqs {
+		totalWatches += len(r.ChildWatches) + len(r.DataWatches) + len(r.ExistWatches)
+		n, err := encodePacket(buf, r)
+		if err != nil {
+			t.Fatalf("encodePacket failed: %v! request:\n%+v", err, r)
+		} else if n > 1024 {
+			t.Fatalf("setWatchesRequest exceeded allowed size (%d > 1024)! request:\n%+v", n, r)
+		}
+	}
+
+	if totalWatches != len(testPaths)+1 {
+		t.Fatalf("setWatchesRequests did not include all expected watches; expecting %d, got %d", len(testPaths)+1, totalWatches)
+	}
 }
 
 func TestExpiringWatch(t *testing.T) {
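A note on the test's magic numbers (my arithmetic, not from the commit): with 1000 watched paths named /gozk-test-0 through /gozk-test-999 plus one child watch on "/", the 1024-byte setWatchLimit should yield roughly 19 batches, so the assertion of at least 12 setWatchesRequest messages is a loose lower bound. A quick check:

package main

import "fmt"

func main() {
	// Replays the batching arithmetic against the test's parameters:
	// 28-byte fixed overhead per request, 4 + len(path) per watch,
	// and the test's 1024-byte setWatchLimit.
	const limit = 1024
	reqs, size := 0, 0
	addWatch := func(path string) {
		addl := 4 + len(path)
		if reqs == 0 || size+addl > limit {
			reqs++
			size = 28
		}
		size += addl
	}
	for i := 0; i < 1000; i++ {
		addWatch(fmt.Sprintf("/gozk-test-%d", i))
	}
	addWatch("/") // the ChildrenW watch on the root
	// Prints 19 for this insertion order; the client iterates a map, so the
	// real split varies a little, but it stays well above the test's 12.
	fmt.Println(reqs)
}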
