Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Legendout committed Aug 11, 2024
1 parent 7a70f61 commit cf8de33
Showing 1 changed file with 79 additions and 2 deletions.
81 changes: 79 additions & 2 deletions kv/raftstore/peer_msg_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,29 @@ func (d *peerMsgHandler) processAdminRequest(entry *pb.Entry, requests *raft_cmd

// notifyHeartbeatScheduler 帮助 region 快速创建 peer
func (d *peerMsgHandler) notifyHeartbeatScheduler(region *metapb.Region, peer *peer) {
return
return
}
d.ctx.schedulerTaskSender <- &runner.SchedulerRegionHeartbeatTask{

Check failure on line 150 in kv/raftstore/peer_msg_handler.go

View workflow job for this annotation

GitHub Actions / Build & Test

syntax error: non-declaration statement outside function body
Region: clonedRegion,
Peer: peer.Meta,
PendingPeers: peer.CollectPendingPeers(),
ApproximateSize: peer.ApproximateSize,
}
}

func (d *peerMsgHandler) createNewSplitRegion(split *raft_cmdpb.SplitRequest, oldRegion *metapb.Region) *metapb.Region {
return nil
newPeers := make([]*metapb.Peer, 0)
for i, peer := range oldRegion.Peers {
newPeers = append(newPeers, &metapb.Peer{Id: split.NewPeerIds[i], StoreId: peer.StoreId})
}
newRegion := &metapb.Region{
Id: split.NewRegionId,
StartKey: split.SplitKey,
EndKey: oldRegion.EndKey,
Peers: newPeers, // Region 中每个 Peer 的 id 以及所在的 storeId
RegionEpoch: &metapb.RegionEpoch{Version: InitEpochVer, ConfVer: InitEpochConfVer},
}
return newRegion
}

// processConfChange 处理配置变更日志
Expand Down Expand Up @@ -232,6 +250,7 @@ func (d *peerMsgHandler) searchPeerWithId(nodeId uint64) int {
}
return len(d.peerStorage.region.Peers)
}

func (d *peerMsgHandler) updateStoreMeta(region *metapb.Region) {
storeMeta := d.ctx.storeMeta
storeMeta.Lock()
Expand Down Expand Up @@ -418,6 +437,64 @@ func (d *peerMsgHandler) proposeAdminRequest(msg *raft_cmdpb.RaftCmdRequest, cb
if err := d.RaftGroup.Propose(data); err != nil {
log.Panic(err)
}
case raft_cmdpb.AdminCmdType_TransferLeader: // 领导权禅让直接执行,不需要提交到 raft
// 执行领导权禅让
d.RaftGroup.TransferLeader(msg.AdminRequest.TransferLeader.Peer.Id)
// 返回 response
adminResp := &raft_cmdpb.AdminResponse{
CmdType: raft_cmdpb.AdminCmdType_TransferLeader,
TransferLeader: &raft_cmdpb.TransferLeaderResponse{},
}
cb.Done(&raft_cmdpb.RaftCmdResponse{
Header: &raft_cmdpb.RaftResponseHeader{},
AdminResponse: adminResp,
})
case raft_cmdpb.AdminCmdType_ChangePeer: // 集群成员变更,需要提交到 raft,并处理 proposal 回调
// 单步成员变更:前一步成员变更被提交之后才可以执行下一步成员变更
if d.peerStorage.AppliedIndex() >= d.RaftGroup.Raft.PendingConfIndex {
// 如果 region 只有两个节点,并且需要 remove leader,则需要先完成 transferLeader
if len(d.Region().Peers) == 2 && msg.AdminRequest.ChangePeer.ChangeType == pb.ConfChangeType_RemoveNode && msg.AdminRequest.ChangePeer.Peer.Id == d.PeerId() {
for _, p := range d.Region().Peers {
if p.Id != d.PeerId() {
d.RaftGroup.TransferLeader(p.Id)
break
}
}
}
// 1. 创建 proposal
d.proposals = append(d.proposals, &proposal{
index: d.nextProposalIndex(),
term: d.Term(),
cb: cb,
})
// 2. 提交到 raft
context, _ := msg.Marshal()
d.RaftGroup.ProposeConfChange(pb.ConfChange{
ChangeType: msg.AdminRequest.ChangePeer.ChangeType, // 变更类型
NodeId: msg.AdminRequest.ChangePeer.Peer.Id, // 变更成员 id
Context: context, // request data
})
}
case raft_cmdpb.AdminCmdType_Split: // Region 分裂
// 如果收到的 Region Split 请求是一条过期的请求,则不应该提交到 Raft
if err := util.CheckRegionEpoch(msg, d.Region(), true); err != nil {
log.Infof("[AdminCmdType_Split] Region %v Split, a expired request", d.Region())
cb.Done(ErrResp(err))
return
}
if err := util.CheckKeyInRegion(msg.AdminRequest.Split.SplitKey, d.Region()); err != nil {
cb.Done(ErrResp(err))
return
}
log.Infof("[AdminCmdType_Split Propose] Region %v Split, entryIndex %v", d.Region(), d.nextProposalIndex())
// 否则的话 Region 还没有开始分裂,则将请求提交到 Raft
d.proposals = append(d.proposals, &proposal{
index: d.nextProposalIndex(),
term: d.Term(),
cb: cb,
})
data, _ := msg.Marshal()
d.RaftGroup.Propose(data)
}
}

Expand Down

0 comments on commit cf8de33

Please sign in to comment.