Bug Fix: Avoid Dgraph cluster getting stuck in infinite leader election (#3391)

Dgraph Alphas were calculating snapshots and checkpoints in the main Raft loop, which, depending on disk speed, could block Ticks for several seconds. This caused followers to assume the leader was unavailable and trigger an election. Because the checkpoint and snapshot calculation happens every 30s, an election was being triggered every 30s as well.

This PR moves both of these operations out of the main loop into their own goroutine (colocated with the code that shuts down the Raft node). Tested successfully on a live cluster that was exhibiting these symptoms.
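As an illustration of the pattern (hypothetical names; the actual implementation is checkpointAndClose in worker/draft.go in the diff below), the slow periodic work gets its own goroutine and ticker, so the main loop's 20ms ticker that drives Raft heartbeats is never starved:

```go
package main

import (
	"fmt"
	"time"
)

// Sketch only: slow periodic work (checkpoints, snapshot calculation) runs in
// its own goroutine, so the main loop's ticker, which drives Raft heartbeats,
// keeps firing no matter how long that work takes.
func run(closer chan struct{}) {
	go func() {
		slowTicker := time.NewTicker(time.Minute)
		defer slowTicker.Stop()
		for {
			select {
			case <-slowTicker.C:
				time.Sleep(2 * time.Second) // stand-in for checkpoint/snapshot work
			case <-closer:
				return
			}
		}
	}()

	ticker := time.NewTicker(20 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			fmt.Println("tick") // Raft().Tick() in the real code
		case <-closer:
			return
		}
	}
}

func main() {
	closer := make(chan struct{})
	go run(closer)
	time.Sleep(100 * time.Millisecond)
	close(closer)
}
```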

This PR also tracks how many heartbeats have come in and gone out of each node and logs the counts under V(3), which is useful for debugging.
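A condensed sketch of that accounting (hypothetical type and method names; the real counters live on conn.Node and are drained by ReportRaftComms in the diff below):

```go
package sketch

import (
	"sync/atomic"
	"time"

	"github.com/golang/glog"
)

// heartbeatStats mirrors the counters added in this PR: Send() and
// RaftMessage() bump them on heartbeat messages, and a goroutine drains
// and logs them once per second.
type heartbeatStats struct {
	out int64
	in  int64
}

func (s *heartbeatStats) sent()     { atomic.AddInt64(&s.out, 1) }
func (s *heartbeatStats) received() { atomic.AddInt64(&s.in, 1) }

func (s *heartbeatStats) report() {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for range ticker.C {
		out := atomic.SwapInt64(&s.out, 0)
		in := atomic.SwapInt64(&s.in, 0)
		glog.Infof("Heartbeats out: %d, in: %d", out, in)
	}
}
```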

The PR improves x.Timer and uses it to track the latencies of the individual components of Raft.Ready handling, reporting them in both Alphas and Zeros. This fixes the incorrect claim we were making that disk latency was the primary cause of Raft.Ready being slow.
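A minimal usage sketch of the reworked Timer (the import path is assumed from this repo's layout; the sleeps stand in for the disk/proposals/advance phases measured in the Run loops below):

```go
package main

import (
	"fmt"
	"time"

	"github.com/dgraph-io/dgraph/x"
)

func main() {
	var timer x.Timer
	timer.Start()

	time.Sleep(30 * time.Millisecond) // stand-in for saving entries to disk
	timer.Record("disk")

	time.Sleep(10 * time.Millisecond) // stand-in for applying proposals / sending messages
	timer.Record("proposals")

	time.Sleep(5 * time.Millisecond) // stand-in for Raft().Advance()
	timer.Record("advance")

	if timer.Total() > 20*time.Millisecond {
		// Prints something like:
		// Timer Total: 45ms. Breakdown: [{disk 30ms} {proposals 10ms} {advance 5ms}]
		fmt.Println(timer.String())
	}
}
```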

Changes:
* Report Heartbeat comms
* Add logs around heartbeats.
* Move snapshot and checkpoint calculation outside of the main Raft loop. Capture the latency of individual components in Raft.Ready better.
* Add timer to Zero as well. Fix a bug: Use a for loop when going over slow ticker.
* Move num pending txns to V(2).
* Move the checkpointing code outside of the Run func.
manishrjain committed May 9, 2019
1 parent a2c01a7 commit c186edd
Showing 5 changed files with 108 additions and 57 deletions.
18 changes: 18 additions & 0 deletions conn/node.go
@@ -72,6 +72,9 @@ type Node struct {
// The stages are proposed -> committed (accepted by cluster) ->
// applied (to PL) -> synced (to BadgerDB).
Applied y.WaterMark

heartbeatsOut int64
heartbeatsIn int64
}

type ToGlog struct {
@@ -155,6 +158,20 @@ func NewNode(rc *pb.RaftContext, store *raftwal.DiskStorage) *Node {
return n
}

func (n *Node) ReportRaftComms() {
if !glog.V(3) {
return
}
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for range ticker.C {
out := atomic.SwapInt64(&n.heartbeatsOut, 0)
in := atomic.SwapInt64(&n.heartbeatsIn, 0)
glog.Infof("RaftComm: [%#x] Heartbeats out: %d, in: %d", n.Id, out, in)
}
}

// SetRaft would set the provided raft.Node to this node.
// It would check fail if the node is already set.
func (n *Node) SetRaft(r raft.Node) {
@@ -233,6 +250,7 @@ func (n *Node) Send(msg raftpb.Message) {
if glog.V(2) {
switch msg.Type {
case raftpb.MsgHeartbeat, raftpb.MsgHeartbeatResp:
atomic.AddInt64(&n.heartbeatsOut, 1)
case raftpb.MsgReadIndex, raftpb.MsgReadIndexResp:
case raftpb.MsgApp, raftpb.MsgAppResp:
case raftpb.MsgProp:
2 changes: 2 additions & 0 deletions conn/raft_server.go
@@ -21,6 +21,7 @@ import (
"encoding/binary"
"math/rand"
"sync"
"sync/atomic"
"time"

"github.com/dgraph-io/dgo/protos/api"
@@ -228,6 +229,7 @@ func (w *RaftServer) RaftMessage(server pb.Raft_RaftMessageServer) error {
if glog.V(2) {
switch msg.Type {
case raftpb.MsgHeartbeat, raftpb.MsgHeartbeatResp:
atomic.AddInt64(&n.heartbeatsIn, 1)
case raftpb.MsgReadIndex, raftpb.MsgReadIndexResp:
case raftpb.MsgApp, raftpb.MsgAppResp:
case raftpb.MsgProp:
15 changes: 9 additions & 6 deletions dgraph/cmd/zero/raft.go
@@ -618,6 +618,7 @@ func (n *node) Run() {
// We only stop runReadIndexLoop after the for loop below has finished interacting with it.
// That way we know sending to readStateCh will not deadlock.

var timer x.Timer
for {
select {
case <-n.closer.HasBeenClosed():
@@ -626,7 +627,7 @@
case <-ticker.C:
n.Raft().Tick()
case rd := <-n.Raft().Ready():
start := time.Now()
timer.Start()
_, span := otrace.StartSpan(n.ctx, "Zero.RunLoop",
otrace.WithSampler(otrace.ProbabilitySampler(0.001)))
for _, rs := range rd.ReadStates {
@@ -654,7 +655,7 @@ }
}
n.SaveToStorage(rd.HardState, rd.Entries, rd.Snapshot)
span.Annotatef(nil, "Saved to storage")
diskDur := time.Since(start)
timer.Record("disk")

if !raft.IsEmptySnap(rd.Snapshot) {
var state pb.MembershipState
@@ -689,16 +690,18 @@ }
}
}
span.Annotate(nil, "Sent messages")
timer.Record("proposals")

n.Raft().Advance()
span.Annotate(nil, "Advanced Raft")
timer.Record("advance")

span.End()
if time.Since(start) > 100*time.Millisecond {
if timer.Total() > 100*time.Millisecond {
glog.Warningf(
"Raft.Ready took too long to process: %v. Most likely due to slow disk: %v."+
"Raft.Ready took too long to process: %s."+
" Num entries: %d. MustSync: %v",
time.Since(start).Round(time.Millisecond), diskDur.Round(time.Millisecond),
len(rd.Entries), rd.MustSync)
timer.String(), len(rd.Entries), rd.MustSync)
}
}
}
108 changes: 63 additions & 45 deletions worker/draft.go
@@ -672,10 +672,56 @@ func (n *node) updateRaftProgress() error {
if err := txn.CommitAt(1, nil); err != nil {
return err
}
glog.V(1).Infof("[%#x] Set Raft progress to index: %d.", n.Id, snap.Index)
glog.V(2).Infof("[%#x] Set Raft progress to index: %d.", n.Id, snap.Index)
return nil
}

func (n *node) checkpointAndClose(done chan struct{}) {
slowTicker := time.NewTicker(time.Minute)
defer slowTicker.Stop()

for {
select {
case <-slowTicker.C:
// Do these operations asynchronously away from the main Run loop to allow heartbeats to
// be sent on time. Otherwise, followers would just keep running elections.

n.elog.Printf("Size of applyCh: %d", len(n.applyCh))
if err := n.updateRaftProgress(); err != nil {
glog.Errorf("While updating Raft progress: %v", err)
}

if n.AmLeader() {
// We keep track of the applied index in the p directory. Even if we don't take
// snapshot for a while and let the Raft logs grow and restart, we would not have to
// run all the log entries, because we can tell Raft.Config to set Applied to that
// index.
// This applied index tracking also covers the case when we have a big index
// rebuild. The rebuild would be tracked just like others and would not need to be
// replayed after a restart, because the Applied config would let us skip right
// through it.
// We use disk based storage for Raft. So, we're not too concerned about
// snapshotting. We just need to do enough, so that we don't have a huge backlog of
// entries to process on a restart.
if err := n.proposeSnapshot(x.WorkerConfig.SnapshotAfter); err != nil {
x.Errorf("While calculating and proposing snapshot: %v", err)
}
go n.abortOldTransactions()
}

case <-n.closer.HasBeenClosed():
glog.Infof("Stopping node.Run")
if peerId, has := groups().MyPeer(); has && n.AmLeader() {
n.Raft().TransferLeadership(n.ctx, x.WorkerConfig.RaftId, peerId)
time.Sleep(time.Second) // Let transfer happen.
}
n.Raft().Stop()
close(done)
return
}
}
}

func (n *node) Run() {
defer n.closer.Done() // CLOSER:1

@@ -685,20 +731,9 @@ func (n *node) Run() {
ticker := time.NewTicker(20 * time.Millisecond)
defer ticker.Stop()

slowTicker := time.NewTicker(30 * time.Second)
defer slowTicker.Stop()

done := make(chan struct{})
go func() {
<-n.closer.HasBeenClosed()
glog.Infof("Stopping node.Run")
if peerId, has := groups().MyPeer(); has && n.AmLeader() {
n.Raft().TransferLeadership(n.ctx, Config.RaftId, peerId)
time.Sleep(time.Second) // Let transfer happen.
}
n.Raft().Stop()
close(done)
}()
go n.checkpointAndClose(done)
go n.ReportRaftComms()

applied, err := n.findRaftProgress()
if err != nil {
@@ -707,6 +742,7 @@ func (n *node) Run() {
glog.Infof("Found Raft progress in p directory: %d", applied)
}

var timer x.Timer
for {
select {
case <-done:
@@ -717,35 +753,15 @@ func (n *node) Run() {
glog.Infoln("Raft node done.")
return

case <-slowTicker.C:
n.elog.Printf("Size of applyCh: %d", len(n.applyCh))
if err := n.updateRaftProgress(); err != nil {
glog.Errorf("While updating Raft progress: %v", err)
}

if leader {
// We keep track of the applied index in the p directory. Even if we don't take
// snapshot for a while and let the Raft logs grow and restart, we would not have to
// run all the log entries, because we can tell Raft.Config to set Applied to that
// index.
// This applied index tracking also covers the case when we have a big index
// rebuild. The rebuild would be tracked just like others and would not need to be
// replayed after a restart, because the Applied config would let us skip right
// through it.
// We use disk based storage for Raft. So, we're not too concerned about
// snapshotting. We just need to do enough, so that we don't have a huge backlog of
// entries to process on a restart.
if err := n.proposeSnapshot(Config.SnapshotAfter); err != nil {
x.Errorf("While calculating and proposing snapshot: %v", err)
}
go n.abortOldTransactions()
}

// Slow ticker can't be placed here because figuring out checkpoints and snapshots takes
// time and if the leader does not send heartbeats out during this time, the followers
// start an election process. And that election process would just continue to happen
// indefinitely because checkpoints and snapshots are being calculated indefinitely.
case <-ticker.C:
n.Raft().Tick()

case rd := <-n.Raft().Ready():
start := time.Now()
timer.Start()
_, span := otrace.StartSpan(n.ctx, "Alpha.RunLoop",
otrace.WithSampler(otrace.ProbabilitySampler(0.001)))

@@ -812,13 +828,13 @@ func (n *node) Run() {

// Store the hardstate and entries. Note that these are not CommittedEntries.
n.SaveToStorage(rd.HardState, rd.Entries, rd.Snapshot)
diskDur := time.Since(start)
if span != nil {
span.Annotatef(nil, "Saved %d entries. Snapshot, HardState empty? (%v, %v)",
len(rd.Entries),
raft.IsEmptySnap(rd.Snapshot),
raft.IsEmptyHardState(rd.HardState))
}
timer.Record("disk")

// Now schedule or apply committed entries.
var proposals []*pb.Proposal
@@ -890,8 +906,11 @@ func (n *node) Run() {
if span != nil {
span.Annotate(nil, "Followed queued messages.")
}
timer.Record("proposals")

n.Raft().Advance()
timer.Record("advance")

if firstRun && n.canCampaign {
go n.Raft().Campaign(n.ctx)
firstRun = false
@@ -900,12 +919,11 @@ func (n *node) Run() {
span.Annotate(nil, "Advanced Raft. Done.")
span.End()
}
if time.Since(start) > 100*time.Millisecond {
if timer.Total() > 100*time.Millisecond {
glog.Warningf(
"Raft.Ready took too long to process: %v. Most likely due to slow disk: %v."+
"Raft.Ready took too long to process: %s"+
" Num entries: %d. MustSync: %v",
time.Since(start).Round(time.Millisecond), diskDur.Round(time.Millisecond),
len(rd.Entries), rd.MustSync)
timer.String(), len(rd.Entries), rd.MustSync)
}
}
}
@@ -1141,7 +1159,7 @@ func (n *node) calculateSnapshot(discardN int) (*pb.Snapshot, error) {
}

if num := posting.Oracle().NumPendingTxns(); num > 0 {
glog.Infof("Num pending txns: %d", num)
glog.V(2).Infof("Num pending txns: %d", num)
}
// We can't rely upon the Raft entries to determine the minPendingStart,
// because there are many cases during mutations where we don't commit or
22 changes: 16 additions & 6 deletions x/x.go
@@ -378,10 +378,14 @@ func (b *BytesBuffer) TruncateBy(n int) {
AssertTrue(b.off >= 0 && b.sz >= 0)
}

type record struct {
Name string
Dur time.Duration
}
type Timer struct {
start time.Time
last time.Time
records []time.Duration
records []record
}

func (t *Timer) Start() {
@@ -390,18 +394,24 @@ func (t *Timer) Start() {
t.records = t.records[:0]
}

func (t *Timer) Record() {
func (t *Timer) Record(name string) {
now := time.Now()
t.records = append(t.records, now.Sub(t.last))
t.records = append(t.records, record{
Name: name,
Dur: now.Sub(t.last).Round(time.Millisecond),
})
t.last = now
}

func (t *Timer) Total() time.Duration {
return time.Since(t.start)
return time.Since(t.start).Round(time.Millisecond)
}

func (t *Timer) All() []time.Duration {
return t.records
func (t *Timer) String() string {
sort.Slice(t.records, func(i, j int) bool {
return t.records[i].Dur > t.records[j].Dur
})
return fmt.Sprintf("Timer Total: %s. Breakdown: %v", t.Total(), t.records)
}

// PredicateLang extracts the language from a predicate (or facet) name.
