Skip to content

Commit

Permalink
Merge pull request samuel#126 from nomis52/event-callback
Browse files Browse the repository at this point in the history
Add an option to supply a callback to be run for each Event.
  • Loading branch information
samuel authored Aug 26, 2016
2 parents e64db45 + 4f57edd commit 2a8f028
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 7 deletions.
23 changes: 22 additions & 1 deletion zk/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,17 +112,38 @@ func TestNoQuorum(t *testing.T) {

// Kill the ZooKeeper leader and wait for the session to reconnect.
DefaultLogger.Printf(" Kill the leader")
disconnectWatcher1 := sl.NewWatcher(sessionStateMatcher(StateDisconnected))
hasSessionWatcher2 := sl.NewWatcher(sessionStateMatcher(StateHasSession))
tc.StopServer(hasSessionEvent1.Server)

disconnectedEvent1 := disconnectWatcher1.Wait(8 * time.Second)
if disconnectedEvent1 == nil {
t.Fatalf("Failover failed, missed StateDisconnected event")
}
if disconnectedEvent1.Server != hasSessionEvent1.Server {
t.Fatalf("Unexpected StateDisconnected event, expected=%s, actual=%s",
hasSessionEvent1.Server, disconnectedEvent1.Server)
}

hasSessionEvent2 := hasSessionWatcher2.Wait(8 * time.Second)
if hasSessionEvent2 == nil {
t.Fatalf("Failover failed")
t.Fatalf("Failover failed, missed StateHasSession event")
}

// Kill the ZooKeeper leader leaving the cluster without quorum.
DefaultLogger.Printf(" Kill the leader")
disconnectWatcher2 := sl.NewWatcher(sessionStateMatcher(StateDisconnected))
tc.StopServer(hasSessionEvent2.Server)

disconnectedEvent2 := disconnectWatcher2.Wait(8 * time.Second)
if disconnectedEvent2 == nil {
t.Fatalf("Failover failed, missed StateDisconnected event")
}
if disconnectedEvent2.Server != hasSessionEvent2.Server {
t.Fatalf("Unexpected StateDisconnected event, expected=%s, actual=%s",
hasSessionEvent2.Server, disconnectedEvent2.Server)
}

// Make sure that we keep retrying connecting to the only remaining
// ZooKeeper server, but the attempts are being dropped because there is
// no quorum.
Expand Down
28 changes: 23 additions & 5 deletions zk/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type Conn struct {
server string // remember the address/port of the current server
conn net.Conn
eventChan chan Event
eventCallback EventCallback // may be nil
shouldQuit chan struct{}
pingInterval time.Duration
recvTimeout time.Duration
Expand Down Expand Up @@ -224,6 +225,18 @@ func WithHostProvider(hostProvider HostProvider) connOption {
}
}

// EventCallback is a function that is called when an Event occurs.
type EventCallback func(Event)

// WithEventCallback returns a connection option that specifies an event
// callback.
// The callback must not block - doing so would delay the ZK go routines.
func WithEventCallback(cb EventCallback) connOption {
return func(c *Conn) {
c.eventCallback = cb
}
}

func (c *Conn) Close() {
close(c.shouldQuit)

Expand Down Expand Up @@ -258,8 +271,16 @@ func (c *Conn) setTimeouts(sessionTimeoutMs int32) {

func (c *Conn) setState(state State) {
atomic.StoreInt32((*int32)(&c.state), int32(state))
c.sendEvent(Event{Type: EventSession, State: state, Server: c.Server()})
}

func (c *Conn) sendEvent(evt Event) {
if c.eventCallback != nil {
c.eventCallback(evt)
}

select {
case c.eventChan <- Event{Type: EventSession, State: state, Server: c.Server()}:
case c.eventChan <- evt:
default:
// panic("zk: event channel full - it must be monitored and never allowed to be full")
}
Expand Down Expand Up @@ -611,10 +632,7 @@ func (c *Conn) recvLoop(conn net.Conn) error {
Path: res.Path,
Err: nil,
}
select {
case c.eventChan <- ev:
default:
}
c.sendEvent(ev)
wTypes := make([]watchType, 0, 2)
switch res.Type {
case EventNodeCreated:
Expand Down
6 changes: 5 additions & 1 deletion zk/server_help.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,15 @@ func (ts *TestCluster) ConnectAll() (*Conn, <-chan Event, error) {
}

func (ts *TestCluster) ConnectAllTimeout(sessionTimeout time.Duration) (*Conn, <-chan Event, error) {
return ts.ConnectWithOptions(sessionTimeout)
}

func (ts *TestCluster) ConnectWithOptions(sessionTimeout time.Duration, options ...connOption) (*Conn, <-chan Event, error) {
hosts := make([]string, len(ts.Servers))
for i, srv := range ts.Servers {
hosts[i] = fmt.Sprintf("127.0.0.1:%d", srv.Port)
}
zk, ch, err := Connect(hosts, sessionTimeout)
zk, ch, err := Connect(hosts, sessionTimeout, options...)
return zk, ch, err
}

Expand Down
46 changes: 46 additions & 0 deletions zk/zk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,52 @@ import (
"time"
)

func TestStateChanges(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()

callbackChan := make(chan Event)
f := func(event Event) {
callbackChan <- event
}

zk, eventChan, err := ts.ConnectWithOptions(15*time.Second, WithEventCallback(f))
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}

verifyEventOrder := func(c <-chan Event, expectedStates []State, source string) {
for _, state := range expectedStates {
for {
event, ok := <-c
if !ok {
t.Fatalf("unexpected channel close for %s", source)
}

if event.Type != EventSession {
continue
}

if event.State != state {
t.Fatalf("mismatched state order from %s, expected %v, received %v", source, state, event.State)
}
break
}
}
}

states := []State{StateConnecting, StateConnected, StateHasSession}
verifyEventOrder(callbackChan, states, "callback")
verifyEventOrder(eventChan, states, "event channel")

zk.Close()
verifyEventOrder(callbackChan, []State{StateDisconnected}, "callback")
verifyEventOrder(eventChan, []State{StateDisconnected}, "event channel")
}

func TestCreate(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
Expand Down

0 comments on commit 2a8f028

Please sign in to comment.