Bug Fix: Avoid Dgraph cluster getting stuck in infinite leader election #3391

Merged: 6 commits merged on May 9, 2019
Changes from all commits
18 changes: 18 additions & 0 deletions conn/node.go
@@ -72,6 +72,9 @@ type Node struct {
// The stages are proposed -> committed (accepted by cluster) ->
// applied (to PL) -> synced (to BadgerDB).
Applied y.WaterMark

heartbeatsOut int64
Contributor: atomic package issues here? This is 64 bit. https://golang.org/pkg/sync/atomic/#pkg-note-BUG

heartbeatsIn int64
}

type ToGlog struct {
@@ -155,6 +158,20 @@ func NewNode(rc *pb.RaftContext, store *raftwal.DiskStorage) *Node {
return n
}

func (n *Node) ReportRaftComms() {
if !glog.V(3) {
Contributor: Shouldn't this work for level > 3 too?

return
}
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for range ticker.C {
out := atomic.SwapInt64(&n.heartbeatsOut, 0)
in := atomic.SwapInt64(&n.heartbeatsIn, 0)
glog.Infof("RaftComm: [%#x] Heartbeats out: %d, in: %d", n.Id, out, in)
}
}

// SetRaft would set the provided raft.Node to this node.
// It would check fail if the node is already set.
func (n *Node) SetRaft(r raft.Node) {
@@ -233,6 +250,7 @@ func (n *Node) Send(msg raftpb.Message) {
if glog.V(2) {
switch msg.Type {
case raftpb.MsgHeartbeat, raftpb.MsgHeartbeatResp:
atomic.AddInt64(&n.heartbeatsOut, 1)
case raftpb.MsgReadIndex, raftpb.MsgReadIndexResp:
case raftpb.MsgApp, raftpb.MsgAppResp:
case raftpb.MsgProp:
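Note on the reviewer's alignment concern: below is a minimal sketch of one way to satisfy the sync/atomic rule that 64-bit values accessed atomically must be 64-bit aligned on 32-bit platforms. The struct is illustrative and is not the PR's Node type; the point is only that placing the atomically accessed int64 counters first guarantees their alignment, since the first word of an allocated struct is 64-bit aligned.

package main

import "sync/atomic"

// counters is an illustrative struct, not Dgraph's Node type. Keeping the
// atomically accessed int64 fields first guarantees 64-bit alignment even on
// 32-bit platforms (see the bug note in the sync/atomic documentation).
type counters struct {
	heartbeatsOut int64 // must stay at the top for 32-bit alignment
	heartbeatsIn  int64

	// Non-atomic fields can follow in any order.
	name string
}

func main() {
	var c counters
	atomic.AddInt64(&c.heartbeatsOut, 1)
	_ = atomic.LoadInt64(&c.heartbeatsIn)
	_ = c.name
}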
2 changes: 2 additions & 0 deletions conn/raft_server.go
@@ -21,6 +21,7 @@ import (
"encoding/binary"
"math/rand"
"sync"
"sync/atomic"
"time"

"github.com/dgraph-io/dgo/protos/api"
@@ -228,6 +229,7 @@ func (w *RaftServer) RaftMessage(server pb.Raft_RaftMessageServer) error {
if glog.V(2) {
switch msg.Type {
case raftpb.MsgHeartbeat, raftpb.MsgHeartbeatResp:
atomic.AddInt64(&n.heartbeatsIn, 1)
case raftpb.MsgReadIndex, raftpb.MsgReadIndexResp:
case raftpb.MsgApp, raftpb.MsgAppResp:
case raftpb.MsgProp:
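To make the counting scheme concrete, here is a self-contained sketch of the per-second counter pattern that ReportRaftComms relies on (all names and intervals are illustrative stand-ins, not Dgraph code). Senders and receivers bump a shared counter with atomic.AddInt64, and a reporter swaps it back to zero once per interval, so each report covers only the heartbeats seen since the previous one.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	var heartbeats int64

	// Producer: pretend a heartbeat arrives every 100ms.
	go func() {
		for range time.Tick(100 * time.Millisecond) {
			atomic.AddInt64(&heartbeats, 1)
		}
	}()

	// Reporter: read-and-reset once per second, as ReportRaftComms does.
	for i := 0; i < 3; i++ {
		time.Sleep(time.Second)
		in := atomic.SwapInt64(&heartbeats, 0)
		fmt.Printf("heartbeats in the last second: %d\n", in)
	}
}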
15 changes: 9 additions & 6 deletions dgraph/cmd/zero/raft.go
@@ -618,6 +618,7 @@ func (n *node) Run() {
// We only stop runReadIndexLoop after the for loop below has finished interacting with it.
// That way we know sending to readStateCh will not deadlock.

var timer x.Timer
for {
select {
case <-n.closer.HasBeenClosed():
@@ -626,7 +627,7 @@
case <-ticker.C:
n.Raft().Tick()
case rd := <-n.Raft().Ready():
start := time.Now()
timer.Start()
_, span := otrace.StartSpan(n.ctx, "Zero.RunLoop",
otrace.WithSampler(otrace.ProbabilitySampler(0.001)))
for _, rs := range rd.ReadStates {
@@ -654,7 +655,7 @@
}
n.SaveToStorage(rd.HardState, rd.Entries, rd.Snapshot)
span.Annotatef(nil, "Saved to storage")
diskDur := time.Since(start)
timer.Record("disk")

if !raft.IsEmptySnap(rd.Snapshot) {
var state pb.MembershipState
@@ -689,16 +690,18 @@
}
}
span.Annotate(nil, "Sent messages")
timer.Record("proposals")

n.Raft().Advance()
span.Annotate(nil, "Advanced Raft")
timer.Record("advance")

span.End()
if time.Since(start) > 100*time.Millisecond {
if timer.Total() > 100*time.Millisecond {
glog.Warningf(
"Raft.Ready took too long to process: %v. Most likely due to slow disk: %v."+
"Raft.Ready took too long to process: %s."+
" Num entries: %d. MustSync: %v",
time.Since(start).Round(time.Millisecond), diskDur.Round(time.Millisecond),
len(rd.Entries), rd.MustSync)
timer.String(), len(rd.Entries), rd.MustSync)
}
}
}
110 changes: 64 additions & 46 deletions worker/draft.go
@@ -724,10 +724,56 @@ func (n *node) updateRaftProgress() error {
if err := txn.CommitAt(1, nil); err != nil {
return err
}
glog.V(1).Infof("[%#x] Set Raft progress to index: %d.", n.Id, snap.Index)
glog.V(2).Infof("[%#x] Set Raft progress to index: %d.", n.Id, snap.Index)
return nil
}

func (n *node) checkpointAndClose(done chan struct{}) {
slowTicker := time.NewTicker(time.Minute)
defer slowTicker.Stop()

for {
select {
case <-slowTicker.C:
// Do these operations asynchronously away from the main Run loop to allow heartbeats to
// be sent on time. Otherwise, followers would just keep running elections.

n.elog.Printf("Size of applyCh: %d", len(n.applyCh))
if err := n.updateRaftProgress(); err != nil {
glog.Errorf("While updating Raft progress: %v", err)
}

if n.AmLeader() {
// We keep track of the applied index in the p directory. Even if we don't take
// snapshot for a while and let the Raft logs grow and restart, we would not have to
// run all the log entries, because we can tell Raft.Config to set Applied to that
// index.
// This applied index tracking also covers the case when we have a big index
// rebuild. The rebuild would be tracked just like others and would not need to be
// replayed after a restart, because the Applied config would let us skip right
// through it.
// We use disk based storage for Raft. So, we're not too concerned about
// snapshotting. We just need to do enough, so that we don't have a huge backlog of
// entries to process on a restart.
if err := n.proposeSnapshot(x.WorkerConfig.SnapshotAfter); err != nil {
x.Errorf("While calculating and proposing snapshot: %v", err)
}
go n.abortOldTransactions()
}

case <-n.closer.HasBeenClosed():
glog.Infof("Stopping node.Run")
if peerId, has := groups().MyPeer(); has && n.AmLeader() {
n.Raft().TransferLeadership(n.ctx, x.WorkerConfig.RaftId, peerId)
time.Sleep(time.Second) // Let transfer happen.
}
n.Raft().Stop()
close(done)
return
}
}
}

func (n *node) Run() {
defer n.closer.Done() // CLOSER:1

@@ -737,20 +783,9 @@ func (n *node) Run() {
ticker := time.NewTicker(20 * time.Millisecond)
defer ticker.Stop()

slowTicker := time.NewTicker(30 * time.Second)
defer slowTicker.Stop()

done := make(chan struct{})
go func() {
<-n.closer.HasBeenClosed()
glog.Infof("Stopping node.Run")
if peerId, has := groups().MyPeer(); has && n.AmLeader() {
n.Raft().TransferLeadership(n.ctx, x.WorkerConfig.RaftId, peerId)
time.Sleep(time.Second) // Let transfer happen.
}
n.Raft().Stop()
close(done)
}()
go n.checkpointAndClose(done)
go n.ReportRaftComms()

applied, err := n.findRaftProgress()
if err != nil {
@@ -759,6 +794,7 @@
glog.Infof("Found Raft progress in p directory: %d", applied)
}

var timer x.Timer
for {
select {
case <-done:
@@ -769,35 +805,15 @@
glog.Infoln("Raft node done.")
return

case <-slowTicker.C:
n.elog.Printf("Size of applyCh: %d", len(n.applyCh))
if err := n.updateRaftProgress(); err != nil {
glog.Errorf("While updating Raft progress: %v", err)
}

if leader {
// We keep track of the applied index in the p directory. Even if we don't take
// snapshot for a while and let the Raft logs grow and restart, we would not have to
// run all the log entries, because we can tell Raft.Config to set Applied to that
// index.
// This applied index tracking also covers the case when we have a big index
// rebuild. The rebuild would be tracked just like others and would not need to be
// replayed after a restart, because the Applied config would let us skip right
// through it.
// We use disk based storage for Raft. So, we're not too concerned about
// snapshotting. We just need to do enough, so that we don't have a huge backlog of
// entries to process on a restart.
if err := n.proposeSnapshot(x.WorkerConfig.SnapshotAfter); err != nil {
x.Errorf("While calculating and proposing snapshot: %v", err)
}
go n.abortOldTransactions()
}

// Slow ticker can't be placed here because figuring out checkpoints and snapshots takes
// time and if the leader does not send heartbeats out during this time, the followers
// start an election process. And that election process would just continue to happen
// indefinitely because checkpoints and snapshots are being calculated indefinitely.
case <-ticker.C:
n.Raft().Tick()

case rd := <-n.Raft().Ready():
start := time.Now()
timer.Start()
_, span := otrace.StartSpan(n.ctx, "Alpha.RunLoop",
otrace.WithSampler(otrace.ProbabilitySampler(0.001)))

@@ -875,13 +891,13 @@ func (n *node) Run() {

// Store the hardstate and entries. Note that these are not CommittedEntries.
n.SaveToStorage(rd.HardState, rd.Entries, rd.Snapshot)
diskDur := time.Since(start)
if span != nil {
span.Annotatef(nil, "Saved %d entries. Snapshot, HardState empty? (%v, %v)",
len(rd.Entries),
raft.IsEmptySnap(rd.Snapshot),
raft.IsEmptyHardState(rd.HardState))
}
timer.Record("disk")

// Now schedule or apply committed entries.
var proposals []*pb.Proposal
@@ -953,8 +969,11 @@ func (n *node) Run() {
if span != nil {
span.Annotate(nil, "Followed queued messages.")
}
timer.Record("proposals")

n.Raft().Advance()
timer.Record("advance")

if firstRun && n.canCampaign {
go n.Raft().Campaign(n.ctx)
firstRun = false
@@ -964,14 +983,13 @@
span.End()
ostats.RecordWithTags(context.Background(),
[]tag.Mutator{tag.Upsert(x.KeyMethod, "alpha.RunLoop")},
x.LatencyMs.M(x.SinceMs(start)))
x.LatencyMs.M(float64(timer.Total())/1e6))
Contributor: could divide by time.MilliSecond instead here

}
if time.Since(start) > 100*time.Millisecond {
if timer.Total() > 100*time.Millisecond {
glog.Warningf(
"Raft.Ready took too long to process: %v. Most likely due to slow disk: %v."+
"Raft.Ready took too long to process: %s"+
" Num entries: %d. MustSync: %v",
time.Since(start).Round(time.Millisecond), diskDur.Round(time.Millisecond),
len(rd.Entries), rd.MustSync)
timer.String(), len(rd.Entries), rd.MustSync)
}
}
}
@@ -1214,7 +1232,7 @@ func (n *node) calculateSnapshot(discardN int) (*pb.Snapshot, error) {
}

if num := posting.Oracle().NumPendingTxns(); num > 0 {
glog.Infof("Num pending txns: %d", num)
glog.V(2).Infof("Num pending txns: %d", num)
}
// We can't rely upon the Raft entries to determine the minPendingStart,
// because there are many cases during mutations where we don't commit or
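The heart of the fix is visible in the Run/checkpointAndClose split above: slow periodic work no longer shares a select loop with Raft ticks. Below is a toy, self-contained illustration of that pattern (all names, intervals, and Sleeps are stand-ins, not Dgraph code): the maintenance goroutine may take as long as it likes, yet the hot loop keeps ticking every 20ms, which is what keeps heartbeats flowing and prevents followers from starting elections.

package main

import (
	"fmt"
	"time"
)

// slowMaintenance stands in for checkpointAndClose: periodic work that may be slow.
func slowMaintenance(done chan struct{}) {
	slowTicker := time.NewTicker(500 * time.Millisecond)
	defer slowTicker.Stop()
	for {
		select {
		case <-slowTicker.C:
			time.Sleep(200 * time.Millisecond) // stands in for snapshot/checkpoint work
		case <-done:
			return
		}
	}
}

func main() {
	done := make(chan struct{})
	go slowMaintenance(done) // moved off the hot path, as in the diff above

	ticker := time.NewTicker(20 * time.Millisecond)
	defer ticker.Stop()
	deadline := time.After(2 * time.Second)
	ticks := 0
	for {
		select {
		case <-ticker.C:
			ticks++ // stands in for n.Raft().Tick(): never blocked by maintenance
		case <-deadline:
			close(done)
			fmt.Printf("ticks in 2s: %d (not starved by slow maintenance)\n", ticks)
			return
		}
	}
}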
22 changes: 16 additions & 6 deletions x/x.go
@@ -379,10 +379,14 @@ func (b *BytesBuffer) TruncateBy(n int) {
AssertTrue(b.off >= 0 && b.sz >= 0)
}

type record struct {
Name string
Dur time.Duration
}
type Timer struct {
start time.Time
last time.Time
records []time.Duration
records []record
}

func (t *Timer) Start() {
@@ -391,18 +395,24 @@ func (t *Timer) Start() {
t.records = t.records[:0]
}

func (t *Timer) Record() {
func (t *Timer) Record(name string) {
now := time.Now()
t.records = append(t.records, now.Sub(t.last))
t.records = append(t.records, record{
Name: name,
Dur: now.Sub(t.last).Round(time.Millisecond),
})
t.last = now
}

func (t *Timer) Total() time.Duration {
return time.Since(t.start)
return time.Since(t.start).Round(time.Millisecond)
}

func (t *Timer) All() []time.Duration {
return t.records
func (t *Timer) String() string {
sort.Slice(t.records, func(i, j int) bool {
Contributor: Is sorting helpful in viewing the results?

return t.records[i].Dur > t.records[j].Dur
})
return fmt.Sprintf("Timer Total: %s. Breakdown: %v", t.Total(), t.records)
}

// PredicateLang extracts the language from a predicate (or facet) name.
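For completeness, a small sketch of how the reworked x.Timer is intended to be used, mirroring the Run loops in this diff. The Sleeps stand in for the real disk, proposal, and advance work, and the import path assumes the Dgraph repository layout at the time of this PR.

package main

import (
	"fmt"
	"time"

	"github.com/dgraph-io/dgraph/x"
)

func main() {
	var timer x.Timer
	timer.Start()

	time.Sleep(30 * time.Millisecond) // stands in for SaveToStorage
	timer.Record("disk")

	time.Sleep(50 * time.Millisecond) // stands in for applying and sending proposals
	timer.Record("proposals")

	time.Sleep(40 * time.Millisecond) // stands in for Raft().Advance()
	timer.Record("advance")

	if timer.Total() > 100*time.Millisecond {
		// String() prints the total plus a per-stage breakdown, sorted with
		// the slowest stage first.
		fmt.Printf("Raft.Ready took too long to process: %s\n", timer.String())
	}
}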