Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge linkedin/go-zk changes upstream #122

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Prev Previous commit
Next Next commit
Support the zxid introduced in 3.9.0 and relay pings to persistent wa…
…tchers (#2)

Like the comment on Event.Zxid mentions, watch events now include the zxid as of
ZK 3.9.0. Additionally, relaying pings to persistent watchers allows them to
keep track of whether they are behind on consuming updates from ZK, particularly
since the zxid is relayed.
  • Loading branch information
PapaCharlie authored Jul 27, 2023
commit 1ac5e35b69fb03d7023baab232a7212bdc957b2c
72 changes: 47 additions & 25 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ type Event struct {
Path string // For non-session events, the path of the watched node.
Err error
Server string // For connection events
// For watch events, the zxid that caused the change (starting with ZK 3.9.0). For ping events, the zxid that the
// server last processed. Note that the last processed zxid is only updated once the watch events have been
// triggered. Since ZK operates over one connection, the watch events are therefore queued up before the ping. This
// means watch events should always be received before pings, and receiving a ping with a given zxid means any watch
// event for a lower zxid have already been received (if any).
Zxid int64
}

// HostProvider is used to represent a set of hosts a ZooKeeper client should connect to.
Expand Down Expand Up @@ -549,39 +555,50 @@ var eventWatchTypes = map[EventType][]watchType{
EventNodeDataChanged: {watchTypeExist, watchTypeData, watchTypePersistent, watchTypePersistentRecursive},
EventNodeChildrenChanged: {watchTypeChild, watchTypePersistent},
EventNodeDeleted: {watchTypeExist, watchTypeData, watchTypeChild, watchTypePersistent, watchTypePersistentRecursive},
EventPingReceived: nil,
}
var persistentWatchTypes = []watchType{watchTypePersistent, watchTypePersistentRecursive}

// Send event to all interested watchers
func (c *Conn) notifyWatches(ev Event) {
wTypes := eventWatchTypes[ev.Type]
if len(wTypes) == 0 {
wTypes, ok := eventWatchTypes[ev.Type]
if !ok {
return
}

c.watchersLock.Lock()
defer c.watchersLock.Unlock()

broadcast := func(wpt watchPathType) {
for _, ch := range c.watchers[wpt] {
ch.push(ev)
if !wpt.wType.isPersistent() {
ch.close()
delete(c.watchers, wpt)
if ev.Type == EventPingReceived {
for wpt, watchers := range c.watchers {
if wpt.wType.isPersistent() {
for _, ch := range watchers {
ch.Push(ev)
}
}
}
} else {
broadcast := func(wpt watchPathType) {
for _, ch := range c.watchers[wpt] {
ch.Push(ev)
if !wpt.wType.isPersistent() {
ch.Close()
delete(c.watchers, wpt)
}
}
}
}

for _, t := range wTypes {
if t == watchTypePersistentRecursive {
for p := ev.Path; ; p, _ = SplitPath(p) {
broadcast(watchPathType{p, t})
if p == "/" {
break
for _, t := range wTypes {
if t == watchTypePersistentRecursive {
for p := ev.Path; ; p, _ = SplitPath(p) {
broadcast(watchPathType{p, t})
if p == "/" {
break
}
}
} else {
broadcast(watchPathType{ev.Path, t})
}
} else {
broadcast(watchPathType{ev.Path, t})
}
}
}
Expand All @@ -603,8 +620,8 @@ func (c *Conn) invalidateWatches(err error) {
ev := Event{Type: EventNotWatching, State: StateDisconnected, Path: pathType.path, Err: err}
c.sendEvent(ev) // also publish globally
for _, ch := range watchers {
ch.push(ev)
ch.close()
ch.Push(ev)
ch.Close()
}
delete(c.watchers, pathType)
}
Expand Down Expand Up @@ -706,7 +723,7 @@ func (c *Conn) sendSetWatches() {
e := Event{Type: EventWatching, State: StateConnected, Path: p}
c.sendEvent(e) // also publish globally
for _, ch := range c.watchers[watchPathType{path: p, wType: wt}] {
ch.push(e)
ch.Push(e)
}
}
}
Expand Down Expand Up @@ -909,6 +926,11 @@ func (c *Conn) recvLoop(conn net.Conn) error {
} else if res.Xid == -2 {
// Ping response. Ignore.
c.metricReceiver.PongReceived()
c.notifyWatches(Event{
Type: EventPingReceived,
State: StateHasSession,
Zxid: res.Zxid,
})
} else if res.Xid < 0 {
c.logger.Printf("Xid < 0 (%d) but not ping or watcher event", res.Xid)
} else {
Expand Down Expand Up @@ -954,7 +976,7 @@ func (c *Conn) addWatcher(path string, watchType watchType, ch EventQueue) {
wpt := watchPathType{path, watchType}
c.watchers[wpt] = append(c.watchers[wpt], ch)
if watchType.isPersistent() {
ch.push(Event{Type: EventWatching, State: StateConnected, Path: path})
ch.Push(Event{Type: EventWatching, State: StateConnected, Path: path})
}
}

Expand Down Expand Up @@ -1513,8 +1535,8 @@ func (c *Conn) RemovePersistentWatch(path string, ch EventQueue) (err error) {
if w == ch {
deleted = true
c.watchers[wpt] = append(c.watchers[wpt][:i], c.watchers[wpt][i+1:]...)
w.push(Event{Type: EventNotWatching, State: c.State(), Path: path, Err: ErrNoWatcher})
w.close()
w.Push(Event{Type: EventNotWatching, State: c.State(), Path: path, Err: ErrNoWatcher})
w.Close()
return
}
}
Expand Down Expand Up @@ -1548,8 +1570,8 @@ func (c *Conn) RemoveAllPersistentWatches(path string) (err error) {
for _, wt := range persistentWatchTypes {
wpt := watchPathType{path: path, wType: wt}
for _, ch := range c.watchers[wpt] {
ch.push(Event{Type: EventNotWatching, State: c.State(), Path: path, Err: ErrNoWatcher})
ch.close()
ch.Push(Event{Type: EventNotWatching, State: c.State(), Path: path, Err: ErrNoWatcher})
ch.Close()
}
delete(c.watchers, wpt)
}
Expand Down
8 changes: 5 additions & 3 deletions constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ const (
EventNodeChildrenChanged EventType = 4

// EventSession represents a session event.
EventSession EventType = -1
EventNotWatching EventType = -2
EventWatching EventType = -3
EventSession EventType = -1
EventNotWatching EventType = -2
EventWatching EventType = -3
EventPingReceived EventType = -4
)

var (
Expand All @@ -65,6 +66,7 @@ var (
EventSession: "EventSession",
EventNotWatching: "EventNotWatching",
EventWatching: "EventWatching",
EventPingReceived: "EventPingReceived",
}
)

Expand Down
17 changes: 11 additions & 6 deletions unlimited_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,14 @@ import (
var ErrEventQueueClosed = errors.New("zk: event queue closed")

type EventQueue interface {
// Next waits for a new Event to be received until the context expires or the queue is closed.
Next(ctx context.Context) (Event, error)
push(e Event)
close()
// Push adds the given event to the queue and notifies any in-flight calls to Next that a new event is available.
Push(e Event)
// Close functions like closing a channel. Subsequent calls to Next will drain whatever events remain in the buffer
// while subsequent calls to Push will panic. Once remaining events are drained, Next will return
// ErrEventQueueClosed.
Close()
}

type chanEventQueue chan Event
Expand All @@ -29,11 +34,11 @@ func (c chanEventQueue) Next(ctx context.Context) (Event, error) {
}
}

func (c chanEventQueue) push(e Event) {
func (c chanEventQueue) Push(e Event) {
c <- e
}

func (c chanEventQueue) close() {
func (c chanEventQueue) Close() {
close(c)
}

Expand All @@ -53,7 +58,7 @@ func newUnlimitedEventQueue() *unlimitedEventQueue {
}
}

func (q *unlimitedEventQueue) push(e Event) {
func (q *unlimitedEventQueue) Push(e Event) {
q.lock.Lock()
defer q.lock.Unlock()

Expand All @@ -67,7 +72,7 @@ func (q *unlimitedEventQueue) push(e Event) {
q.newEvent = make(chan struct{})
}

func (q *unlimitedEventQueue) close() {
func (q *unlimitedEventQueue) Close() {
q.lock.Lock()
defer q.lock.Unlock()

Expand Down
29 changes: 26 additions & 3 deletions unlimited_channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"reflect"
"sync"
"testing"
"time"
)
Expand All @@ -24,10 +25,10 @@ func TestUnlimitedChannel(t *testing.T) {

// check that events can be pushed without consumers
for i := 0; i < eventCount; i++ {
ch.push(newEvent(i))
ch.Push(newEvent(i))
}
if closeAfterPushes {
ch.close()
ch.Close()
}

for events := 0; events < eventCount; events++ {
Expand Down Expand Up @@ -68,7 +69,7 @@ func TestUnlimitedChannel(t *testing.T) {
ctx = &customContext{
Context: ctx,
f: func() {
ch.push(expected)
ch.Push(expected)
},
}

Expand All @@ -81,6 +82,28 @@ func TestUnlimitedChannel(t *testing.T) {
}
}
})
t.Run("multiple consumers", func(t *testing.T) {
ch := newUnlimitedEventQueue()
for i := 0; i < 20; i++ {
ch.Push(newEvent(i))
}
ch.Close()
var wg sync.WaitGroup
wg.Add(20)
for i := 0; i < 5; i++ {
go func() {
for {
_, err := ch.Next(context.Background())
if errors.Is(err, ErrEventQueueClosed) {
return
}
requireNoErrorf(t, err)
wg.Done()
}
}()
}
wg.Wait()
})
}

type customContext struct {
Expand Down
Loading