Skip to content

Commit

Permalink
update raft snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
Legendout committed Jul 28, 2024
1 parent 0fcb69c commit 1af3179
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 5 deletions.
8 changes: 8 additions & 0 deletions raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@ func newLog(storage Storage) *RaftLog {
// maybeCompact drops the in-memory entries that storage has already
// compacted, so that l.entries does not grow without bound.
//
// It asks storage for its current first index and, if that index has moved
// past l.dummyIndex (the index of l.entries[0]), discards the entries below
// it and advances l.dummyIndex. If storage reports an error, or its first
// index has not advanced, the log is left untouched.
func (l *RaftLog) maybeCompact() {
	// Your Code Here (2C).
	newFirst, err := l.storage.FirstIndex()
	if err != nil {
		// Without a trustworthy first index nothing can be dropped safely;
		// in particular dummyIndex must not be overwritten with a zero value.
		return
	}
	if newFirst <= l.dummyIndex {
		// Nothing new was compacted. Never move dummyIndex backwards:
		// l.entries[0] would no longer correspond to dummyIndex.
		return
	}

	// Number of leading entries that are now covered by storage compaction.
	// Clamp it so that a storage that compacted past everything we hold
	// yields an empty log instead of a slice-bounds panic.
	offset := newFirst - l.dummyIndex
	if held := uint64(len(l.entries)); offset > held {
		offset = held
	}

	// Copy the survivors into a fresh slice (rather than re-slicing) so the
	// old backing array, including the dropped entries, can be collected.
	remaining := l.entries[offset:]
	l.entries = append(make([]pb.Entry, 0, len(remaining)), remaining...)
	l.dummyIndex = newFirst
}

// allEntries return all the entries not compacted.
Expand Down
93 changes: 88 additions & 5 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ package raft

import (
"errors"
"log"
rand2 "math/rand"
"sort"
"time"

"github.com/pingcap-incubator/tinykv/log"

pb "github.com/pingcap-incubator/tinykv/proto/pkg/eraftpb"
)

Expand Down Expand Up @@ -108,28 +109,36 @@ func (c *Config) validate() error {
// Progress is the leader's view of how far a single follower has replicated
// the log. The leader keeps one Progress per peer and uses it to decide which
// entries to send to that peer next.
type Progress struct {
	// Match is the highest log index known to be replicated on the peer.
	// It is reset to 0 (nothing known) when a node becomes leader.
	Match uint64
	// Next is the index of the next log entry to send to the peer.
	// It is reset to the leader's last log index + 1 when a node becomes leader.
	Next uint64
}

type Raft struct {
//节点 ID
id uint64

//节点当前的 Term
Term uint64
//当前 Term 内,节点把票投给了谁
Vote uint64

// the log
RaftLog *RaftLog

// log replication progress of each peers
// 日志复制需要记录follower的进度
Prs map[uint64]*Progress

// this peer's role
// 角色
State StateType

// votes records
// votes records(存放哪些节点投票给了本节点)
votes map[uint64]bool

// msgs need to send
// 需要发送的数据Msg
msgs []pb.Message

// the leader id
Expand All @@ -151,9 +160,8 @@ type Raft struct {
// Follow the procedure defined in section 3.10 of Raft phd thesis.
// (https://web.stanford.edu/~ouster/cgi-bin/papers/OngaroPhD.pdf)
// (Used in 3A leader transfer)
leadTransferee uint64

transferElapsed int
leadTransferee uint64
transferElapsed int // 用于计时 transfer 的时间

// Only one conf change may be pending (in the log, but not yet
// applied) at a time. This is enforced via PendingConfIndex, which
Expand All @@ -162,8 +170,11 @@ type Raft struct {
// be proposed if the leader's applied index is greater than this
// value.
// (Used in 3A conf change)
// PendingConfIndex 表示当前还没有生效的 ConfChange,只有在日志被提交并应用之后才会生效
// 一次只能挂起一个conf更改(在日志中,但尚未应用)。这是通过PendingConfIndex实现的,该值设置为>=最新挂起配置更改(如果有)的日志索引。仅当领导者的应用索引大于此值时,才允许提议配置更改。
PendingConfIndex uint64

// add 随机超时选举,[electionTimeout, 2*electionTimeout)[150ms,300ms]
randomElectionTimeout int
}

Expand Down Expand Up @@ -196,9 +207,29 @@ func newRaft(c *Config) *Raft {
}
//生成随机选举超时时间
rf.resetRandomizedElectionTimeout()

// 更新集群配置
rf.Prs = make(map[uint64]*Progress)
for _, id := range c.peers {
rf.Prs[id] = &Progress{}
}
//3A
rf.PendingConfIndex = rf.initPendingConfIndex()

return rf
}

// initPendingConfIndex computes the initial value for PendingConfIndex.
//
// It scans the log indexes in [applied+1, LastIndex()] and returns the index
// of the first entry whose type is EntryConfChange, i.e. a configuration
// change that is in the log but has not been applied yet. It returns None
// when no such entry exists.
func (r *Raft) initPendingConfIndex() uint64 {
	raftLog := r.RaftLog
	for idx := raftLog.applied + 1; idx <= raftLog.LastIndex(); idx++ {
		// entries[0] holds log index dummyIndex, so subtract it to get the
		// slice position of log index idx.
		pos := idx - raftLog.dummyIndex
		if raftLog.entries[pos].EntryType != pb.EntryType_EntryConfChange {
			continue
		}
		return idx
	}
	return None
}

// resetRandomizedElectionTimeout 生成随机选举超时时间,范围在 [r.electionTimeout, 2*r.electionTimeout]
func (r *Raft) resetRandomizedElectionTimeout() {
rand2.Seed(time.Now().UnixNano())
Expand All @@ -209,6 +240,8 @@ func (r *Raft) resetRandomizedElectionTimeout() {

// sendAppend sends an append RPC with new entries (if any) and the
// current commit index to the given peer. Returns true if a message was sent.
// sendAppend发送一个带有新条目(如果有)的append RPC
// 给跟随者的当前提交索引。如果发送了消息,则返回true。
func (r *Raft) sendAppend(to uint64) bool {
// Your Code Here (2A).
// prevLogIndex 和 prevLogTerm 用来判断 leader 和 follower 的日志是否冲突
Expand Down Expand Up @@ -238,9 +271,33 @@ func (r *Raft) sendAppend(to uint64) bool {
r.msgs = append(r.msgs, appendMsg)
return true
}
// 有错误,说明 nextIndex 存在于快照中,此时需要发送快照给 followers
//2C
r.sendSnapshot(to)
log.Infof("[Snapshot Request]%d to %d, prevLogIndex %v, dummyIndex %v", r.id, to, prevLogIndex, r.RaftLog.dummyIndex)

return false
}

// sendSnapshot queues a MsgSnapshot carrying storage's current snapshot for
// the peer `to`, and moves that peer's Next to just past the snapshot index.
//
// If storage cannot provide a snapshot, nothing is queued and the peer's
// progress is left unchanged.
func (r *Raft) sendSnapshot(to uint64) {
	snap, err := r.RaftLog.storage.Snapshot()
	if err != nil {
		// Per the original author's note: snapshots are generated
		// asynchronously (by the region worker), so storage may report
		// ErrSnapshotTemporarilyUnavailable while one is still being built.
		// The leader simply gives up on this request and retries on a later
		// opportunity (typically the next heartbeat response).
		return
	}

	snapshotMsg := pb.Message{
		MsgType:  pb.MessageType_MsgSnapshot,
		From:     r.id,
		To:       to,
		Term:     r.Term,
		Snapshot: &snap,
	}
	r.msgs = append(r.msgs, snapshotMsg)

	// Everything up to the snapshot index is covered by the snapshot, so the
	// next entry to replicate is the one right after it.
	r.Prs[to].Next = snap.Metadata.Index + 1
}

// sendHeartbeat sends a heartbeat RPC to the given peer.
func (r *Raft) sendHeartbeat(to uint64) {
// Your Code Here (2A).
Expand All @@ -253,6 +310,8 @@ func (r *Raft) sendHeartbeat(to uint64) {
}

// tick advances the internal logical clock by a single tick.
// 推动时间流逝
// 每调用一次,就要增加节点的心跳计时
func (r *Raft) tick() {
// Your Code Here (2A).
switch r.State {
Expand All @@ -274,7 +333,15 @@ func (r *Raft) leaderTick() {
//TODO 选举超时 判断心跳回应数量

//TODO 3A 禅让机制
if r.leadTransferee != None {
// 在选举超时后领导权禅让仍然未完成,则 leader 应该终止领导权禅让,这样可以恢复客户端请求
r.transferElapsed++
if r.transferElapsed >= r.electionTimeout {
r.leadTransferee = None
}
}
}

func (r *Raft) candidateTick() {
r.electionElapsed++
// 选举超时 发起选举
Expand All @@ -284,6 +351,7 @@ func (r *Raft) candidateTick() {
r.Step(pb.Message{From: r.id, To: r.id, MsgType: pb.MessageType_MsgHup})
}
}

func (r *Raft) followerTick() {
r.electionElapsed++
// 选举超时 发起选举
Expand Down Expand Up @@ -335,10 +403,17 @@ func (r *Raft) becomeLeader() {
r.Prs[id].Next = r.RaftLog.LastIndex() + 1 // 初始化为 leader 的最后一条日志索引(后续出现冲突会往前移动)
r.Prs[id].Match = 0 // 初始化为 0 就可以了
}
//3A
r.PendingConfIndex = r.initPendingConfIndex()
// 成为 Leader 之后立马在日志中追加一条 noop 日志,这是因为
// 在 Raft 论文中提交 Leader 永远不会通过计算副本的方式提交一个之前任期、并且已经被复制到大多数节点的日志
// 通过追加一条当前任期的 noop 日志,可以快速的提交之前任期内所有被复制到大多数节点的日志
r.Step(pb.Message{MsgType: pb.MessageType_MsgPropose, Entries: []*pb.Entry{{}}})
}

// Step the entrance of handle message, see `MessageType`
// on `eraftpb.proto` for what msgs should be handled
// 该函数接收一个 Msg,然后根据节点的角色和 Msg 的类型调用不同的处理函数。
func (r *Raft) Step(m pb.Message) error {
// Your Code Here (2A).
switch r.State {
Expand All @@ -357,6 +432,7 @@ func (r *Raft) Step(m pb.Message) error {
}
return nil
}

func (r *Raft) followerStep(m pb.Message) {
//Follower 可以接收到的消息:
//MsgHup、MsgRequestVote、MsgHeartBeat、MsgAppendEntry
Expand Down Expand Up @@ -411,6 +487,7 @@ func (r *Raft) followerStep(m pb.Message) {
r.handleTimeoutNowRequest(m)
}
}

func (r *Raft) candidateStep(m pb.Message) {
//Candidate 可以接收到的消息:
//MsgHup、MsgRequestVote、MsgRequestVoteResponse、MsgHeartBeat
Expand Down Expand Up @@ -468,6 +545,7 @@ func (r *Raft) candidateStep(m pb.Message) {
r.handleTimeoutNowRequest(m)
}
}

func (r *Raft) leaderStep(m pb.Message) {
//Leader 可以接收到的消息:
//MsgBeat、MsgHeartBeatResponse、MsgRequestVote、MsgPropose、MsgAppendResponse、MsgAppend
Expand Down Expand Up @@ -667,6 +745,7 @@ func (r *Raft) handleAppendEntries(m pb.Message) {
//回发
r.msgs = append(r.msgs, appendEntryResp)
}

func (r *Raft) handleAppendEntriesResponse(m pb.Message) {
//被拒绝
if m.Reject {
Expand Down Expand Up @@ -720,6 +799,7 @@ func (r *Raft) maybeCommit() bool {
// 检查是否可以提交 toCommitIndex
return r.RaftLog.maybeCommit(toCommitIndex, r.Term)
}

func (r *Raft) broadcastAppendEntry() {
for id := range r.Prs {
if id == r.id {
Expand Down Expand Up @@ -863,6 +943,7 @@ func (r *Raft) handlePropose(m pb.Message) {
r.broadcastAppendEntry()
}
}

func (r *Raft) appendEntry(entries []*pb.Entry) {
lastIndex := r.RaftLog.LastIndex() // leader 最后一条日志的索引
for i := range entries {
Expand All @@ -875,6 +956,7 @@ func (r *Raft) appendEntry(entries []*pb.Entry) {
}
r.RaftLog.appendNewEntry(entries)
}

func (r *Raft) handleTransferLeader(m pb.Message) {
// 判断 transferee 是否在集群中
if _, ok := r.Prs[m.From]; !ok {
Expand Down Expand Up @@ -905,6 +987,7 @@ func (r *Raft) handleTransferLeader(m pb.Message) {
// sendTimeoutNow queues a MsgTimeoutNow from this node addressed to the
// peer `to`. The message carries only the type, sender and recipient.
func (r *Raft) sendTimeoutNow(to uint64) {
	timeoutNow := pb.Message{
		MsgType: pb.MessageType_MsgTimeoutNow,
		From:    r.id,
		To:      to,
	}
	r.msgs = append(r.msgs, timeoutNow)
}

func (r *Raft) handleTimeoutNowRequest(m pb.Message) {
if _, ok := r.Prs[r.id]; !ok {
return
Expand Down

0 comments on commit 1af3179

Please sign in to comment.