From c186edd77524e4c004ede4161cdf14c7d984294b Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Wed, 8 May 2019 19:44:43 -0700 Subject: [PATCH] Bug Fix: Avoid Dgraph cluster getting stuck in infinite leader election (#3391) Dgraph Alphas were calculating snapshots and checkpoints in the main Raft loop, which depending upon disk speed caused Ticks to not be done for seconds. This caused the followers to assume that the leader is unavailable, triggering an election. Because the checkpoint and snapshot calculation happens every 30s, the election was happening every 30s as well. This PR moves both of those outside the main loop into their own goroutine (colocated with the code which shuts down Raft node). Tested successfully with a live cluster which was exhibiting these symptoms. This PR also tracks how many heartbeats have come in and gone out from each node and prints them out under V(3). Useful for debugging. The PR improves upon and uses x.Timer to track Raft.Ready components' latencies and report them in both Alphas and Zeros. This fixes the incorrect statement we were making about disk latency being the primary cause of Raft.Ready being slow. Changes: * Report Heartbeat comms * Add logs around heartbeats. * Move snapshot and checkpoint calculation outside of the main Raft loop. Capture the latency of individual components in Raft.Ready better. * Add timer to Zero as well. Fix a bug: Use a for loop when going over slow ticker. * Move num pending txns to V(2). * Move the checkpointing code outside of the Run func. --- conn/node.go | 18 +++++++ conn/raft_server.go | 2 + dgraph/cmd/zero/raft.go | 15 +++--- worker/draft.go | 108 +++++++++++++++++++++++----------------- x/x.go | 22 +++++--- 5 files changed, 108 insertions(+), 57 deletions(-) diff --git a/conn/node.go b/conn/node.go index a33b9236d81..2d286fefc2d 100644 --- a/conn/node.go +++ b/conn/node.go @@ -72,6 +72,9 @@ type Node struct { // The stages are proposed -> committed (accepted by cluster) -> // applied (to PL) -> synced (to BadgerDB). Applied y.WaterMark + + heartbeatsOut int64 + heartbeatsIn int64 } type ToGlog struct { @@ -155,6 +158,20 @@ func NewNode(rc *pb.RaftContext, store *raftwal.DiskStorage) *Node { return n } +func (n *Node) ReportRaftComms() { + if !glog.V(3) { + return + } + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for range ticker.C { + out := atomic.SwapInt64(&n.heartbeatsOut, 0) + in := atomic.SwapInt64(&n.heartbeatsIn, 0) + glog.Infof("RaftComm: [%#x] Heartbeats out: %d, in: %d", n.Id, out, in) + } +} + // SetRaft would set the provided raft.Node to this node. // It would check fail if the node is already set. func (n *Node) SetRaft(r raft.Node) { @@ -233,6 +250,7 @@ func (n *Node) Send(msg raftpb.Message) { if glog.V(2) { switch msg.Type { case raftpb.MsgHeartbeat, raftpb.MsgHeartbeatResp: + atomic.AddInt64(&n.heartbeatsOut, 1) case raftpb.MsgReadIndex, raftpb.MsgReadIndexResp: case raftpb.MsgApp, raftpb.MsgAppResp: case raftpb.MsgProp: diff --git a/conn/raft_server.go b/conn/raft_server.go index 03a04077d4c..06d6f08c3fd 100644 --- a/conn/raft_server.go +++ b/conn/raft_server.go @@ -21,6 +21,7 @@ import ( "encoding/binary" "math/rand" "sync" + "sync/atomic" "time" "github.com/dgraph-io/dgo/protos/api" @@ -228,6 +229,7 @@ func (w *RaftServer) RaftMessage(server pb.Raft_RaftMessageServer) error { if glog.V(2) { switch msg.Type { case raftpb.MsgHeartbeat, raftpb.MsgHeartbeatResp: + atomic.AddInt64(&n.heartbeatsIn, 1) case raftpb.MsgReadIndex, raftpb.MsgReadIndexResp: case raftpb.MsgApp, raftpb.MsgAppResp: case raftpb.MsgProp: diff --git a/dgraph/cmd/zero/raft.go b/dgraph/cmd/zero/raft.go index 169e1497ec0..a5ad758acf5 100644 --- a/dgraph/cmd/zero/raft.go +++ b/dgraph/cmd/zero/raft.go @@ -618,6 +618,7 @@ func (n *node) Run() { // We only stop runReadIndexLoop after the for loop below has finished interacting with it. // That way we know sending to readStateCh will not deadlock. + var timer x.Timer for { select { case <-n.closer.HasBeenClosed(): @@ -626,7 +627,7 @@ func (n *node) Run() { case <-ticker.C: n.Raft().Tick() case rd := <-n.Raft().Ready(): - start := time.Now() + timer.Start() _, span := otrace.StartSpan(n.ctx, "Zero.RunLoop", otrace.WithSampler(otrace.ProbabilitySampler(0.001))) for _, rs := range rd.ReadStates { @@ -654,7 +655,7 @@ func (n *node) Run() { } n.SaveToStorage(rd.HardState, rd.Entries, rd.Snapshot) span.Annotatef(nil, "Saved to storage") - diskDur := time.Since(start) + timer.Record("disk") if !raft.IsEmptySnap(rd.Snapshot) { var state pb.MembershipState @@ -689,16 +690,18 @@ func (n *node) Run() { } } span.Annotate(nil, "Sent messages") + timer.Record("proposals") n.Raft().Advance() span.Annotate(nil, "Advanced Raft") + timer.Record("advance") + span.End() - if time.Since(start) > 100*time.Millisecond { + if timer.Total() > 100*time.Millisecond { glog.Warningf( - "Raft.Ready took too long to process: %v. Most likely due to slow disk: %v."+ + "Raft.Ready took too long to process: %s."+ " Num entries: %d. MustSync: %v", - time.Since(start).Round(time.Millisecond), diskDur.Round(time.Millisecond), - len(rd.Entries), rd.MustSync) + timer.String(), len(rd.Entries), rd.MustSync) } } } diff --git a/worker/draft.go b/worker/draft.go index faab1044843..d531781ddf3 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -672,10 +672,56 @@ func (n *node) updateRaftProgress() error { if err := txn.CommitAt(1, nil); err != nil { return err } - glog.V(1).Infof("[%#x] Set Raft progress to index: %d.", n.Id, snap.Index) + glog.V(2).Infof("[%#x] Set Raft progress to index: %d.", n.Id, snap.Index) return nil } +func (n *node) checkpointAndClose(done chan struct{}) { + slowTicker := time.NewTicker(time.Minute) + defer slowTicker.Stop() + + for { + select { + case <-slowTicker.C: + // Do these operations asynchronously away from the main Run loop to allow heartbeats to + // be sent on time. Otherwise, followers would just keep running elections. + + n.elog.Printf("Size of applyCh: %d", len(n.applyCh)) + if err := n.updateRaftProgress(); err != nil { + glog.Errorf("While updating Raft progress: %v", err) + } + + if n.AmLeader() { + // We keep track of the applied index in the p directory. Even if we don't take + // snapshot for a while and let the Raft logs grow and restart, we would not have to + // run all the log entries, because we can tell Raft.Config to set Applied to that + // index. + // This applied index tracking also covers the case when we have a big index + // rebuild. The rebuild would be tracked just like others and would not need to be + // replayed after a restart, because the Applied config would let us skip right + // through it. + // We use disk based storage for Raft. So, we're not too concerned about + // snapshotting. We just need to do enough, so that we don't have a huge backlog of + // entries to process on a restart. + if err := n.proposeSnapshot(x.WorkerConfig.SnapshotAfter); err != nil { + x.Errorf("While calculating and proposing snapshot: %v", err) + } + go n.abortOldTransactions() + } + + case <-n.closer.HasBeenClosed(): + glog.Infof("Stopping node.Run") + if peerId, has := groups().MyPeer(); has && n.AmLeader() { + n.Raft().TransferLeadership(n.ctx, x.WorkerConfig.RaftId, peerId) + time.Sleep(time.Second) // Let transfer happen. + } + n.Raft().Stop() + close(done) + return + } + } +} + func (n *node) Run() { defer n.closer.Done() // CLOSER:1 @@ -685,20 +731,9 @@ func (n *node) Run() { ticker := time.NewTicker(20 * time.Millisecond) defer ticker.Stop() - slowTicker := time.NewTicker(30 * time.Second) - defer slowTicker.Stop() - done := make(chan struct{}) - go func() { - <-n.closer.HasBeenClosed() - glog.Infof("Stopping node.Run") - if peerId, has := groups().MyPeer(); has && n.AmLeader() { - n.Raft().TransferLeadership(n.ctx, Config.RaftId, peerId) - time.Sleep(time.Second) // Let transfer happen. - } - n.Raft().Stop() - close(done) - }() + go n.checkpointAndClose(done) + go n.ReportRaftComms() applied, err := n.findRaftProgress() if err != nil { @@ -707,6 +742,7 @@ func (n *node) Run() { glog.Infof("Found Raft progress in p directory: %d", applied) } + var timer x.Timer for { select { case <-done: @@ -717,35 +753,15 @@ func (n *node) Run() { glog.Infoln("Raft node done.") return - case <-slowTicker.C: - n.elog.Printf("Size of applyCh: %d", len(n.applyCh)) - if err := n.updateRaftProgress(); err != nil { - glog.Errorf("While updating Raft progress: %v", err) - } - - if leader { - // We keep track of the applied index in the p directory. Even if we don't take - // snapshot for a while and let the Raft logs grow and restart, we would not have to - // run all the log entries, because we can tell Raft.Config to set Applied to that - // index. - // This applied index tracking also covers the case when we have a big index - // rebuild. The rebuild would be tracked just like others and would not need to be - // replayed after a restart, because the Applied config would let us skip right - // through it. - // We use disk based storage for Raft. So, we're not too concerned about - // snapshotting. We just need to do enough, so that we don't have a huge backlog of - // entries to process on a restart. - if err := n.proposeSnapshot(Config.SnapshotAfter); err != nil { - x.Errorf("While calculating and proposing snapshot: %v", err) - } - go n.abortOldTransactions() - } - + // Slow ticker can't be placed here because figuring out checkpoints and snapshots takes + // time and if the leader does not send heartbeats out during this time, the followers + // start an election process. And that election process would just continue to happen + // indefinitely because checkpoints and snapshots are being calculated indefinitely. case <-ticker.C: n.Raft().Tick() case rd := <-n.Raft().Ready(): - start := time.Now() + timer.Start() _, span := otrace.StartSpan(n.ctx, "Alpha.RunLoop", otrace.WithSampler(otrace.ProbabilitySampler(0.001))) @@ -812,13 +828,13 @@ func (n *node) Run() { // Store the hardstate and entries. Note that these are not CommittedEntries. n.SaveToStorage(rd.HardState, rd.Entries, rd.Snapshot) - diskDur := time.Since(start) if span != nil { span.Annotatef(nil, "Saved %d entries. Snapshot, HardState empty? (%v, %v)", len(rd.Entries), raft.IsEmptySnap(rd.Snapshot), raft.IsEmptyHardState(rd.HardState)) } + timer.Record("disk") // Now schedule or apply committed entries. var proposals []*pb.Proposal @@ -890,8 +906,11 @@ func (n *node) Run() { if span != nil { span.Annotate(nil, "Followed queued messages.") } + timer.Record("proposals") n.Raft().Advance() + timer.Record("advance") + if firstRun && n.canCampaign { go n.Raft().Campaign(n.ctx) firstRun = false @@ -900,12 +919,11 @@ func (n *node) Run() { span.Annotate(nil, "Advanced Raft. Done.") span.End() } - if time.Since(start) > 100*time.Millisecond { + if timer.Total() > 100*time.Millisecond { glog.Warningf( - "Raft.Ready took too long to process: %v. Most likely due to slow disk: %v."+ + "Raft.Ready took too long to process: %s"+ " Num entries: %d. MustSync: %v", - time.Since(start).Round(time.Millisecond), diskDur.Round(time.Millisecond), - len(rd.Entries), rd.MustSync) + timer.String(), len(rd.Entries), rd.MustSync) } } } @@ -1141,7 +1159,7 @@ func (n *node) calculateSnapshot(discardN int) (*pb.Snapshot, error) { } if num := posting.Oracle().NumPendingTxns(); num > 0 { - glog.Infof("Num pending txns: %d", num) + glog.V(2).Infof("Num pending txns: %d", num) } // We can't rely upon the Raft entries to determine the minPendingStart, // because there are many cases during mutations where we don't commit or diff --git a/x/x.go b/x/x.go index f95938d80fd..a472572a19e 100644 --- a/x/x.go +++ b/x/x.go @@ -378,10 +378,14 @@ func (b *BytesBuffer) TruncateBy(n int) { AssertTrue(b.off >= 0 && b.sz >= 0) } +type record struct { + Name string + Dur time.Duration +} type Timer struct { start time.Time last time.Time - records []time.Duration + records []record } func (t *Timer) Start() { @@ -390,18 +394,24 @@ func (t *Timer) Start() { t.records = t.records[:0] } -func (t *Timer) Record() { +func (t *Timer) Record(name string) { now := time.Now() - t.records = append(t.records, now.Sub(t.last)) + t.records = append(t.records, record{ + Name: name, + Dur: now.Sub(t.last).Round(time.Millisecond), + }) t.last = now } func (t *Timer) Total() time.Duration { - return time.Since(t.start) + return time.Since(t.start).Round(time.Millisecond) } -func (t *Timer) All() []time.Duration { - return t.records +func (t *Timer) String() string { + sort.Slice(t.records, func(i, j int) bool { + return t.records[i].Dur > t.records[j].Dur + }) + return fmt.Sprintf("Timer Total: %s. Breakdown: %v", t.Total(), t.records) } // PredicateLang extracts the language from a predicate (or facet) name.