Skip to content

Commit

Permalink
Check for shutdown in heartbeat fast-path
Browse files Browse the repository at this point in the history
  • Loading branch information
armon committed Jan 7, 2015
1 parent 64bc2d8 commit 009485a
Showing 1 changed file with 21 additions and 1 deletion.
22 changes: 21 additions & 1 deletion raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
// Setup a heartbeat fast-path to avoid head-of-line
// blocking where possible. It MUST be safe for this
// to be called concurrently with a blocking RPC.
trans.SetHeartbeatHandler(r.processRPC)
trans.SetHeartbeatHandler(r.processHeartbeat)

// Start the background work
r.goFunc(r.run)
Expand Down Expand Up @@ -1178,6 +1178,26 @@ func (r *Raft) processRPC(rpc RPC) {
}
}

// processHeartbeat is a special handler used just for heartbeat requests
// so that they can be fast-pathed if a transport supports it
func (r *Raft) processHeartbeat(rpc RPC) {
// Check if we are shutdown, just ignore the RPC
select {
case <-r.shutdownCh:
return
default:
}

// Ensure we are only handling a heartbeat
switch cmd := rpc.Command.(type) {
case *AppendEntriesRequest:
r.appendEntries(rpc, cmd)
default:
r.logger.Printf("[ERR] raft: Expected heartbeat, got command: %#v", rpc.Command)
rpc.Respond(nil, fmt.Errorf("unexpected command"))
}
}

// appendEntries is invoked when we get an append entries RPC call
func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
defer metrics.MeasureSince([]string{"raft", "rpc", "appendEntries"}, time.Now())
Expand Down

0 comments on commit 009485a

Please sign in to comment.