From e44591a9444ac6e0e2ae12bdefb7d31a5eac6b20 Mon Sep 17 00:00:00 2001 From: Legendout <525187879@qq.com> Date: Wed, 31 Jul 2024 20:48:57 +0800 Subject: [PATCH] fix raft bug --- kv/raftstore/peer_msg_handler.go | 198 ++++++++++++++----------------- kv/raftstore/peer_storage.go | 19 ++- raft/raft.go | 60 ++++++---- 3 files changed, 133 insertions(+), 144 deletions(-) diff --git a/kv/raftstore/peer_msg_handler.go b/kv/raftstore/peer_msg_handler.go index af48cad..996e8ee 100644 --- a/kv/raftstore/peer_msg_handler.go +++ b/kv/raftstore/peer_msg_handler.go @@ -137,81 +137,22 @@ func (d *peerMsgHandler) processAdminRequest(entry *pb.Entry, requests *raft_cmd d.ScheduleCompactLog(adminReq.CompactLog.CompactIndex) log.Infof("%d apply commit, entry %v, type %s, truncatedIndex %v", d.peer.PeerId(), entry.Index, adminReq.CmdType, adminReq.CompactLog.CompactIndex) } - case raft_cmdpb.AdminCmdType_Split: // Region Split 请求处理 - //TODO - // error: regionId 不匹配 - if requests.Header.RegionId != d.regionId { - regionNotFound := &util.ErrRegionNotFound{RegionId: requests.Header.RegionId} - d.handleProposal(entry, ErrResp(regionNotFound)) - return kvWB - } - // error: 过期的请求 - if errEpochNotMatch, ok := util.CheckRegionEpoch(requests, d.Region(), true).(*util.ErrEpochNotMatch); ok { - d.handleProposal(entry, ErrResp(errEpochNotMatch)) - return kvWB - } - // error: key 不在 oldRegion 中 - if err := util.CheckKeyInRegion(adminReq.Split.SplitKey, d.Region()); err != nil { - d.handleProposal(entry, ErrResp(err)) - return kvWB - } - // error: Split Region 的 peers 和当前 oldRegion 的 peers 数量不相等,不知道为什么会出现这种原因 - if len(d.Region().Peers) != len(adminReq.Split.NewPeerIds) { - d.handleProposal(entry, ErrRespStaleCommand(d.Term())) - return kvWB - } - oldRegion, split := d.Region(), adminReq.Split - oldRegion.RegionEpoch.Version++ - newRegion := d.createNewSplitRegion(split, oldRegion) // 创建新的 Region - // 修改 storeMeta 信息 - storeMeta := d.ctx.storeMeta - storeMeta.Lock() - storeMeta.regionRanges.Delete(®ionItem{region: oldRegion}) // 删除 oldRegion 的数据范围 - oldRegion.EndKey = split.SplitKey // 修改 oldRegion 的 range - storeMeta.regionRanges.ReplaceOrInsert(®ionItem{region: oldRegion}) // 更新 oldRegion 的 range - storeMeta.regionRanges.ReplaceOrInsert(®ionItem{region: newRegion}) // 创建 newRegion 的 range - storeMeta.regions[newRegion.Id] = newRegion // 设置 regions 映射 - storeMeta.Unlock() - // 持久化 oldRegion 和 newRegion - meta.WriteRegionState(kvWB, oldRegion, rspb.PeerState_Normal) - meta.WriteRegionState(kvWB, newRegion, rspb.PeerState_Normal) - // 这几句有用吗? - d.SizeDiffHint = 0 - d.ApproximateSize = new(uint64) - // 创建当前 store 上的 newRegion Peer,注册到 router,并启动 - peer, err := createPeer(d.storeID(), d.ctx.cfg, d.ctx.schedulerTaskSender, d.ctx.engine, newRegion) - if err != nil { - log.Panic(err) - } - d.ctx.router.register(peer) - d.ctx.router.send(newRegion.Id, message.Msg{Type: message.MsgTypeStart}) - // 处理回调函数 - d.handleProposal(entry, &raft_cmdpb.RaftCmdResponse{ - Header: &raft_cmdpb.RaftResponseHeader{}, - AdminResponse: &raft_cmdpb.AdminResponse{ - CmdType: raft_cmdpb.AdminCmdType_Split, - Split: &raft_cmdpb.SplitResponse{Regions: []*metapb.Region{newRegion, oldRegion}}, - }, - }) - log.Infof("[AdminCmdType_Split Process] oldRegin %v, newRegion %v", oldRegion, newRegion) - // 发送 heartbeat 给其他节点 - if d.IsLeader() { - d.HeartbeatScheduler(d.ctx.schedulerTaskSender) - d.notifyHeartbeatScheduler(newRegion, peer) - } } return kvWB } // notifyHeartbeatScheduler 帮助 region 快速创建 peer func (d *peerMsgHandler) notifyHeartbeatScheduler(region *metapb.Region, peer *peer) { - // 通知 heartbeat scheduler - d.ctx.schedulerTaskSender <- &runner.SchedulerTask{ - RegionID: region.Id, - Kind: runner.SchedulerHeartbeat, - Args: &runner.SchedulerHeartbeatTask{ - Peer: peer.peer, - }, + clonedRegion := new(metapb.Region) + err := util.CloneMsg(region, clonedRegion) + if err != nil { + return + } + d.ctx.schedulerTaskSender <- &runner.SchedulerRegionHeartbeatTask{ + Region: clonedRegion, + Peer: peer.Meta, + PendingPeers: peer.CollectPendingPeers(), + ApproximateSize: peer.ApproximateSize, } } @@ -232,47 +173,83 @@ func (d *peerMsgHandler) createNewSplitRegion(split *raft_cmdpb.SplitRequest, ol // processConfChange 处理配置变更日志 func (d *peerMsgHandler) processConfChange(entry *pb.Entry, cc *pb.ConfChange, kvWB *engine_util.WriteBatch) *engine_util.WriteBatch { - //1. 解析配置变更日志 - var resp *raft_cmdpb.RaftCmdResponse - var err error - var ccData []byte - if cc.NodeId == 0 { - // NodeId 为 0 说明是一个 ConfChangeV2 类型的配置变更日志 - ccV2 := &pb.ConfChangeV2{} - if err := ccV2.Unmarshal(cc.Context); err != nil { - log.Panic(err) - } - ccData = ccV2.ChangeType.String() - } else { - ccData = cc.ChangeType.String() + // 获取 ConfChange Command Request + msg := &raft_cmdpb.RaftCmdRequest{} + if err := msg.Unmarshal(cc.Context); err != nil { + log.Panic(err) + } + region := d.Region() + changePeerReq := msg.AdminRequest.ChangePeer + // 检查 Command Request 中的 RegionEpoch 是否是过期的,以此判定是不是一个重复的请求 + // 实验指导书中提到,测试程序可能会多次提交同一个 ConfChange 直到 ConfChange 被应用 + // CheckRegionEpoch 检查 RaftCmdRequest 头部携带的 RegionEpoch 是不是和 currentRegionEpoch 匹配 + if err, ok := util.CheckRegionEpoch(msg, region, true).(*util.ErrEpochNotMatch); ok { + log.Infof("[processConfChange] %v RegionEpoch not match", d.PeerId()) + d.handleProposal(entry, ErrResp(err)) + return kvWB } - //2. 处理配置变更日志 switch cc.ChangeType { - case pb.ConfChangeType_AddNode: - // 添加节点 - resp, err = d.processAddNode(cc, ccData) - case pb.ConfChangeType_RemoveNode: - // 删除节点 - resp, err = d.processRemoveNode(cc, ccData) - default: - log.Panic("unexpected conf type") + case pb.ConfChangeType_AddNode: // 添加一个节点 + log.Infof("[AddNode] %v add %v", d.PeerId(), cc.NodeId) + // 待添加的节点必须原先在 Region 中不存在 + if d.searchPeerWithId(cc.NodeId) == len(region.Peers) { + // region 中追加新的 peer + region.Peers = append(region.Peers, changePeerReq.Peer) + region.RegionEpoch.ConfVer++ + meta.WriteRegionState(kvWB, region, rspb.PeerState_Normal) // PeerState 用来表示当前 Peer 是否在 region 中 + // 更新 metaStore 中的 region 信息 + d.updateStoreMeta(region) + // 更新 peerCache,peerCache 保存了 peerId -> Peer 的映射 + // 当前 raft_store 上的 peer 需要发送消息给同一个 region 中的别的节点的时候,需要获取别的节点所在 storeId + // peerCache 里面就保存了属于同一个 region 的所有 peer 的元信息(peerId, storeId) + d.insertPeerCache(changePeerReq.Peer) + } + case pb.ConfChangeType_RemoveNode: // 删除一个节点 + log.Infof("[RemoveNode] %v remove %v", d.PeerId(), cc.NodeId) + // 如果目标节点是自身,那么直接销毁并返回:从 raft_store 上删除所属 region 的所有信息 + if cc.NodeId == d.PeerId() { + d.destroyPeer() + log.Infof("[RemoveNode] destory %v compeleted", cc.NodeId) + return kvWB + } + // 待删除的节点必须存在于 region 中 + n := d.searchPeerWithId(cc.NodeId) + if n != len(region.Peers) { + // 删除节点 RaftGroup 中的第 n 个 peer(注意,这里并不是编号为 n 的 peer,而是第 n 个 peer) + region.Peers = append(region.Peers[:n], region.Peers[n+1:]...) + region.RegionEpoch.ConfVer++ + meta.WriteRegionState(kvWB, region, rspb.PeerState_Normal) // PeerState 用来表示当前 Peer 是否在 region 中 + // 更新 metaStore 中的 region 信息 + d.updateStoreMeta(region) + // 更新 peerCache + d.removePeerCache(cc.NodeId) + } } - if err != nil { - resp = ErrResp(err) + // 更新 raft 层的配置信息 + d.RaftGroup.ApplyConfChange(*cc) + // 处理 proposal + d.handleProposal(entry, &raft_cmdpb.RaftCmdResponse{ + Header: &raft_cmdpb.RaftResponseHeader{}, + AdminResponse: &raft_cmdpb.AdminResponse{ + CmdType: raft_cmdpb.AdminCmdType_ChangePeer, + ChangePeer: &raft_cmdpb.ChangePeerResponse{Region: region}, + }, + }) + // 新增加的 peer 是通过 leader 的心跳完成的 + if d.IsLeader() { + d.HeartbeatScheduler(d.ctx.schedulerTaskSender) } - d.handleProposal(entry, resp) return kvWB } // searchPeerWithId 根据需要添加或者删除的 Peer id,找到 region 中是否已经存在这个 Peer func (d *peerMsgHandler) searchPeerWithId(nodeId uint64) int { - region := d.Region() - for i, peer := range region.Peers { + for id, peer := range d.peerStorage.region.Peers { if peer.Id == nodeId { - return i + return id } } - return len(region.Peers) + return len(d.peerStorage.region.Peers) } func (d *peerMsgHandler) updateStoreMeta(region *metapb.Region) { storeMeta := d.ctx.storeMeta @@ -351,15 +328,25 @@ func (d *peerMsgHandler) handleProposal(entry *pb.Entry, resp *raft_cmdpb.RaftCm // 其他情况:正确匹配的 proposal(处理完毕之后应该立即结束),further proposal(直接返回) for len(d.proposals) > 0 { proposal := d.proposals[0] - if proposal.index < entry.Index { - // 过期的 proposal - d.handleProposalStale(entry, proposal) - } else if proposal.index == entry.Index { - // 正确匹配的 proposal + // proposal.index < entry.index 是有可能出现的 + // 如果 leader 宕机了并且有一个新的 leader 向它发送了快照,当应用了快照之后又继续同步了新的日志并 commit 了 + // 这个时候 proposal.index < entry.index + if proposal.term < entry.Term || proposal.index < entry.Index { + // 日志被截断的情况 + NotifyStaleReq(proposal.term, proposal.cb) + d.proposals = d.proposals[1:] + continue + } + // 正常匹配 + if proposal.term == entry.Term && proposal.index == entry.Index { + if proposal.cb != nil { + proposal.cb.Txn = d.peerStorage.Engines.Kv.NewTransaction(false) // snap resp should set txn explicitly + } proposal.cb.Done(resp) d.proposals = d.proposals[1:] - return } + // further proposal(即当前的 entry 并没有 proposal 在等待,或许是因为现在是 follower 在处理 committed entry) + return } } @@ -450,9 +437,6 @@ func (d *peerMsgHandler) proposeAdminRequest(msg *raft_cmdpb.RaftCmdRequest, cb if err := d.RaftGroup.Propose(data); err != nil { log.Panic(err) } - case raft_cmdpb.AdminCmdType_TransferLeader: - case raft_cmdpb.AdminCmdType_ChangePeer: - case raft_cmdpb.AdminCmdType_Split: } } diff --git a/kv/raftstore/peer_storage.go b/kv/raftstore/peer_storage.go index 1b26efa..3957c60 100644 --- a/kv/raftstore/peer_storage.go +++ b/kv/raftstore/peer_storage.go @@ -256,17 +256,14 @@ func (ps *PeerStorage) clearMeta(kvWB, raftWB *engine_util.WriteBatch) error { // Delete all data that is not covered by `new_region`. func (ps *PeerStorage) clearExtraData(newRegion *metapb.Region) { - // Your Code Here (2C). - // 1. 获取当前 region 的 startKey 和 endKey - startKey, endKey := ps.region.StartKey, ps.region.EndKey - // 2. 获取新 region 的 startKey 和 endKey - newStartKey, newEndKey := newRegion.StartKey, newRegion.EndKey - // 3. 判断新 region 是否和当前 region 一样,如果一样则不需要删除 - if bytes.Equal(startKey, newStartKey) && bytes.Equal(endKey, newEndKey) { - return - } - // 4. 删除当前 region 的数据 - ps.clearRange(ps.region.Id, startKey, endKey) + oldStartKey, oldEndKey := ps.region.GetStartKey(), ps.region.GetEndKey() + newStartKey, newEndKey := newRegion.GetStartKey(), newRegion.GetEndKey() + if bytes.Compare(oldStartKey, newStartKey) < 0 { + ps.clearRange(newRegion.Id, oldStartKey, newStartKey) + } + if bytes.Compare(newEndKey, oldEndKey) < 0 || (len(oldEndKey) == 0 && len(newEndKey) != 0) { + ps.clearRange(newRegion.Id, newEndKey, oldEndKey) + } } // ClearMeta delete stale metadata like raftState, applyState, regionState and raft log entries diff --git a/raft/raft.go b/raft/raft.go index e2f9a08..5cd1d1c 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -330,13 +330,6 @@ func (r *Raft) leaderTick() { // MessageType_MsgBeat 属于内部消息,不需要经过 RawNode 处理 r.Step(pb.Message{From: r.id, To: r.id, MsgType: pb.MessageType_MsgBeat}) } - //TODO 选举超时 判断心跳回应数量 - - //TODO 3A 禅让机制 - if r.leadTransferee != None { - // 在选举超时后领导权禅让仍然未完成,则 leader 应该终止领导权禅让,这样可以恢复客户端请求 - r.transferElapsed++ - } } func (r *Raft) candidateTick() { @@ -400,12 +393,6 @@ func (r *Raft) becomeLeader() { r.Prs[id].Next = r.RaftLog.LastIndex() + 1 // 初始化为 leader 的最后一条日志索引(后续出现冲突会往前移动) r.Prs[id].Match = 0 // 初始化为 0 就可以了 } - //3A - r.PendingConfIndex = r.initPendingConfIndex() - // 成为 Leader 之后立马在日志中追加一条 noop 日志,这是因为 - // 在 Raft 论文中提交 Leader 永远不会通过计算副本的方式提交一个之前任期、并且已经被复制到大多数节点的日志 - // 通过追加一条当前任期的 noop 日志,可以快速的提交之前任期内所有被复制到大多数节点的日志 - r.Step(pb.Message{MsgType: pb.MessageType_MsgPropose, Entries: []*pb.Entry{{}}}) } // Step the entrance of handle message, see `MessageType` @@ -632,7 +619,21 @@ func (r *Raft) handleRequestVote(m pb.Message) { } //1. 判断 Msg 的 Term 是否大于等于自己的 Term,是则更新 if m.Term > r.Term { - r.becomeFollower(m.Term, m.From) + // term比自己大 变成follower + r.becomeFollower(m.Term, None) + } + if (m.Term > r.Term || m.Term == r.Term && (r.Vote == None || r.Vote == m.From)) && r.RaftLog.isUpToDate(m.Index, m.LogTerm) { + // 投票 + // 1. Candidate 任期大于自己并且日志足够新 + // 2. Candidate 任期和自己相等并且自己在当前任期内没有投过票或者已经投给了 Candidate,并且 Candidate 的日志足够新 + r.becomeFollower(m.Term, None) + r.Vote = m.From + } else { + // 拒绝投票 + // 1. Candidate 的任期小于自己 + // 2. 自己在当前任期已经投过票了 + // 3. Candidate 的日志不够新 + voteRep.Reject = true } r.msgs = append(r.msgs, voteRep) } @@ -674,8 +675,25 @@ func (r *Raft) handleAppendEntries(m pb.Message) { prevLogTerm := m.LogTerm r.becomeFollower(m.Term, m.From) if prevLogIndex > r.RaftLog.LastIndex() || r.RaftLog.TermNoErr(prevLogIndex) != prevLogTerm { - r.msgs = append(r.msgs, appendEntryResp) - return + //最后一条index和preLogIndex冲突、或者任期冲突 + //这时不能直接将 leader 传递过来的 Entries 覆盖到 follower 日志上 + //这里可以直接返回,以便让 leader 尝试 prevLogIndex - 1 这条日志 + //但是这样 follower 和 leader 之间的同步比较慢 + //TODO 日志冲突优化: + // 找到冲突任期的第一条日志,下次 leader 发送 AppendEntry 的时候会将 nextIndex 设置为 ConflictIndex + // 如果找不到的话就设置为 prevLogIndex 的前一个 + appendEntryResp.Index = r.RaftLog.LastIndex() + //appendEntryResp.Index = prevLogIndex - 1 // 用于提示 leader prevLogIndex 的开始位置是appendEntryResp.Index + if prevLogIndex <= r.RaftLog.LastIndex() { + conflictTerm := r.RaftLog.TermNoErr(prevLogIndex) + for _, ent := range r.RaftLog.entries { + if ent.Term == conflictTerm { + //找到冲突任期的上一个任期的idx位置 + appendEntryResp.Index = ent.Index - 1 + break + } + } + } } else { //prevLogIndex没有冲突 if len(m.Entries) > 0 { @@ -732,12 +750,6 @@ func (r *Raft) handleAppendEntriesResponse(m pb.Message) { r.broadcastAppendEntry() } } - //3A - if r.leadTransferee == m.From && r.Prs[m.From].Match == r.RaftLog.LastIndex() { - // AppendEntryResponse 回复来自 leadTransferee,检查日志是否是最新的 - // 如果 leadTransferee 达到了最新的日志则立即发起领导权禅让 - r.sendTimeoutNow(m.From) - } } // maybeUpdate 检查日志同步是不是一个过期的回复 @@ -897,10 +909,6 @@ func (r *Raft) handleHeartbeatResponse(m pb.Message) { // handlePropose 追加从上层应用接收到的新日志,并广播给 follower func (r *Raft) handlePropose(m pb.Message) { r.appendEntry(m.Entries) - // leader 处于领导权禅让,停止接收新的请求 - if r.leadTransferee != None { - return - } r.Prs[r.id].Match = r.RaftLog.LastIndex() r.Prs[r.id].Next = r.RaftLog.LastIndex() + 1 if len(r.Prs) == 1 {