Skip to content

Commit

Permalink
Figure8Unreliable2C 300/300 passing
Browse files Browse the repository at this point in the history
  • Loading branch information
codicocodes committed Jul 17, 2022
1 parent 26e1025 commit 3be8772
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 71 deletions.
2 changes: 2 additions & 0 deletions src/raft/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ func (cfg *config) applier(i int, applyCh chan ApplyMsg) {
err_msg = fmt.Sprintf("server %v apply out of order %v", i, m.CommandIndex)
}
if err_msg != "" {
DPrintf("apply error: %v", err_msg)
DPrintClose()
log.Fatalf("apply error: %v", err_msg)
cfg.applyErr[i] = err_msg
Expand Down Expand Up @@ -264,6 +265,7 @@ func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) {
// Ignore other types of ApplyMsg.
}
if err_msg != "" {
DPrintf("apply error: %v", err_msg)
DPrintClose()
log.Fatalf("apply error: %v", err_msg)
cfg.applyErr[i] = err_msg
Expand Down
146 changes: 78 additions & 68 deletions src/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,6 @@ type Raft struct {
matchCommandIds []int // for each server, index of highest log entry known to be replicated on server (initialized to 0, increases monotonically)
}

// printLastLog writes one debug line (via DPrintf) describing the final
// entry currently held in rf.log: its CommandIndex, Term and Command,
// prefixed with the caller-supplied fnName, rf.currentTerm and rf.me.
// It prints nothing when the log is empty. The method does not lock
// rf.mu itself.
func (rf *Raft) printLastLog(fnName string) {
	size := len(rf.log)
	if size == 0 {
		// Nothing to report for an empty log.
		return
	}
	last := rf.log[size-1]
	DPrintf("[%s.%d.%d] Last Command In Log [%d|%d]=%d", fnName, rf.currentTerm, rf.me, last.CommandIndex, last.Term, last.Command)
}


func (rf *Raft) getLastLogEntry() (*ApplyMsg) {
if len(rf.log) == 0 {
return nil
Expand All @@ -66,11 +58,8 @@ func (rf *Raft) getLastLogEntry() (*ApplyMsg) {
func (rf *Raft) GetState() (int, bool) {
rf.mu.Lock()
defer rf.mu.Unlock()
var term int
var isleader bool
term = rf.currentTerm
isleader = rf.leader == &rf.me
return term, isleader
isLeader := rf.leader == &rf.me
return rf.currentTerm, isLeader
}

type Role int
Expand Down Expand Up @@ -111,16 +100,16 @@ func (rf *Raft) readPersist(data []byte) {
var votedFor *int
var currentTerm int
if e := d.Decode(&log); e != nil {
println(e)
DPrintf(e.Error())
panic("Fail decode log")
}
if e := d.Decode(&votedFor); e != nil {
println(e)
DPrintf(e.Error())
panic("Fail decode votedFor")
}
if e := d.Decode(&currentTerm); e != nil {
println("Fail decode currentTerm", e.Error())
println(rf.currentTerm)
DPrintf("Fail decode currentTerm %s", e.Error())
DPrintf("%d", rf.currentTerm)
currentTerm = 0
}
rf.log = log
Expand Down Expand Up @@ -174,37 +163,56 @@ func (rf *Raft) grantVote(args *RequestVoteArgs, reply *RequestVoteReply) {
rf.role = Follower
rf.votedFor = &args.CandidateId
reply.VoteGranted = true
rf.persist()
rf.persist()
}

// checkAlreadyVotedInTerm reports whether the vote request is for this
// server's current term and a vote has already been recorded in that
// term (rf.votedFor is non-nil).
func (rf *Raft) checkAlreadyVotedInTerm(args *RequestVoteArgs) bool {
	if args.Term != rf.currentTerm {
		// A request for a different term can never collide with an existing vote.
		return false
	}
	return rf.votedFor != nil
}

// checkIsLowTerm reports whether the term carried by the vote request is
// strictly behind this server's current term.
func (rf *Raft) checkIsLowTerm(args *RequestVoteArgs) bool {
	return rf.currentTerm > args.Term
}

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
rf.mu.Lock()
defer rf.mu.Unlock()

// Reply false if term < currentTerm (§5.1)
if args.Term < rf.currentTerm {
if rf.checkIsLowTerm(args) {
DPrintf("Voting no due to low candiate term %d, my term %d", args.Term, rf.currentTerm)
reply.VoteNo(rf.currentTerm)
return
}

if args.Term == rf.currentTerm {
if rf.votedFor != nil {
// NOTE: vote no if we did not vote in this round
// handle the current term
if rf.checkAlreadyVotedInTerm(args) {
// vote no if we already voted in this term
DPrintf("Voting no because me=%d already voted in term %d, my term %d", rf.me, args.Term, rf.currentTerm)
reply.VoteNo(rf.currentTerm)
return
}
} else {
rf.currentTerm = args.Term
rf.votedFor = nil
rf.persist()
}

// the term has increased, or we did not vote in this term
// if I did not vote in this term, I should not be the leader
// so either way, we should step down from leadership here
// make sure we step down from leadership if we see a higher term in the voting process
// we used to not do this, and it's possible that this was the cause of a bug
// TODO: use rf.stepDown
if rf.role == Leader {
fmt.Printf("[RequestVote.%d.%d]: I used to not step down here. This caused all the issues? (When I also voted no)\n", rf.currentTerm, rf.me)
}
rf.currentTerm = args.Term
rf.votedFor = nil
rf.role = Follower
rf.persist()

logLength := len(rf.log)

// if there is nothing in the log the voters log can't be more up to date
// if there is nothing in the log my log can't be more up to date
// lets just say yes
if logLength == 0 {
DPrintf("Voting yes because nothing my log so candidat egotta be up to date")
DPrintf("Voting yes because nothing my log so candidate gotta be up to date")
rf.grantVote(args, reply)
return
}
Expand Down Expand Up @@ -273,22 +281,12 @@ type RequestAppendEntriesReply struct {
// commit commandID is not persisted I think, so we might override commited entries by doing this?
func (rf *Raft) calculateConflictInfo(args *RequestAppendEntriesArgs, reply *RequestAppendEntriesReply) {
rf.printLastLog("calculateConflictInfo")
// TODO: Conflicting index should be the first index of that specific problematic term

if len(rf.log) == 0 {
reply.LastLogIndex = -1
return
}

lastLogIndex := len(rf.log) - 1

// we probably don't need this part
// lastLog := rf.log[lastLogIndex]
// if args.PrevLogIndex > lastLogIndex && lastLog.Term != args.PrevLogTerm {
// reply.LastLogIndex = lastLogIndex
// return
// }

conflictIdx := min(args.PrevLogIndex, lastLogIndex)
conflictTerm := rf.log[conflictIdx].Term

Expand Down Expand Up @@ -354,16 +352,16 @@ func (rf *Raft) decrementNextIndex(server int, args *RequestAppendEntriesArgs, r
decrementedIdx = 0
}
if rf.nextIndex[server] <= decrementedIdx {
fmt.Printf("nextIndex=%d decrementedNextIdx=%d\n", rf.nextIndex[server] , decrementedIdx)
DPrintf("nextIndex=%d decrementedNextIdx=%d\n", rf.nextIndex[server] , decrementedIdx)
// if this situation happens, this node CANNOT accept my messages?
// should i consider stepping down here
if rf.commitCommandTerm != rf.currentTerm {
// I never ever commited any commands, so its probably just time to step down
fmt.Printf("The server REFUSES to replicate, but I never committed anything so im stepping down. nextIndex=%d decrementedNextIdx=%d\n", rf.nextIndex[server] , decrementedIdx)
DPrintf("The server REFUSES to replicate, but I never committed anything so im stepping down. nextIndex=%d decrementedNextIdx=%d\n", rf.nextIndex[server] , decrementedIdx)
rf.role = Follower
return
} else {
fmt.Printf("The server REFUSES to replicate, but I have committed... monkaHmmm nextIndex=%d decrementedNextIdx=%d\n", rf.nextIndex[server] , decrementedIdx)
DPrintf("The server REFUSES to replicate, but I have committed... monkaHmmm nextIndex=%d decrementedNextIdx=%d\n", rf.nextIndex[server] , decrementedIdx)
}
}
rf.nextIndex[server] = decrementedIdx
Expand Down Expand Up @@ -412,6 +410,7 @@ func (rf *Raft) getServerByID(server int)*labrpc.ClientEnd {
func (rf *Raft) stepDown(reply *RequestAppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
DPrintf("[stepDown.%d.%d] Stepping down because I've seen a higher term than my current term.", rf.currentTerm, rf.me)
rf.currentTerm = reply.Term
rf.votedFor = nil
rf.role = Follower
Expand All @@ -429,19 +428,20 @@ func (rf *Raft) sendAppendEntries(


if !reply.Success {
if reply.Term > rf.currentTerm {
if term, _ := rf.GetState(); reply.Term > term {
// step down as a leader if the replying server is in a higher term
rf.stepDown(reply)
} else {
rf.decrementNextIndex(server, args, reply)
return
}
rf.decrementNextIndex(server, args, reply)
return
}

if rf.role != Leader {
if args.hasEntries() {
fmt.Println("RECEIVED A REPLICATED LOG WITHOUT BEING LEADER ANYMORE. Should i break?")
fmt.Println(rf.commitCommandTerm, rf.currentTerm)
}
// If we are not the leader any longer, we should not continue replicating any logs
if !rf.isLeader() {
rf.mu.Lock()
defer rf.mu.Unlock()
DPrintf("[sendAppendEntries.%d.%d] Received a replicated log without being leader any longer.", rf.currentTerm, rf.me)
return
}

Expand All @@ -451,9 +451,9 @@ func (rf *Raft) sendAppendEntries(
recentCommandID := recentEntry.CommandIndex
rf.updateLastAppended(server, recentCommandID)

// NOTE: update the commit idx
// update the commit idx
if rf.checkCommitted(recentCommandID, recentCommandTerm) {
// NOTE: send freshly commited entries to the applyCh
// send freshly committed entries to the service through the applyCh
rf.sendEntries(recentCommandID)
rf.storeCommitIdx(recentCommandID, recentCommandTerm)
}
Expand Down Expand Up @@ -512,6 +512,7 @@ func (rf *Raft) sendCommittedEntries(commitID int) {
break
}
msg := rf.log[index]
DPrintf("[sendCommittedEntries.%d.%d] Sending Log Entry [%d|%d]=%d", rf.currentTerm, rf.me, msg.CommandIndex, msg.Term, msg.Command)
rf.applyCh <- msg
rf.lastAppliedIndex = msg.CommandIndex - 1
}
Expand All @@ -521,12 +522,16 @@ func (rf *Raft) appendNewEntries(args *RequestAppendEntriesArgs) {
// 3. If an existing entry conflicts with a new one (same index
// but different terms), delete the existing entry and all that
// follow it (§5.3)
// we have determined that the PrevLogIndex is correct here so lets split
splitIdx := args.PrevLogIndex + 1
rf.log = rf.log[:splitIdx]
rf.printLastLog("appendNewEntries")
rf.printLastLog("appendNewEntries(preMerge)")
DPrintf("[appendNewEntries.%d.%d] Current CommitID %d \n", rf.currentTerm, rf.me, rf.commitCommandID)
entry := args.Entries[0]
DPrintf("[appendNewEntries.%d.%d] First Appended Log [%d|%d]=%d", rf.currentTerm, rf.me, entry.CommandIndex, entry.Term, entry.Command)
// 4. Append any new entries not already in the log
rf.log = append(rf.log, args.Entries...)
rf.printLastLog("appendNewEntries(postMerge)")
}


Expand All @@ -538,26 +543,33 @@ func (rf *Raft) AppendEntries(
defer rf.mu.Unlock()
// 1. Reply false if term < currentTerm (§5.1)
if rf.currentTerm > args.Term {
DPrintf("%d Not accepting log due to low term %d myTerm=%d", rf.me, args.Term, rf.currentTerm)
DPrintf("me=%d Not accepting log due to low args.Term=%d myTerm=%d", rf.me, args.Term, rf.currentTerm)
rf.denyAppendEntry(args, reply)
return
}

if args.Term > rf.currentTerm {
rf.currentTerm = args.Term
rf.votedFor = nil
rf.role = Follower
rf.persist()
}

// 2. Reply false if log doesn’t contain an entry at prevLogIndex
// whose term matches prevLogTerm (§5.3)
if args.PrevLogIndex >= 0 {
lastLogIndex := len(rf.log) - 1
if lastLogIndex < args.PrevLogIndex {
DPrintf("%d Not accepting log due to too high prev log index. loglen %d, prev index %d, leader commit ID %d", rf.me, len(rf.log), args.PrevLogIndex, args.LeaderCommitID)
DPrintf("%d My commit ID: %d", rf.me, rf.commitCommandID)
DPrintf("me=%d Not accepting log due to too high prev log index. loglen %d, prev index %d, leader commit ID %d", rf.me, len(rf.log), args.PrevLogIndex, args.LeaderCommitID)
DPrintf("me=%d My commit ID: %d", rf.me, rf.commitCommandID)
rf.calculateConflictInfo(args, reply)
rf.denyAppendEntry(args, reply)
return
}

if rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
DPrintf("%d Not accepting log due to incorrect PrevLogTerm my term %d, prevLogTerm %d", rf.me, rf.log[args.PrevLogIndex].Term, args.PrevLogTerm)
DPrintf("%d My commit ID: %d", rf.me, rf.commitCommandID)
DPrintf("me=%d Not accepting log due to incorrect PrevLogTerm my term %d, prevLogTerm %d", rf.me, rf.log[args.PrevLogIndex].Term, args.PrevLogTerm)
DPrintf("me=%d My commit ID: %d", rf.me, rf.commitCommandID)
rf.calculateConflictInfo(args, reply)
rf.denyAppendEntry(args, reply)
return
Expand All @@ -580,22 +592,18 @@ func (rf *Raft) AppendEntries(
reply.Success = true

if args.hasEntries() {
DPrintf("[appendNewEntries.%d.%d] ", rf.currentTerm, rf.me)
DPrintf("[appendNewEntries.%d.%d]", rf.currentTerm, rf.me)
rf.appendNewEntries(args)
}

// Persist after appending entry but before sending to service
// if raft crashes after sending to the service but not persisting the new log, we will have an undefined state
rf.role = Follower
rf.leader = &args.Leader
if args.Term > rf.currentTerm {
rf.currentTerm = args.Term
rf.votedFor = nil
}
rf.lastAppliedTime = time.Now()
rf.persist()

// NOTE: only commit entries if the leader has commited entries in it's own term
// only commit entries if the leader has committed entries in its own term
if args.LeaderCommitTerm == args.Term {
rf.sendCommittedEntries(args.LeaderCommitID)
// 5. If leaderCommit > commitIndex, set commitIndex =
Expand All @@ -614,6 +622,7 @@ func (rf *Raft) isLeader() bool {
defer rf.mu.Unlock()
return rf.role == Leader
}

func (rf *Raft) buildAppendEntriesArgs(i int) (
*RequestAppendEntriesArgs,
) {
Expand All @@ -637,7 +646,7 @@ func (rf *Raft) buildAppendEntriesArgs(i int) (
return &args
}

func (rf *Raft) sendHeartbeat() {
func (rf *Raft) sendHeartbeat() {
rf.mu.Lock()
defer rf.mu.Unlock()
rf.lastAppliedTime = time.Now()
Expand Down Expand Up @@ -668,7 +677,7 @@ func (rf *Raft) runLeader() {
rf.initLeaderState()
for !rf.killed() && rf.isLeader() {
rf.sendHeartbeat()
time.Sleep(time.Millisecond * time.Duration(35))
time.Sleep(time.Millisecond * time.Duration(25))
}
}

Expand Down Expand Up @@ -736,12 +745,13 @@ func (rf *Raft) electionOutcome(yesVotes int) {
rf.mu.Lock()
defer rf.mu.Unlock()
if yesVotes > len(rf.peers) / 2 {
DPrintf("[becomeCandidate.%d.%d] Taking leadership\n", rf.currentTerm, rf.me)
DPrintf("[becomeCandidate.%d.%d] Taking leadership term=%d\n", rf.currentTerm, rf.me, rf.currentTerm)
rf.leader = &rf.me
rf.role = Leader
return
}
rf.role = Follower
DPrintf("[becomeCandidate.%d.%d] Lost election. Stepping down to follower term=%d\n", rf.currentTerm, rf.me, rf.currentTerm)
}

func (rf *Raft) becomeCandidate() {
Expand All @@ -760,7 +770,7 @@ func (rf *Raft) missingHeartbeat(ms time.Duration) bool {

func getRandomTickerDuration() time.Duration {
min := 200
max := 400
max := 500
return time.Millisecond * time.Duration(rand.Intn(max - min) + min)
}

Expand Down
5 changes: 2 additions & 3 deletions src/raft/tester.sh
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
#!/bin/bash
count = 100
for i in {1..100}
for i in {1..300}
do
go test -run Figure8Unreliable2C
# go test -run Figure82C
# go test -run Backup2B
# go test -run UnreliableChurn2C
echo Completed test $i/100
echo Completed test $i/300
done
7 changes: 7 additions & 0 deletions src/raft/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,10 @@ func min(a, b int) int {
}
return b
}

// printLastLog writes one debug line (via DPrintf) describing the final
// entry currently held in rf.log: its CommandIndex, Term and Command,
// prefixed with the caller-supplied fnName, rf.currentTerm and rf.me.
// It prints nothing when the log is empty. The method does not lock
// rf.mu itself.
func (rf *Raft) printLastLog(fnName string) {
	size := len(rf.log)
	if size == 0 {
		// Nothing to report for an empty log.
		return
	}
	last := rf.log[size-1]
	DPrintf("[%s.%d.%d] Last Command In Log [%d|%d]=%d", fnName, rf.currentTerm, rf.me, last.CommandIndex, last.Term, last.Command)
}

0 comments on commit 3be8772

Please sign in to comment.