diff --git a/raft/bootstrap.go b/raft/bootstrap.go index 62cd4dc38..a6ac648a5 100644 --- a/raft/bootstrap.go +++ b/raft/bootstrap.go @@ -70,8 +70,9 @@ func (rn *RawNode) Bootstrap(peers []Peer) error { // We do not set raftLog.applied so the application will be able // to observe all conf changes via Ready.CommittedEntries. // - // TODO(bdarnell): These entries are still unstable; do we need to preserve - // the invariant that committed < unstable? + // NOTE: These entries are still unstable. + // + // TODO(james.yin): Set localCommitted? rn.raft.raftLog.committed = uint64(len(ents)) for _, peer := range peers { rn.raft.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}.AsV2()) diff --git a/raft/log.go b/raft/log.go index fecb5d75c..5992599cb 100644 --- a/raft/log.go +++ b/raft/log.go @@ -32,10 +32,13 @@ type raftLog struct { pending uint64 // committed is the highest log position that is known to be in // stable storage on a quorum of nodes. + // Invariant: committed < unstable.offset + len(unstable.entries) committed uint64 + // Invariant: localCommitted = min(committed, unstable.offset) + localCommitted uint64 // applied is the highest log position that the application has // been instructed to apply to its state machine. - // Invariant: applied <= committed + // Invariant: applied <= localCommitted applied uint64 // compacted is the highest log position that the application can // delete safety. @@ -80,6 +83,7 @@ func newLogWithSize(storage Storage, logger Logger, maxNextEntsSize uint64) *raf log.pending = lastIndex + 1 // Initialize our committed and applied pointers to the time of the last compaction. log.committed = firstIndex - 1 + log.localCommitted = firstIndex - 1 log.applied = firstIndex - 1 log.compacted = firstIndex - 1 @@ -97,6 +101,7 @@ func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry return false } + li := index + uint64(len(ents)) ci := l.findConflict(ents) switch { case ci == 0: @@ -109,11 +114,7 @@ func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry } l.append(ents[ci-offset:]...) } - - // FIXME: use stable - // li := l.lastIndex() - // l.commitTo(min(committed, li)) - + l.commitTo(min(committed, li)) return true } @@ -204,9 +205,9 @@ func (l *raftLog) pendingEntries() []pb.Entry { // If applied is smaller than the index of snapshot, it returns all committed // entries after the index of snapshot. func (l *raftLog) nextEnts() (ents []pb.Entry) { - off := max(l.applied+1, l.firstIndex()) - if l.committed+1 > off { - ents, err := l.slice(off, l.committed+1, l.maxNextEntsSize) + lo := max(l.applied+1, l.firstIndex()) + if hi := l.localCommitted + 1; hi > lo { + ents, err := l.slice(lo, hi, l.maxNextEntsSize) if err != nil { l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err) } @@ -219,7 +220,7 @@ func (l *raftLog) nextEnts() (ents []pb.Entry) { // is a fast check without heavy raftLog.slice() in raftLog.nextEnts(). func (l *raftLog) hasNextEnts() bool { off := max(l.applied+1, l.firstIndex()) - return l.committed+1 > off + return l.localCommitted+1 > off } // hasPendingSnapshot returns if there is pending snapshot waiting for applying. @@ -256,14 +257,12 @@ func (l *raftLog) lastIndex() uint64 { return i } -func (l *raftLog) commitTo(tocommit uint64) { - // never decrease commit - if l.committed < tocommit { - if l.lastIndex() < tocommit { - l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", tocommit, l.lastIndex()) - } - l.committed = tocommit +func (l *raftLog) stableLastIndex() uint64 { + i, err := l.storage.LastIndex() + if err != nil { + panic(err) // TODO(james.yin) } + return i } func (l *raftLog) compactTo(tocompact uint64) { @@ -280,12 +279,36 @@ func (l *raftLog) appliedTo(i uint64) { if i == 0 { return } - if l.committed < i || i < l.applied { - l.logger.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed) + if l.localCommitted < i || i < l.applied { + l.logger.Panicf("applied(%d) is out of range [prevApplied(%d), localCommitted(%d)]", i, l.applied, l.localCommitted) } l.applied = i } +func (l *raftLog) localCommitTo(tocommit uint64) { + if tocommit >= l.unstable.offset { + tocommit = l.unstable.offset + } + // never decrease commit + if l.localCommitted < tocommit { + // if li := l.stableLastIndex(); li < tocommit { + // l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", tocommit, li) + // } + l.localCommitted = tocommit + } +} + +func (l *raftLog) commitTo(tocommit uint64) { + // never decrease commit + if l.committed < tocommit { + // if li := l.lastIndex(); li < tocommit { + // l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", tocommit, li) + // } + l.committed = tocommit + } + l.localCommitTo(tocommit) +} + func (l *raftLog) persistingTo(i, t uint64) { gt, ok := l.unstable.maybeTerm(i) if !ok { @@ -338,6 +361,24 @@ func (l *raftLog) term(i uint64) (uint64, error) { panic(err) // TODO(bdarnell) } +func (l *raftLog) stableTerm(i uint64) (uint64, error) { + // the valid term range is [index of dummy entry, last index] + dummyIndex := l.firstIndex() - 1 + if i < dummyIndex || i > l.stableLastIndex() { + // TODO: return an error instead? + return 0, nil + } + + t, err := l.storage.Term(i) + if err == nil { + return t, nil + } + if err == ErrCompacted || err == ErrUnavailable { + return 0, err + } + panic(err) // TODO(bdarnell) +} + func (l *raftLog) entries(i, maxsize uint64) ([]pb.Entry, error) { if i > l.lastIndex() { return nil, nil @@ -394,6 +435,7 @@ func (l *raftLog) maybeCompact(i uint64) { func (l *raftLog) restore(s pb.Snapshot) { l.logger.Infof("log [%s] starts to restore snapshot [index: %d, term: %d]", l, s.Metadata.Index, s.Metadata.Term) l.committed = s.Metadata.Index + l.localCommitted = s.Metadata.Index // NOTE: applied and compacted will be reset in raft.advance(). l.unstable.restore(s) l.pending = l.unstable.offset @@ -445,15 +487,17 @@ func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error { if lo > hi { l.logger.Panicf("invalid slice %d > %d", lo, hi) } + fi := l.firstIndex() if lo < fi { return ErrCompacted } - length := l.lastIndex() + 1 - fi - if hi > fi+length { - l.logger.Panicf("slice[%d,%d) out of bound [%d,%d]", lo, hi, fi, l.lastIndex()) + li := l.lastIndex() + if hi > li+1 { + l.logger.Panicf("slice[%d,%d) out of bound [%d,%d]", lo, hi, fi, li) } + return nil } diff --git a/raft/raft.go b/raft/raft.go index 17b376fa8..980d52278 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -253,8 +253,6 @@ type raft struct { // the log raftLog *raftLog - commit uint64 - maxMsgSize uint64 maxUncommittedSize uint64 // TODO(tbg): rename to trk. @@ -372,7 +370,8 @@ func newRaft(c *Config) *raft { } r.logger.Infof("newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]", - r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm()) + r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, + r.raftLog.lastIndex(), r.raftLog.lastTerm()) return r } @@ -384,7 +383,7 @@ func (r *raft) hardState() pb.HardState { return pb.HardState{ Term: r.Term, Vote: r.Vote, - Commit: r.raftLog.committed, + Commit: r.raftLog.localCommitted, } } @@ -470,7 +469,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { } m.Snapshot = snapshot sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term - r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]", + r.logger.Debugf("%x [firstIndex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]", r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr) pr.BecomeSnapshot(sindex) r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr) @@ -840,7 +839,10 @@ func (r *raft) campaign(t CampaignType) { if t == campaignTransfer { ctx = []byte(t) } - r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx}) + r.send(pb.Message{ + Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), + LogTerm: r.raftLog.lastTerm(), Context: ctx, + }) } } @@ -867,7 +869,8 @@ func (r *raft) Step(m pb.Message) error { // If a server receives a RequestVote request within the minimum election timeout // of hearing from a current leader, it does not update its term or grant its vote r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)", - r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed) + r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, + m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed) return nil } } @@ -1414,7 +1417,7 @@ func stepCandidate(r *raft, m pb.Message) error { r.handleAppendEntries(m) case pb.MsgLogResp: if r.raftLog.stableTo(m.Index, m.LogTerm) { - r.raftLog.commitTo(min(r.commit, m.Index)) + r.raftLog.localCommitTo(r.raftLog.committed) } case pb.MsgHeartbeat: r.becomeFollower(m.Term, m.From) // always m.Term == r.Term @@ -1462,7 +1465,7 @@ func stepFollower(r *raft, m pb.Message) error { r.handleAppendEntries(m) case pb.MsgLogResp: if r.raftLog.stableTo(m.Index, m.LogTerm) { - r.raftLog.commitTo(min(r.commit, m.Index)) + r.raftLog.localCommitTo(r.raftLog.committed) r.send(pb.Message{To: r.lead, Type: pb.MsgAppResp, Index: m.Index}) } case pb.MsgHeartbeat: @@ -1509,10 +1512,6 @@ func (r *raft) handleAppendEntries(m pb.Message) { return } - if m.Commit > r.commit { - r.commit = m.Commit - } - if ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); !ok { r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x", r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From) @@ -1630,7 +1629,6 @@ func (r *raft) restore(s pb.Snapshot) bool { Tracker: r.prs, LastIndex: r.raftLog.lastIndex(), }, cs) - if err != nil { // This should never happen. Either there's a bug in our config change // handling or the client corrupted the conf change. @@ -1667,7 +1665,6 @@ func (r *raft) applyConfChange(cc pb.ConfChangeV2) pb.ConfState { } return changer.Simple(cc.Changes...) }() - if err != nil { // TODO(tbg): return the error to the caller. panic(err) @@ -1737,9 +1734,11 @@ func (r *raft) switchToConfig(cfg tracker.Config, prs tracker.ProgressMap) pb.Co } func (r *raft) loadState(state pb.HardState) { - if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() { - r.logger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex()) + if state.Commit < r.raftLog.localCommitted || state.Commit > r.raftLog.lastIndex() { + r.logger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, + r.raftLog.localCommitted, r.raftLog.lastIndex()) } + r.raftLog.localCommitted = state.Commit r.raftLog.committed = state.Commit r.Term = state.Term r.Vote = state.Vote