Skip to content

Commit

Permalink
fix(store): raft commit ahead of stable entries (#325)
Browse files Browse the repository at this point in the history
  • Loading branch information
ifplusor authored Dec 9, 2022
1 parent 2fa1b52 commit 3abf12a
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 41 deletions.
5 changes: 3 additions & 2 deletions raft/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ func (rn *RawNode) Bootstrap(peers []Peer) error {
// We do not set raftLog.applied so the application will be able
// to observe all conf changes via Ready.CommittedEntries.
//
// TODO(bdarnell): These entries are still unstable; do we need to preserve
// the invariant that committed < unstable?
// NOTE: These entries are still unstable.
//
// TODO(james.yin): Set localCommitted?
rn.raft.raftLog.committed = uint64(len(ents))
for _, peer := range peers {
rn.raft.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}.AsV2())
Expand Down
88 changes: 66 additions & 22 deletions raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,13 @@ type raftLog struct {
pending uint64
// committed is the highest log position that is known to be in
// stable storage on a quorum of nodes.
// Invariant: committed < unstable.offset + len(unstable.entries)
committed uint64
// Invariant: localCommitted = min(committed, unstable.offset)
localCommitted uint64
// applied is the highest log position that the application has
// been instructed to apply to its state machine.
// Invariant: applied <= committed
// Invariant: applied <= localCommitted
applied uint64
// compacted is the highest log position that the application can
// delete safely.
Expand Down Expand Up @@ -80,6 +83,7 @@ func newLogWithSize(storage Storage, logger Logger, maxNextEntsSize uint64) *raf
log.pending = lastIndex + 1
// Initialize our committed and applied pointers to the time of the last compaction.
log.committed = firstIndex - 1
log.localCommitted = firstIndex - 1
log.applied = firstIndex - 1
log.compacted = firstIndex - 1

Expand All @@ -97,6 +101,7 @@ func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry
return false
}

li := index + uint64(len(ents))
ci := l.findConflict(ents)
switch {
case ci == 0:
Expand All @@ -109,11 +114,7 @@ func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry
}
l.append(ents[ci-offset:]...)
}

// FIXME: use stable
// li := l.lastIndex()
// l.commitTo(min(committed, li))

l.commitTo(min(committed, li))
return true
}

Expand Down Expand Up @@ -204,9 +205,9 @@ func (l *raftLog) pendingEntries() []pb.Entry {
// If applied is smaller than the index of snapshot, it returns all committed
// entries after the index of snapshot.
func (l *raftLog) nextEnts() (ents []pb.Entry) {
off := max(l.applied+1, l.firstIndex())
if l.committed+1 > off {
ents, err := l.slice(off, l.committed+1, l.maxNextEntsSize)
lo := max(l.applied+1, l.firstIndex())
if hi := l.localCommitted + 1; hi > lo {
ents, err := l.slice(lo, hi, l.maxNextEntsSize)
if err != nil {
l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err)
}
Expand All @@ -219,7 +220,7 @@ func (l *raftLog) nextEnts() (ents []pb.Entry) {
// is a fast check without heavy raftLog.slice() in raftLog.nextEnts().
func (l *raftLog) hasNextEnts() bool {
off := max(l.applied+1, l.firstIndex())
return l.committed+1 > off
return l.localCommitted+1 > off
}

// hasPendingSnapshot returns if there is pending snapshot waiting for applying.
Expand Down Expand Up @@ -256,14 +257,12 @@ func (l *raftLog) lastIndex() uint64 {
return i
}

func (l *raftLog) commitTo(tocommit uint64) {
// never decrease commit
if l.committed < tocommit {
if l.lastIndex() < tocommit {
l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", tocommit, l.lastIndex())
}
l.committed = tocommit
// stableLastIndex returns the last index reported by the backing storage,
// i.e. the result of l.storage.LastIndex(). Unstable entries held by the
// log are not consulted.
//
// It panics if the storage returns an error.
func (l *raftLog) stableLastIndex() uint64 {
	last, err := l.storage.LastIndex()
	if err != nil {
		panic(err) // TODO(james.yin)
	}
	return last
}

func (l *raftLog) compactTo(tocompact uint64) {
Expand All @@ -280,12 +279,36 @@ func (l *raftLog) appliedTo(i uint64) {
if i == 0 {
return
}
if l.committed < i || i < l.applied {
l.logger.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed)
if l.localCommitted < i || i < l.applied {
l.logger.Panicf("applied(%d) is out of range [prevApplied(%d), localCommitted(%d)]", i, l.applied, l.localCommitted)
}
l.applied = i
}

// localCommitTo raises localCommitted towards tocommit. The target is capped
// at l.unstable.offset, and localCommitted is never moved backwards.
//
// NOTE(review): if unstable.offset is the index of the first unstable entry,
// capping at offset (rather than offset-1) admits one index that is not yet
// stable — confirm against the definition of unstable.offset.
func (l *raftLog) localCommitTo(tocommit uint64) {
	// Cap the requested position at the unstable offset.
	target := tocommit
	if off := l.unstable.offset; target > off {
		target = off
	}

	// never decrease commit
	if target <= l.localCommitted {
		return
	}

	// The range check against the stable last index is currently disabled:
	// if li := l.stableLastIndex(); li < tocommit {
	// 	l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", tocommit, li)
	// }
	l.localCommitted = target
}

// commitTo raises committed to tocommit when tocommit is larger (committed is
// never decreased), and then always forwards tocommit to localCommitTo so
// that localCommitted can advance as well.
func (l *raftLog) commitTo(tocommit uint64) {
	// never decrease commit
	if tocommit > l.committed {
		// The range check against the last index is currently disabled:
		// if li := l.lastIndex(); li < tocommit {
		// 	l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", tocommit, li)
		// }
		l.committed = tocommit
	}

	// Forwarded unconditionally, even when committed was left unchanged.
	l.localCommitTo(tocommit)
}

func (l *raftLog) persistingTo(i, t uint64) {
gt, ok := l.unstable.maybeTerm(i)
if !ok {
Expand Down Expand Up @@ -338,6 +361,24 @@ func (l *raftLog) term(i uint64) (uint64, error) {
panic(err) // TODO(bdarnell)
}

// stableTerm returns the term of entry i as reported by the backing storage.
//
// The accepted index range is [firstIndex()-1, stableLastIndex()], where
// firstIndex()-1 is the index of the dummy entry. For an index outside that
// range it returns (0, nil). ErrCompacted and ErrUnavailable from the storage
// are returned to the caller with a zero term; any other storage error
// causes a panic.
func (l *raftLog) stableTerm(i uint64) (uint64, error) {
	// the valid term range is [index of dummy entry, last index]
	lowest := l.firstIndex() - 1
	if i < lowest || i > l.stableLastIndex() {
		// TODO: return an error instead?
		return 0, nil
	}

	t, err := l.storage.Term(i)
	switch err {
	case nil:
		return t, nil
	case ErrCompacted, ErrUnavailable:
		return 0, err
	}
	panic(err) // TODO(bdarnell)
}

func (l *raftLog) entries(i, maxsize uint64) ([]pb.Entry, error) {
if i > l.lastIndex() {
return nil, nil
Expand Down Expand Up @@ -394,6 +435,7 @@ func (l *raftLog) maybeCompact(i uint64) {
func (l *raftLog) restore(s pb.Snapshot) {
l.logger.Infof("log [%s] starts to restore snapshot [index: %d, term: %d]", l, s.Metadata.Index, s.Metadata.Term)
l.committed = s.Metadata.Index
l.localCommitted = s.Metadata.Index
// NOTE: applied and compacted will be reset in raft.advance().
l.unstable.restore(s)
l.pending = l.unstable.offset
Expand Down Expand Up @@ -445,15 +487,17 @@ func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error {
if lo > hi {
l.logger.Panicf("invalid slice %d > %d", lo, hi)
}

fi := l.firstIndex()
if lo < fi {
return ErrCompacted
}

length := l.lastIndex() + 1 - fi
if hi > fi+length {
l.logger.Panicf("slice[%d,%d) out of bound [%d,%d]", lo, hi, fi, l.lastIndex())
li := l.lastIndex()
if hi > li+1 {
l.logger.Panicf("slice[%d,%d) out of bound [%d,%d]", lo, hi, fi, li)
}

return nil
}

Expand Down
33 changes: 16 additions & 17 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,6 @@ type raft struct {
// the log
raftLog *raftLog

commit uint64

maxMsgSize uint64
maxUncommittedSize uint64
// TODO(tbg): rename to trk.
Expand Down Expand Up @@ -372,7 +370,8 @@ func newRaft(c *Config) *raft {
}

r.logger.Infof("newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]",
r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm())
r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied,
r.raftLog.lastIndex(), r.raftLog.lastTerm())
return r
}

Expand All @@ -384,7 +383,7 @@ func (r *raft) hardState() pb.HardState {
return pb.HardState{
Term: r.Term,
Vote: r.Vote,
Commit: r.raftLog.committed,
Commit: r.raftLog.localCommitted,
}
}

Expand Down Expand Up @@ -470,7 +469,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
}
m.Snapshot = snapshot
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
r.logger.Debugf("%x [firstIndex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
pr.BecomeSnapshot(sindex)
r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
Expand Down Expand Up @@ -840,7 +839,10 @@ func (r *raft) campaign(t CampaignType) {
if t == campaignTransfer {
ctx = []byte(t)
}
r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
r.send(pb.Message{
Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(),
LogTerm: r.raftLog.lastTerm(), Context: ctx,
})
}
}

Expand All @@ -867,7 +869,8 @@ func (r *raft) Step(m pb.Message) error {
// If a server receives a RequestVote request within the minimum election timeout
// of hearing from a current leader, it does not update its term or grant its vote
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed)
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type,
m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed)
return nil
}
}
Expand Down Expand Up @@ -1414,7 +1417,7 @@ func stepCandidate(r *raft, m pb.Message) error {
r.handleAppendEntries(m)
case pb.MsgLogResp:
if r.raftLog.stableTo(m.Index, m.LogTerm) {
r.raftLog.commitTo(min(r.commit, m.Index))
r.raftLog.localCommitTo(r.raftLog.committed)
}
case pb.MsgHeartbeat:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
Expand Down Expand Up @@ -1462,7 +1465,7 @@ func stepFollower(r *raft, m pb.Message) error {
r.handleAppendEntries(m)
case pb.MsgLogResp:
if r.raftLog.stableTo(m.Index, m.LogTerm) {
r.raftLog.commitTo(min(r.commit, m.Index))
r.raftLog.localCommitTo(r.raftLog.committed)
r.send(pb.Message{To: r.lead, Type: pb.MsgAppResp, Index: m.Index})
}
case pb.MsgHeartbeat:
Expand Down Expand Up @@ -1509,10 +1512,6 @@ func (r *raft) handleAppendEntries(m pb.Message) {
return
}

if m.Commit > r.commit {
r.commit = m.Commit
}

if ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); !ok {
r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x",
r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
Expand Down Expand Up @@ -1630,7 +1629,6 @@ func (r *raft) restore(s pb.Snapshot) bool {
Tracker: r.prs,
LastIndex: r.raftLog.lastIndex(),
}, cs)

if err != nil {
// This should never happen. Either there's a bug in our config change
// handling or the client corrupted the conf change.
Expand Down Expand Up @@ -1667,7 +1665,6 @@ func (r *raft) applyConfChange(cc pb.ConfChangeV2) pb.ConfState {
}
return changer.Simple(cc.Changes...)
}()

if err != nil {
// TODO(tbg): return the error to the caller.
panic(err)
Expand Down Expand Up @@ -1737,9 +1734,11 @@ func (r *raft) switchToConfig(cfg tracker.Config, prs tracker.ProgressMap) pb.Co
}

func (r *raft) loadState(state pb.HardState) {
if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() {
r.logger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())
if state.Commit < r.raftLog.localCommitted || state.Commit > r.raftLog.lastIndex() {
r.logger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit,
r.raftLog.localCommitted, r.raftLog.lastIndex())
}
r.raftLog.localCommitted = state.Commit
r.raftLog.committed = state.Commit
r.Term = state.Term
r.Vote = state.Vote
Expand Down

0 comments on commit 3abf12a

Please sign in to comment.