Skip to content

Commit

Permalink
Add an option to supply a callback to be run for each Event.
Browse files Browse the repository at this point in the history
Background:

The channel approach causes lost events during period of scheduling contention.
Of particular harm is the loss of HasSession since it's usually the state that
triggers actions.

The callback approch puts the responsiblity on the caller of the library to
decide on the tradeoff between losing events and blocking the main event loop.
  • Loading branch information
nomis52 committed Jun 27, 2016
1 parent e64db45 commit b4fc673
Showing 1 changed file with 23 additions and 5 deletions.
28 changes: 23 additions & 5 deletions zk/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type Conn struct {
server string // remember the address/port of the current server
conn net.Conn
eventChan chan Event
eventCallback EventCallback // may be nil
shouldQuit chan struct{}
pingInterval time.Duration
recvTimeout time.Duration
Expand Down Expand Up @@ -224,6 +225,18 @@ func WithHostProvider(hostProvider HostProvider) connOption {
}
}

// EventCallback is a function that is called when an Event occurs.
type EventCallback func(Event)

// WithEventCallback returns a connection option that specifies an event
// callback.
// The callback must not block - doing so would delay the ZK go routines.
func WithEventCallback(cb EventCallback) connOption {
return func(c *Conn) {
c.eventCallback = cb
}
}

func (c *Conn) Close() {
close(c.shouldQuit)

Expand Down Expand Up @@ -258,8 +271,16 @@ func (c *Conn) setTimeouts(sessionTimeoutMs int32) {

func (c *Conn) setState(state State) {
atomic.StoreInt32((*int32)(&c.state), int32(state))
c.sendEvent(Event{Type: EventSession, State: state, Server: c.Server()})
}

func (c *Conn) sendEvent(evt Event) {
if c.eventCallback != nil {
c.eventCallback(evt)
}

select {
case c.eventChan <- Event{Type: EventSession, State: state, Server: c.Server()}:
case c.eventChan <- evt:
default:
// panic("zk: event channel full - it must be monitored and never allowed to be full")
}
Expand Down Expand Up @@ -611,10 +632,7 @@ func (c *Conn) recvLoop(conn net.Conn) error {
Path: res.Path,
Err: nil,
}
select {
case c.eventChan <- ev:
default:
}
c.sendEvent(ev)
wTypes := make([]watchType, 0, 2)
switch res.Type {
case EventNodeCreated:
Expand Down

0 comments on commit b4fc673

Please sign in to comment.