Skip to content

Commit

Permalink
Adding blocking notify ch for leadership changes
Browse files Browse the repository at this point in the history
  • Loading branch information
armon committed Sep 7, 2015
1 parent 0ab3e94 commit 9dabbba
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 0 deletions.
5 changes: 5 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ type Config struct {
// never be used except for testing purposes, as it can cause a split-brain.
StartAsLeader bool

// NotifyCh is used to provide a channel that will be notified of leadership
// changes. Raft will block writing to this channel, so it should either be
// buffered or aggressively consumed.
NotifyCh chan<- bool

// LogOutput is used as a sink for logs, unless Logger is specified.
// Defaults to os.Stderr.
LogOutput io.Writer
Expand Down
16 changes: 16 additions & 0 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,14 @@ func (r *Raft) runLeader() {
// Notify that we are the leader
asyncNotifyBool(r.leaderCh, true)

// Push to the notify channel if given
if notify := r.conf.NotifyCh; notify != nil {
select {
case notify <- true:
case <-r.shutdownCh:
}
}

// Setup leader state
r.leaderState.commitCh = make(chan struct{}, 1)
r.leaderState.inflight = newInflight(r.leaderState.commitCh)
Expand Down Expand Up @@ -773,6 +781,14 @@ func (r *Raft) runLeader() {

// Notify that we are not the leader
asyncNotifyBool(r.leaderCh, false)

// Push to the notify channel if given
if notify := r.conf.NotifyCh; notify != nil {
select {
case notify <- false:
case <-r.shutdownCh:
}
}
}()

// Start a replication routine for each peer
Expand Down
31 changes: 31 additions & 0 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1553,3 +1553,34 @@ func TestRaft_StartAsLeader(t *testing.T) {
t.Fatalf("did not apply to FSM!")
}
}

func TestRaft_NotifyCh(t *testing.T) {
ch := make(chan bool, 1)
conf := inmemConfig(t)
conf.NotifyCh = ch
c := MakeCluster(1, t, conf)
defer c.Close()

// Watch leaderCh for change
select {
case v := <-ch:
if !v {
t.Fatalf("should become leader")
}
case <-time.After(conf.HeartbeatTimeout * 3):
t.Fatalf("timeout becoming leader")
}

// Close the cluster
c.Close()

// Watch leaderCh for change
select {
case v := <-ch:
if v {
t.Fatalf("should step down as leader")
}
case <-time.After(conf.HeartbeatTimeout * 3):
t.Fatalf("timeout becoming leader")
}
}

0 comments on commit 9dabbba

Please sign in to comment.