From 3be877261e98c7185cf39afc2a5699eedecc26e8 Mon Sep 17 00:00:00 2001 From: codico Date: Sun, 17 Jul 2022 15:59:47 +0200 Subject: [PATCH] Figure8Unreliable2C 300/300 passing --- src/raft/config.go | 2 + src/raft/raft.go | 146 ++++++++++++++++++++++++--------------------- src/raft/tester.sh | 5 +- src/raft/util.go | 7 +++ 4 files changed, 89 insertions(+), 71 deletions(-) diff --git a/src/raft/config.go b/src/raft/config.go index e3852e1..927340b 100644 --- a/src/raft/config.go +++ b/src/raft/config.go @@ -174,6 +174,7 @@ func (cfg *config) applier(i int, applyCh chan ApplyMsg) { err_msg = fmt.Sprintf("server %v apply out of order %v", i, m.CommandIndex) } if err_msg != "" { + DPrintf("apply error: %v", err_msg) DPrintClose() log.Fatalf("apply error: %v", err_msg) cfg.applyErr[i] = err_msg @@ -264,6 +265,7 @@ func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) { // Ignore other types of ApplyMsg. } if err_msg != "" { + DPrintf("apply error: %v", err_msg) DPrintClose() log.Fatalf("apply error: %v", err_msg) cfg.applyErr[i] = err_msg diff --git a/src/raft/raft.go b/src/raft/raft.go index 1125979..a6d8dbf 100644 --- a/src/raft/raft.go +++ b/src/raft/raft.go @@ -48,14 +48,6 @@ type Raft struct { matchCommandIds []int // for each server, index of highest log entry known to be replicated on server (initialized to 0, increases monotonically) } -func (rf *Raft) printLastLog(fnName string) { - if len(rf.log) > 0 { - entry := rf.log[len(rf.log) - 1] - DPrintf("[%s.%d.%d] Last Command In Log [%d|%d]=%d", fnName, rf.currentTerm, rf.me, entry.CommandIndex, entry.Term, entry.Command) - } -} - - func (rf *Raft) getLastLogEntry() (*ApplyMsg) { if len(rf.log) == 0 { return nil @@ -66,11 +58,8 @@ func (rf *Raft) getLastLogEntry() (*ApplyMsg) { func (rf *Raft) GetState() (int, bool) { rf.mu.Lock() defer rf.mu.Unlock() - var term int - var isleader bool - term = rf.currentTerm - isleader = rf.leader == &rf.me - return term, isleader + isLeader := rf.leader == &rf.me + return 
rf.currentTerm, isLeader } type Role int @@ -111,16 +100,16 @@ func (rf *Raft) readPersist(data []byte) { var votedFor *int var currentTerm int if e := d.Decode(&log); e != nil { - println(e) + DPrintf(e.Error()) panic("Fail decode log") } if e := d.Decode(&votedFor); e != nil { - println(e) + DPrintf(e.Error()) panic("Fail decode votedFor") } if e := d.Decode(&currentTerm); e != nil { - println("Fail decode currentTerm", e.Error()) - println(rf.currentTerm) + DPrintf("Fail decode currentTerm %s", e.Error()) + DPrintf("%d", rf.currentTerm) currentTerm = 0 } rf.log = log @@ -174,37 +163,56 @@ func (rf *Raft) grantVote(args *RequestVoteArgs, reply *RequestVoteReply) { rf.role = Follower rf.votedFor = &args.CandidateId reply.VoteGranted = true - rf.persist() + rf.persist() } +func (rf *Raft) checkAlreadyVotedInTerm(args *RequestVoteArgs) bool { + return args.Term == rf.currentTerm && rf.votedFor != nil +} + +func (rf *Raft) checkIsLowTerm(args *RequestVoteArgs) bool { + return args.Term < rf.currentTerm +} func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) { rf.mu.Lock() defer rf.mu.Unlock() + // Reply false if term < currentTerm (§5.1) - if args.Term < rf.currentTerm { + if rf.checkIsLowTerm(args) { DPrintf("Voting no due to low candiate term %d, my term %d", args.Term, rf.currentTerm) reply.VoteNo(rf.currentTerm) return } - if args.Term == rf.currentTerm { - if rf.votedFor != nil { - // NOTE: vote no if we did not vote in this round + // handle the current term + if rf.checkAlreadyVotedInTerm(args) { + // vote no if we already voted in this term + DPrintf("Voting no because me=%d already voted in term %d, my term %d", rf.me, args.Term, rf.currentTerm) reply.VoteNo(rf.currentTerm) return - } - } else { - rf.currentTerm = args.Term - rf.votedFor = nil - rf.persist() } + // the term has increased, or we did not vote in this term + // if I did not vote in this term, I should not be the leader + // so either way, we should step down from leadership 
here + // make sure we step down from leadership if we see a higher term in the voting process + // we used to not do this, and it's possible that this was the cause of a bug + // TODO: use rf.stepDown + if rf.role == Leader { + fmt.Printf("[RequestVote.%d.%d]: I used to not step down here. This caused all the issues? (When I also voted no)\n", rf.currentTerm, rf.me) + } + rf.currentTerm = args.Term + rf.votedFor = nil + rf.role = Follower + rf.persist() + logLength := len(rf.log) - // if there is nothing in the log the voters log can't be more up to date + // if there is nothing in the log my log can't be more up to date + // lets just say yes if logLength == 0 { - DPrintf("Voting yes because nothing my log so candidat egotta be up to date") + DPrintf("Voting yes because nothing my log so candidate gotta be up to date") rf.grantVote(args, reply) return } @@ -273,22 +281,12 @@ type RequestAppendEntriesReply struct { // commit commandID is not persisted I think, so we might override commited entries by doing this? 
func (rf *Raft) calculateConflictInfo(args *RequestAppendEntriesArgs, reply *RequestAppendEntriesReply) { rf.printLastLog("calculateConflictInfo") - // TODO: Conflicting index should be the first index of that specific problematic term - if len(rf.log) == 0 { reply.LastLogIndex = -1 return } lastLogIndex := len(rf.log) - 1 - - // we probably don't need this part - // lastLog := rf.log[lastLogIndex] - // if args.PrevLogIndex > lastLogIndex && lastLog.Term != args.PrevLogTerm { - // reply.LastLogIndex = lastLogIndex - // return - // } - conflictIdx := min(args.PrevLogIndex, lastLogIndex) conflictTerm := rf.log[conflictIdx].Term @@ -354,16 +352,16 @@ func (rf *Raft) decrementNextIndex(server int, args *RequestAppendEntriesArgs, r decrementedIdx = 0 } if rf.nextIndex[server] <= decrementedIdx { - fmt.Printf("nextIndex=%d decrementedNextIdx=%d\n", rf.nextIndex[server] , decrementedIdx) + DPrintf("nextIndex=%d decrementedNextIdx=%d\n", rf.nextIndex[server] , decrementedIdx) // if this situation happens, this node CANNOT accept my messages? // should i consider stepping down here if rf.commitCommandTerm != rf.currentTerm { // I never ever commited any commands, so its probably just time to step down - fmt.Printf("The server REFUSES to replicate, but I never committed anything so im stepping down. nextIndex=%d decrementedNextIdx=%d\n", rf.nextIndex[server] , decrementedIdx) + DPrintf("The server REFUSES to replicate, but I never committed anything so im stepping down. nextIndex=%d decrementedNextIdx=%d\n", rf.nextIndex[server] , decrementedIdx) rf.role = Follower return } else { - fmt.Printf("The server REFUSES to replicate, but I have committed... monkaHmmm nextIndex=%d decrementedNextIdx=%d\n", rf.nextIndex[server] , decrementedIdx) + DPrintf("The server REFUSES to replicate, but I have committed... 
monkaHmmm nextIndex=%d decrementedNextIdx=%d\n", rf.nextIndex[server] , decrementedIdx) } } rf.nextIndex[server] = decrementedIdx @@ -412,6 +410,7 @@ func (rf *Raft) getServerByID(server int)*labrpc.ClientEnd { func (rf *Raft) stepDown(reply *RequestAppendEntriesReply) { rf.mu.Lock() defer rf.mu.Unlock() + DPrintf("[stepDown.%d.%d] Stepping down because I've seen a higher term than my current term.", rf.currentTerm, rf.me) rf.currentTerm = reply.Term rf.votedFor = nil rf.role = Follower @@ -429,19 +428,20 @@ func (rf *Raft) sendAppendEntries( if !reply.Success { - if reply.Term > rf.currentTerm { + if term, _ := rf.GetState(); reply.Term > term { + // step down as a leader if the replying server is in a higher term rf.stepDown(reply) - } else { - rf.decrementNextIndex(server, args, reply) + return } + rf.decrementNextIndex(server, args, reply) return } - if rf.role != Leader { - if args.hasEntries() { - fmt.Println("RECEIVED A REPLICATED LOG WITHOUT BEING LEADER ANYMORE. Should i break?") - fmt.Println(rf.commitCommandTerm, rf.currentTerm) - } + // If we are not the leader any longer, we should not continue replicating any logs + if !rf.isLeader() { + rf.mu.Lock() + defer rf.mu.Unlock() + DPrintf("[sendAppendEntries.%d.%d] Received a replicated log without being leader any longer.", rf.currentTerm, rf.me) return } @@ -451,9 +451,9 @@ func (rf *Raft) sendAppendEntries( recentCommandID := recentEntry.CommandIndex rf.updateLastAppended(server, recentCommandID) - // NOTE: update the commit idx + // update the commit idx if rf.checkCommitted(recentCommandID, recentCommandTerm) { - // NOTE: send freshly commited entries to the applyCh + // send freshly commited entries to the service throught the applyCh rf.sendEntries(recentCommandID) rf.storeCommitIdx(recentCommandID, recentCommandTerm) } @@ -512,6 +512,7 @@ func (rf *Raft) sendCommittedEntries(commitID int) { break } msg := rf.log[index] + DPrintf("[sendCommittedEntries.%d.%d] Sending Log Entry [%d|%d]=%d", 
rf.currentTerm, rf.me, msg.CommandIndex, msg.Term, msg.Command) rf.applyCh <- msg rf.lastAppliedIndex = msg.CommandIndex - 1 } @@ -521,12 +522,16 @@ func (rf *Raft) appendNewEntries(args *RequestAppendEntriesArgs) { // 3. If an existing entry conflicts with a new one (same index // but different terms), delete the existing entry and all that // follow it (§5.3) + // we have determined that the PrevLogIndex is correct here so lets split splitIdx := args.PrevLogIndex + 1 rf.log = rf.log[:splitIdx] - rf.printLastLog("appendNewEntries") + rf.printLastLog("appendNewEntries(preMerge)") DPrintf("[appendNewEntries.%d.%d] Current CommitID %d \n", rf.currentTerm, rf.me, rf.commitCommandID) + entry := args.Entries[0] + DPrintf("[appendNewEntries.%d.%d] First Appended Log [%d|%d]=%d", rf.currentTerm, rf.me, entry.CommandIndex, entry.Term, entry.Command) // 4. Append any new entries not already in the log rf.log = append(rf.log, args.Entries...) + rf.printLastLog("appendNewEntries(postMerge)") } @@ -538,26 +543,33 @@ func (rf *Raft) AppendEntries( defer rf.mu.Unlock() // 1. Reply false if term < currentTerm (§5.1) if rf.currentTerm > args.Term { - DPrintf("%d Not accepting log due to low term %d myTerm=%d", rf.me, args.Term, rf.currentTerm) + DPrintf("me=%d Not accepting log due to low args.Term=%d myTerm=%d", rf.me, args.Term, rf.currentTerm) rf.denyAppendEntry(args, reply) return } + if args.Term > rf.currentTerm { + rf.currentTerm = args.Term + rf.votedFor = nil + rf.role = Follower + rf.persist() + } + // 2. Reply false if log doesn’t contain an entry at prevLogIndex // whose term matches prevLogTerm (§5.3) if args.PrevLogIndex >= 0 { lastLogIndex := len(rf.log) - 1 if lastLogIndex < args.PrevLogIndex { - DPrintf("%d Not accepting log due to too high prev log index. 
loglen %d, prev index %d, leader commit ID %d", rf.me, len(rf.log), args.PrevLogIndex, args.LeaderCommitID) - DPrintf("%d My commit ID: %d", rf.me, rf.commitCommandID) + DPrintf("me=%d Not accepting log due to too high prev log index. loglen %d, prev index %d, leader commit ID %d", rf.me, len(rf.log), args.PrevLogIndex, args.LeaderCommitID) + DPrintf("me=%d My commit ID: %d", rf.me, rf.commitCommandID) rf.calculateConflictInfo(args, reply) rf.denyAppendEntry(args, reply) return } if rf.log[args.PrevLogIndex].Term != args.PrevLogTerm { - DPrintf("%d Not accepting log due to incorrect PrevLogTerm my term %d, prevLogTerm %d", rf.me, rf.log[args.PrevLogIndex].Term, args.PrevLogTerm) - DPrintf("%d My commit ID: %d", rf.me, rf.commitCommandID) + DPrintf("me=%d Not accepting log due to incorrect PrevLogTerm my term %d, prevLogTerm %d", rf.me, rf.log[args.PrevLogIndex].Term, args.PrevLogTerm) + DPrintf("me=%d My commit ID: %d", rf.me, rf.commitCommandID) rf.calculateConflictInfo(args, reply) rf.denyAppendEntry(args, reply) return @@ -580,7 +592,7 @@ func (rf *Raft) AppendEntries( reply.Success = true if args.hasEntries() { - DPrintf("[appendNewEntries.%d.%d] ", rf.currentTerm, rf.me) + DPrintf("[appendNewEntries.%d.%d]", rf.currentTerm, rf.me) rf.appendNewEntries(args) } @@ -588,14 +600,10 @@ func (rf *Raft) AppendEntries( // if raft crashes after sending to the service but not persisting the new log, we will have an undefined state rf.role = Follower rf.leader = &args.Leader - if args.Term > rf.currentTerm { - rf.currentTerm = args.Term - rf.votedFor = nil - } rf.lastAppliedTime = time.Now() rf.persist() - // NOTE: only commit entries if the leader has commited entries in it's own term + // only commit entries if the leader has commited entries in it's own term if args.LeaderCommitTerm == args.Term { rf.sendCommittedEntries(args.LeaderCommitID) // 5. 
If leaderCommit > commitIndex, set commitIndex = @@ -614,6 +622,7 @@ func (rf *Raft) isLeader() bool { defer rf.mu.Unlock() return rf.role == Leader } + func (rf *Raft) buildAppendEntriesArgs(i int) ( *RequestAppendEntriesArgs, ) { @@ -637,7 +646,7 @@ func (rf *Raft) buildAppendEntriesArgs(i int) ( return &args } -func (rf *Raft) sendHeartbeat() { +func (rf *Raft) sendHeartbeat() { rf.mu.Lock() defer rf.mu.Unlock() rf.lastAppliedTime = time.Now() @@ -668,7 +677,7 @@ func (rf *Raft) runLeader() { rf.initLeaderState() for !rf.killed() && rf.isLeader() { rf.sendHeartbeat() - time.Sleep(time.Millisecond * time.Duration(35)) + time.Sleep(time.Millisecond * time.Duration(25)) } } @@ -736,12 +745,13 @@ func (rf *Raft) electionOutcome(yesVotes int) { rf.mu.Lock() defer rf.mu.Unlock() if yesVotes > len(rf.peers) / 2 { - DPrintf("[becomeCandidate.%d.%d] Taking leadership\n", rf.currentTerm, rf.me) + DPrintf("[becomeCandidate.%d.%d] Taking leadership term=%d\n", rf.currentTerm, rf.me, rf.currentTerm) rf.leader = &rf.me rf.role = Leader return } rf.role = Follower + DPrintf("[becomeCandidate.%d.%d] Lost election. 
Stepping down to follower term=%d\n", rf.currentTerm, rf.me, rf.currentTerm) } func (rf *Raft) becomeCandidate() { @@ -760,7 +770,7 @@ func (rf *Raft) missingHeartbeat(ms time.Duration) bool { func getRandomTickerDuration() time.Duration { min := 200 - max := 400 + max := 500 return time.Millisecond * time.Duration(rand.Intn(max - min) + min) } diff --git a/src/raft/tester.sh b/src/raft/tester.sh index 82ea372..8bcbf0b 100755 --- a/src/raft/tester.sh +++ b/src/raft/tester.sh @@ -1,10 +1,9 @@ #!/bin/bash -count = 100 -for i in {1..100} +for i in {1..300} do go test -run Figure8Unreliable2C # go test -run Figure82C # go test -run Backup2B # go test -run UnreliableChurn2C - echo Completed test $i/100 + echo Completed test $i/300 done diff --git a/src/raft/util.go b/src/raft/util.go index c0130e8..5b9e695 100644 --- a/src/raft/util.go +++ b/src/raft/util.go @@ -62,3 +62,10 @@ func min(a, b int) int { } return b } + +func (rf *Raft) printLastLog(fnName string) { + if len(rf.log) > 0 { + entry := rf.log[len(rf.log) - 1] + DPrintf("[%s.%d.%d] Last Command In Log [%d|%d]=%d", fnName, rf.currentTerm, rf.me, entry.CommandIndex, entry.Term, entry.Command) + } +}