Skip to content

Commit

Permalink
Drain apply channel when a snapshot is received (#4273)
Browse files Browse the repository at this point in the history
* Drain apply channel when a snapshot is received; set node to unhealthy status while snapshot is applied. Fixes DGRAPH-671 and DGRAPH-672

* remove whitespace

* fix issue reported by deepsource

* gofmt
  • Loading branch information
parasssh authored Nov 14, 2019
1 parent 717710a commit 8c0b073
Showing 1 changed file with 31 additions and 8 deletions.
39 changes: 31 additions & 8 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,21 @@ func (n *node) checkpointAndClose(done chan struct{}) {
}
}

func (n *node) drainApplyChan() {
for {
select {
case proposals := <-n.applyCh:
glog.Infof("Draining %d proposals\n", len(proposals))
for _, proposal := range proposals {
n.Proposals.Done(proposal.Key, nil)
n.Applied.Done(proposal.Index)
}
default:
return
}
}
}

func (n *node) Run() {
defer n.closer.Done() // CLOSER:1

Expand Down Expand Up @@ -814,12 +829,19 @@ func (n *node) Run() {
rc := snap.GetContext()
x.AssertTrue(rc.GetGroup() == n.gid)
if rc.Id != n.Id {
// Set node to unhealthy state here while it applies the snapshot.
x.UpdateHealthStatus(false)

// We are getting a new snapshot from leader. We need to wait for the applyCh to
// finish applying the updates, otherwise, we'll end up overwriting the data
// from the new snapshot that we retrieved.

// Drain the apply channel. Snapshot will be retrieved next.
maxIndex := n.Applied.LastIndex()
glog.Infof("Waiting for applyCh to become empty by reaching %d before"+
glog.Infof("Drain applyCh by reaching %d before"+
" retrieving snapshot\n", maxIndex)
n.drainApplyChan()

if err := n.Applied.WaitForMark(context.Background(), maxIndex); err != nil {
glog.Errorf("Error waiting for mark for index %d: %+v", maxIndex, err)
}
Expand Down Expand Up @@ -847,6 +869,9 @@ func (n *node) Run() {
time.Sleep(100 * time.Millisecond) // Wait for a bit.
}
glog.Infof("---> SNAPSHOT: %+v. Group %d. DONE.\n", snap, n.gid)

// Set node to healthy state here.
x.UpdateHealthStatus(true)
} else {
glog.Infof("---> SNAPSHOT: %+v. Group %d from node id %#x [SELF]. Ignoring.\n",
snap, n.gid, rc.Id)
Expand Down Expand Up @@ -883,21 +908,19 @@ func (n *node) Run() {
// possible sequentially
n.Applied.Begin(entry.Index)

if entry.Type == raftpb.EntryConfChange {
switch {
case entry.Type == raftpb.EntryConfChange:
n.applyConfChange(entry)
// Not present in proposal map.
n.Applied.Done(entry.Index)
groups().triggerMembershipSync()

} else if len(entry.Data) == 0 {
case len(entry.Data) == 0:
n.elog.Printf("Found empty data at index: %d", entry.Index)
n.Applied.Done(entry.Index)

} else if entry.Index < applied {
case entry.Index < applied:
n.elog.Printf("Skipping over already applied entry: %d", entry.Index)
n.Applied.Done(entry.Index)

} else {
default:
proposal := &pb.Proposal{}
if err := proposal.Unmarshal(entry.Data); err != nil {
x.Fatalf("Unable to unmarshal proposal: %v %q\n", err, entry.Data)
Expand Down

0 comments on commit 8c0b073

Please sign in to comment.