Skip to content

Commit dc9cddc

Browse files
committed
fix: common scenario workflow in raft;
follower candidate and leader state transition;
1 parent e6ce98e commit dc9cddc

File tree

2 files changed

+133
-0
lines changed

2 files changed

+133
-0
lines changed

src/exercise/raft-in-go/part1/raft.go

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,19 @@ rafe有两种类别的rpc请求:
3939
1. 将状态转换为candidate,并将当前任期数自增(表示新一轮投票)
4040
2. 发送RV给所有的peers节点,并请求为自己投票
4141
3. 等待这些RPC的响应,并统计投票数,如果数量足够了就变成leader
42+
43+
成为leader后,需要向其它节点同步信息,发送RPC请求
44+
45+
状态转移和协程
46+
总结一下cm状态的转移过程和对应转移所处的goroutine:
47+
Follower:CM被初始化为一个follower,每次调用becomeFollower时都会新起goroutine运行runElectionTimer。注意,在短时间内允许有多个同时运行。假设一个follower从更高任期的leader接收到一个RV,就会触发becomeFollower调用,就会启动新的goroutine定时器。但是旧的routine计时器会因为任期不是最新从而退出。
48+
49+
Candidate:候选人也有并行的选举goroutine,但除此之外,它还有许多goroutine来发送rpc。它同样也有follower那样的机制,根据任期大小来停止旧的选举。请记住,RPC goroutines可能需要很长的时间才能完成,所以如果它们发现在RPC调用返回时已经过期,就必须安静地退出。
50+
51+
Leader:leader 不会发出选举goroutine,但是会以每50毫秒来发送心跳
52+
53+
非正常选举场景
54+
4255
*/
4356

4457
type ConsensusModule struct {
@@ -57,6 +70,15 @@ type ConsensusModule struct {
5770
electionResetEvent time.Time
5871
}
5972

73+
type AppendEntriesArgs struct {
74+
Term int
75+
LeaderId int
76+
}
77+
type AppendEntriesReply struct {
78+
Term int
79+
Success bool
80+
}
81+
6082
type LogEntry struct {
6183
Command interface{}
6284
Term int
@@ -207,6 +229,113 @@ func (cm *ConsensusModule) becomeFollower(term int) {
207229
go cm.runElectionTimer()
208230
}
209231

232+
/*
233+
赢得大多数节点的选票即可称为leader
234+
每50ms发送一个心跳请求
235+
*/
210236
func (cm *ConsensusModule) startLeader() {
237+
cm.state = Leader
238+
cm.dlog("becomes Leader; term=%d, log=%v", cm.currentTerm, cm.log)
239+
240+
go func() {
241+
ticker := time.NewTicker(50 * time.Millisecond)
242+
defer ticker.Stop()
243+
// 作为leader,需要一直发送心跳
244+
for {
245+
cm.leaderSendHeartbeats()
246+
<-ticker.C
247+
248+
cm.mu.Lock()
249+
if cm.state != Leader {
250+
cm.mu.Unlock()
251+
return
252+
}
253+
cm.mu.Unlock()
254+
}
255+
}()
256+
}
257+
258+
func (cm *ConsensusModule) leaderSendHeartbeats() {
259+
cm.mu.Lock()
260+
savedCurrentTerm := cm.currentTerm
261+
cm.mu.Unlock()
262+
263+
for _, peerId := range cm.peerIds {
264+
args := AppendEntriesArgs{
265+
Term: savedCurrentTerm,
266+
LeaderId: cm.id,
267+
}
268+
269+
go func(peerId int) {
270+
cm.dlog("sending AppendEntries to %v: ni=%d, args=%+v", peerId, 0, args)
271+
var reply AppendEntriesReply
272+
if err := cm.server.Call(peerId, "ConsensusModule.AppendEntries", args, &reply); err != nil {
273+
cm.mu.Lock()
274+
defer cm.mu.Unlock()
275+
if reply.Term > savedCurrentTerm {
276+
cm.dlog("term out of date in heartbeat reply")
277+
cm.becomeFollower(reply.Term)
278+
return
279+
}
280+
}
281+
}(peerId)
282+
}
283+
}
284+
285+
func (cm *ConsensusModule) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) error {
286+
cm.mu.Lock()
287+
defer cm.mu.Unlock()
288+
if cm.state == Dead {
289+
return nil
290+
}
291+
cm.dlog("RequestVote: %+v [currentTerm=%d, votedFor=%d]", args, cm.currentTerm, cm.votedFor)
211292

293+
if args.Term > cm.currentTerm {
294+
// 请求参数的任期比该cm大,说明该投票已过期
295+
cm.dlog("... term out of date in RequestVote")
296+
cm.becomeFollower(args.Term)
297+
}
298+
// 调用者的任期和请求投票的任期一致并且该调用者还没有投票给其它节点请求,则投票成功
299+
if cm.currentTerm == args.Term && (cm.votedFor == -1 || cm.votedFor == args.CandidateId) {
300+
reply.VoteGranted = true
301+
cm.votedFor = args.CandidateId
302+
cm.electionResetEvent = time.Now()
303+
} else {
304+
reply.VoteGranted = false
305+
}
306+
reply.Term = cm.currentTerm
307+
cm.dlog("... RequestVote reply: %+v", reply)
308+
return nil
309+
310+
}
311+
312+
func (cm *ConsensusModule) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) error {
313+
cm.mu.Lock()
314+
defer cm.mu.Unlock()
315+
if cm.state == Dead {
316+
return nil
317+
}
318+
cm.dlog("AppendEntries: %+v", args)
319+
320+
if args.Term > cm.currentTerm {
321+
cm.dlog("... term out of date in AppendEntries")
322+
cm.becomeFollower(args.Term)
323+
}
324+
325+
reply.Success = false
326+
/*
327+
这里在什么情况下会存在一个leader(该cm)节点会成为另一个leader的follower节点?
328+
Raft保证在任何给定的任期内只存在一个领导者。如果您仔细地遵循RequestVote的逻辑和startElection中发送rv的代码,您将看到集群中不可能在同一任期内存在两个leader。这个条件对于发现另一个同伴在这届选举中获胜的候选人来说是很重要的。
329+
这里应该指的是短时间leader失去连接,导致脑裂,从而出现的问题
330+
*/
331+
if args.Term == cm.currentTerm {
332+
if cm.state != Follower {
333+
cm.becomeFollower(args.Term)
334+
}
335+
cm.electionResetEvent = time.Now()
336+
reply.Success = true
337+
}
338+
reply.Term = cm.currentTerm
339+
cm.dlog("AppendEntries reply: %+v", *reply)
340+
return nil
212341
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
11
package main
22

33
type Server struct{}
4+
5+
func (s *Server) Call(peerId int, msg string, args interface{}, reply interface{}) error {
6+
return nil
7+
}

0 commit comments

Comments
 (0)