Skip to content

Commit

Permalink
fix raft bug
Browse files Browse the repository at this point in the history
  • Loading branch information
Legendout committed Jul 31, 2024
1 parent be9b815 commit e44591a
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 144 deletions.
198 changes: 91 additions & 107 deletions kv/raftstore/peer_msg_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,81 +137,22 @@ func (d *peerMsgHandler) processAdminRequest(entry *pb.Entry, requests *raft_cmd
d.ScheduleCompactLog(adminReq.CompactLog.CompactIndex)
log.Infof("%d apply commit, entry %v, type %s, truncatedIndex %v", d.peer.PeerId(), entry.Index, adminReq.CmdType, adminReq.CompactLog.CompactIndex)
}
case raft_cmdpb.AdminCmdType_Split: // Region Split 请求处理
//TODO
// error: regionId 不匹配
if requests.Header.RegionId != d.regionId {
regionNotFound := &util.ErrRegionNotFound{RegionId: requests.Header.RegionId}
d.handleProposal(entry, ErrResp(regionNotFound))
return kvWB
}
// error: 过期的请求
if errEpochNotMatch, ok := util.CheckRegionEpoch(requests, d.Region(), true).(*util.ErrEpochNotMatch); ok {
d.handleProposal(entry, ErrResp(errEpochNotMatch))
return kvWB
}
// error: key 不在 oldRegion 中
if err := util.CheckKeyInRegion(adminReq.Split.SplitKey, d.Region()); err != nil {
d.handleProposal(entry, ErrResp(err))
return kvWB
}
// error: Split Region 的 peers 和当前 oldRegion 的 peers 数量不相等,不知道为什么会出现这种原因
if len(d.Region().Peers) != len(adminReq.Split.NewPeerIds) {
d.handleProposal(entry, ErrRespStaleCommand(d.Term()))
return kvWB
}
oldRegion, split := d.Region(), adminReq.Split
oldRegion.RegionEpoch.Version++
newRegion := d.createNewSplitRegion(split, oldRegion) // 创建新的 Region
// 修改 storeMeta 信息
storeMeta := d.ctx.storeMeta
storeMeta.Lock()
storeMeta.regionRanges.Delete(&regionItem{region: oldRegion}) // 删除 oldRegion 的数据范围
oldRegion.EndKey = split.SplitKey // 修改 oldRegion 的 range
storeMeta.regionRanges.ReplaceOrInsert(&regionItem{region: oldRegion}) // 更新 oldRegion 的 range
storeMeta.regionRanges.ReplaceOrInsert(&regionItem{region: newRegion}) // 创建 newRegion 的 range
storeMeta.regions[newRegion.Id] = newRegion // 设置 regions 映射
storeMeta.Unlock()
// 持久化 oldRegion 和 newRegion
meta.WriteRegionState(kvWB, oldRegion, rspb.PeerState_Normal)
meta.WriteRegionState(kvWB, newRegion, rspb.PeerState_Normal)
// 这几句有用吗?
d.SizeDiffHint = 0
d.ApproximateSize = new(uint64)
// 创建当前 store 上的 newRegion Peer,注册到 router,并启动
peer, err := createPeer(d.storeID(), d.ctx.cfg, d.ctx.schedulerTaskSender, d.ctx.engine, newRegion)
if err != nil {
log.Panic(err)
}
d.ctx.router.register(peer)
d.ctx.router.send(newRegion.Id, message.Msg{Type: message.MsgTypeStart})
// 处理回调函数
d.handleProposal(entry, &raft_cmdpb.RaftCmdResponse{
Header: &raft_cmdpb.RaftResponseHeader{},
AdminResponse: &raft_cmdpb.AdminResponse{
CmdType: raft_cmdpb.AdminCmdType_Split,
Split: &raft_cmdpb.SplitResponse{Regions: []*metapb.Region{newRegion, oldRegion}},
},
})
log.Infof("[AdminCmdType_Split Process] oldRegin %v, newRegion %v", oldRegion, newRegion)
// 发送 heartbeat 给其他节点
if d.IsLeader() {
d.HeartbeatScheduler(d.ctx.schedulerTaskSender)
d.notifyHeartbeatScheduler(newRegion, peer)
}
}
return kvWB
}

// notifyHeartbeatScheduler 帮助 region 快速创建 peer
func (d *peerMsgHandler) notifyHeartbeatScheduler(region *metapb.Region, peer *peer) {
// 通知 heartbeat scheduler
d.ctx.schedulerTaskSender <- &runner.SchedulerTask{
RegionID: region.Id,
Kind: runner.SchedulerHeartbeat,
Args: &runner.SchedulerHeartbeatTask{
Peer: peer.peer,
},
clonedRegion := new(metapb.Region)
err := util.CloneMsg(region, clonedRegion)
if err != nil {
return
}
d.ctx.schedulerTaskSender <- &runner.SchedulerRegionHeartbeatTask{
Region: clonedRegion,
Peer: peer.Meta,
PendingPeers: peer.CollectPendingPeers(),
ApproximateSize: peer.ApproximateSize,
}
}

Expand All @@ -232,47 +173,83 @@ func (d *peerMsgHandler) createNewSplitRegion(split *raft_cmdpb.SplitRequest, ol

// processConfChange 处理配置变更日志
func (d *peerMsgHandler) processConfChange(entry *pb.Entry, cc *pb.ConfChange, kvWB *engine_util.WriteBatch) *engine_util.WriteBatch {
//1. 解析配置变更日志
var resp *raft_cmdpb.RaftCmdResponse
var err error
var ccData []byte
if cc.NodeId == 0 {
// NodeId 为 0 说明是一个 ConfChangeV2 类型的配置变更日志
ccV2 := &pb.ConfChangeV2{}
if err := ccV2.Unmarshal(cc.Context); err != nil {
log.Panic(err)
}
ccData = ccV2.ChangeType.String()
} else {
ccData = cc.ChangeType.String()
// 获取 ConfChange Command Request
msg := &raft_cmdpb.RaftCmdRequest{}
if err := msg.Unmarshal(cc.Context); err != nil {
log.Panic(err)
}
region := d.Region()
changePeerReq := msg.AdminRequest.ChangePeer
// 检查 Command Request 中的 RegionEpoch 是否是过期的,以此判定是不是一个重复的请求
// 实验指导书中提到,测试程序可能会多次提交同一个 ConfChange 直到 ConfChange 被应用
// CheckRegionEpoch 检查 RaftCmdRequest 头部携带的 RegionEpoch 是不是和 currentRegionEpoch 匹配
if err, ok := util.CheckRegionEpoch(msg, region, true).(*util.ErrEpochNotMatch); ok {
log.Infof("[processConfChange] %v RegionEpoch not match", d.PeerId())
d.handleProposal(entry, ErrResp(err))
return kvWB
}
//2. 处理配置变更日志
switch cc.ChangeType {
case pb.ConfChangeType_AddNode:
// 添加节点
resp, err = d.processAddNode(cc, ccData)
case pb.ConfChangeType_RemoveNode:
// 删除节点
resp, err = d.processRemoveNode(cc, ccData)
default:
log.Panic("unexpected conf type")
case pb.ConfChangeType_AddNode: // 添加一个节点
log.Infof("[AddNode] %v add %v", d.PeerId(), cc.NodeId)
// 待添加的节点必须原先在 Region 中不存在
if d.searchPeerWithId(cc.NodeId) == len(region.Peers) {
// region 中追加新的 peer
region.Peers = append(region.Peers, changePeerReq.Peer)
region.RegionEpoch.ConfVer++
meta.WriteRegionState(kvWB, region, rspb.PeerState_Normal) // PeerState 用来表示当前 Peer 是否在 region 中
// 更新 metaStore 中的 region 信息
d.updateStoreMeta(region)
// 更新 peerCache,peerCache 保存了 peerId -> Peer 的映射
// 当前 raft_store 上的 peer 需要发送消息给同一个 region 中的别的节点的时候,需要获取别的节点所在 storeId
// peerCache 里面就保存了属于同一个 region 的所有 peer 的元信息(peerId, storeId)
d.insertPeerCache(changePeerReq.Peer)
}
case pb.ConfChangeType_RemoveNode: // 删除一个节点
log.Infof("[RemoveNode] %v remove %v", d.PeerId(), cc.NodeId)
// 如果目标节点是自身,那么直接销毁并返回:从 raft_store 上删除所属 region 的所有信息
if cc.NodeId == d.PeerId() {
d.destroyPeer()
log.Infof("[RemoveNode] destory %v compeleted", cc.NodeId)
return kvWB
}
// 待删除的节点必须存在于 region 中
n := d.searchPeerWithId(cc.NodeId)
if n != len(region.Peers) {
// 删除节点 RaftGroup 中的第 n 个 peer(注意,这里并不是编号为 n 的 peer,而是第 n 个 peer)
region.Peers = append(region.Peers[:n], region.Peers[n+1:]...)
region.RegionEpoch.ConfVer++
meta.WriteRegionState(kvWB, region, rspb.PeerState_Normal) // PeerState 用来表示当前 Peer 是否在 region 中
// 更新 metaStore 中的 region 信息
d.updateStoreMeta(region)
// 更新 peerCache
d.removePeerCache(cc.NodeId)
}
}
if err != nil {
resp = ErrResp(err)
// 更新 raft 层的配置信息
d.RaftGroup.ApplyConfChange(*cc)
// 处理 proposal
d.handleProposal(entry, &raft_cmdpb.RaftCmdResponse{
Header: &raft_cmdpb.RaftResponseHeader{},
AdminResponse: &raft_cmdpb.AdminResponse{
CmdType: raft_cmdpb.AdminCmdType_ChangePeer,
ChangePeer: &raft_cmdpb.ChangePeerResponse{Region: region},
},
})
// 新增加的 peer 是通过 leader 的心跳完成的
if d.IsLeader() {
d.HeartbeatScheduler(d.ctx.schedulerTaskSender)
}
d.handleProposal(entry, resp)
return kvWB
}

// searchPeerWithId 根据需要添加或者删除的 Peer id,找到 region 中是否已经存在这个 Peer
func (d *peerMsgHandler) searchPeerWithId(nodeId uint64) int {
region := d.Region()
for i, peer := range region.Peers {
for id, peer := range d.peerStorage.region.Peers {
if peer.Id == nodeId {
return i
return id
}
}
return len(region.Peers)
return len(d.peerStorage.region.Peers)
}
func (d *peerMsgHandler) updateStoreMeta(region *metapb.Region) {
storeMeta := d.ctx.storeMeta
Expand Down Expand Up @@ -351,15 +328,25 @@ func (d *peerMsgHandler) handleProposal(entry *pb.Entry, resp *raft_cmdpb.RaftCm
// 其他情况:正确匹配的 proposal(处理完毕之后应该立即结束),further proposal(直接返回)
for len(d.proposals) > 0 {
proposal := d.proposals[0]
if proposal.index < entry.Index {
// 过期的 proposal
d.handleProposalStale(entry, proposal)
} else if proposal.index == entry.Index {
// 正确匹配的 proposal
// proposal.index < entry.index 是有可能出现的
// 如果 leader 宕机了并且有一个新的 leader 向它发送了快照,当应用了快照之后又继续同步了新的日志并 commit 了
// 这个时候 proposal.index < entry.index
if proposal.term < entry.Term || proposal.index < entry.Index {
// 日志被截断的情况
NotifyStaleReq(proposal.term, proposal.cb)
d.proposals = d.proposals[1:]
continue
}
// 正常匹配
if proposal.term == entry.Term && proposal.index == entry.Index {
if proposal.cb != nil {
proposal.cb.Txn = d.peerStorage.Engines.Kv.NewTransaction(false) // snap resp should set txn explicitly
}
proposal.cb.Done(resp)
d.proposals = d.proposals[1:]
return
}
// further proposal(即当前的 entry 并没有 proposal 在等待,或许是因为现在是 follower 在处理 committed entry)
return
}
}

Expand Down Expand Up @@ -450,9 +437,6 @@ func (d *peerMsgHandler) proposeAdminRequest(msg *raft_cmdpb.RaftCmdRequest, cb
if err := d.RaftGroup.Propose(data); err != nil {
log.Panic(err)
}
case raft_cmdpb.AdminCmdType_TransferLeader:
case raft_cmdpb.AdminCmdType_ChangePeer:
case raft_cmdpb.AdminCmdType_Split:
}
}

Expand Down
19 changes: 8 additions & 11 deletions kv/raftstore/peer_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,17 +256,14 @@ func (ps *PeerStorage) clearMeta(kvWB, raftWB *engine_util.WriteBatch) error {

// Delete all data that is not covered by `new_region`.
func (ps *PeerStorage) clearExtraData(newRegion *metapb.Region) {
// Your Code Here (2C).
// 1. 获取当前 region 的 startKey 和 endKey
startKey, endKey := ps.region.StartKey, ps.region.EndKey
// 2. 获取新 region 的 startKey 和 endKey
newStartKey, newEndKey := newRegion.StartKey, newRegion.EndKey
// 3. 判断新 region 是否和当前 region 一样,如果一样则不需要删除
if bytes.Equal(startKey, newStartKey) && bytes.Equal(endKey, newEndKey) {
return
}
// 4. 删除当前 region 的数据
ps.clearRange(ps.region.Id, startKey, endKey)
oldStartKey, oldEndKey := ps.region.GetStartKey(), ps.region.GetEndKey()
newStartKey, newEndKey := newRegion.GetStartKey(), newRegion.GetEndKey()
if bytes.Compare(oldStartKey, newStartKey) < 0 {
ps.clearRange(newRegion.Id, oldStartKey, newStartKey)
}
if bytes.Compare(newEndKey, oldEndKey) < 0 || (len(oldEndKey) == 0 && len(newEndKey) != 0) {
ps.clearRange(newRegion.Id, newEndKey, oldEndKey)
}
}

// ClearMeta delete stale metadata like raftState, applyState, regionState and raft log entries
Expand Down
60 changes: 34 additions & 26 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,13 +330,6 @@ func (r *Raft) leaderTick() {
// MessageType_MsgBeat 属于内部消息,不需要经过 RawNode 处理
r.Step(pb.Message{From: r.id, To: r.id, MsgType: pb.MessageType_MsgBeat})
}
//TODO 选举超时 判断心跳回应数量

//TODO 3A 禅让机制
if r.leadTransferee != None {
// 在选举超时后领导权禅让仍然未完成,则 leader 应该终止领导权禅让,这样可以恢复客户端请求
r.transferElapsed++
}
}

func (r *Raft) candidateTick() {
Expand Down Expand Up @@ -400,12 +393,6 @@ func (r *Raft) becomeLeader() {
r.Prs[id].Next = r.RaftLog.LastIndex() + 1 // 初始化为 leader 的最后一条日志索引(后续出现冲突会往前移动)
r.Prs[id].Match = 0 // 初始化为 0 就可以了
}
//3A
r.PendingConfIndex = r.initPendingConfIndex()
// 成为 Leader 之后立马在日志中追加一条 noop 日志,这是因为
// 在 Raft 论文中提交 Leader 永远不会通过计算副本的方式提交一个之前任期、并且已经被复制到大多数节点的日志
// 通过追加一条当前任期的 noop 日志,可以快速的提交之前任期内所有被复制到大多数节点的日志
r.Step(pb.Message{MsgType: pb.MessageType_MsgPropose, Entries: []*pb.Entry{{}}})
}

// Step the entrance of handle message, see `MessageType`
Expand Down Expand Up @@ -632,7 +619,21 @@ func (r *Raft) handleRequestVote(m pb.Message) {
}
//1. 判断 Msg 的 Term 是否大于等于自己的 Term,是则更新
if m.Term > r.Term {
r.becomeFollower(m.Term, m.From)
// term比自己大 变成follower
r.becomeFollower(m.Term, None)
}
if (m.Term > r.Term || m.Term == r.Term && (r.Vote == None || r.Vote == m.From)) && r.RaftLog.isUpToDate(m.Index, m.LogTerm) {
// 投票
// 1. Candidate 任期大于自己并且日志足够新
// 2. Candidate 任期和自己相等并且自己在当前任期内没有投过票或者已经投给了 Candidate,并且 Candidate 的日志足够新
r.becomeFollower(m.Term, None)
r.Vote = m.From
} else {
// 拒绝投票
// 1. Candidate 的任期小于自己
// 2. 自己在当前任期已经投过票了
// 3. Candidate 的日志不够新
voteRep.Reject = true
}
r.msgs = append(r.msgs, voteRep)
}
Expand Down Expand Up @@ -674,8 +675,25 @@ func (r *Raft) handleAppendEntries(m pb.Message) {
prevLogTerm := m.LogTerm
r.becomeFollower(m.Term, m.From)
if prevLogIndex > r.RaftLog.LastIndex() || r.RaftLog.TermNoErr(prevLogIndex) != prevLogTerm {
r.msgs = append(r.msgs, appendEntryResp)
return
//最后一条index和preLogIndex冲突、或者任期冲突
//这时不能直接将 leader 传递过来的 Entries 覆盖到 follower 日志上
//这里可以直接返回,以便让 leader 尝试 prevLogIndex - 1 这条日志
//但是这样 follower 和 leader 之间的同步比较慢
//TODO 日志冲突优化:
// 找到冲突任期的第一条日志,下次 leader 发送 AppendEntry 的时候会将 nextIndex 设置为 ConflictIndex
// 如果找不到的话就设置为 prevLogIndex 的前一个
appendEntryResp.Index = r.RaftLog.LastIndex()
//appendEntryResp.Index = prevLogIndex - 1 // 用于提示 leader prevLogIndex 的开始位置是appendEntryResp.Index
if prevLogIndex <= r.RaftLog.LastIndex() {
conflictTerm := r.RaftLog.TermNoErr(prevLogIndex)
for _, ent := range r.RaftLog.entries {
if ent.Term == conflictTerm {
//找到冲突任期的上一个任期的idx位置
appendEntryResp.Index = ent.Index - 1
break
}
}
}
} else {
//prevLogIndex没有冲突
if len(m.Entries) > 0 {
Expand Down Expand Up @@ -732,12 +750,6 @@ func (r *Raft) handleAppendEntriesResponse(m pb.Message) {
r.broadcastAppendEntry()
}
}
//3A
if r.leadTransferee == m.From && r.Prs[m.From].Match == r.RaftLog.LastIndex() {
// AppendEntryResponse 回复来自 leadTransferee,检查日志是否是最新的
// 如果 leadTransferee 达到了最新的日志则立即发起领导权禅让
r.sendTimeoutNow(m.From)
}
}

// maybeUpdate 检查日志同步是不是一个过期的回复
Expand Down Expand Up @@ -897,10 +909,6 @@ func (r *Raft) handleHeartbeatResponse(m pb.Message) {
// handlePropose 追加从上层应用接收到的新日志,并广播给 follower
func (r *Raft) handlePropose(m pb.Message) {
r.appendEntry(m.Entries)
// leader 处于领导权禅让,停止接收新的请求
if r.leadTransferee != None {
return
}
r.Prs[r.id].Match = r.RaftLog.LastIndex()
r.Prs[r.id].Next = r.RaftLog.LastIndex() + 1
if len(r.Prs) == 1 {
Expand Down

0 comments on commit e44591a

Please sign in to comment.