Merge pull request etcd-io#6807 from xiang90/fix_raft_test
rafttest: make raft test reliable
xiang90 authored Nov 4, 2016
2 parents 91360e1 + e5987de commit 476ff67
Showing 3 changed files with 101 additions and 36 deletions.
14 changes: 13 additions & 1 deletion raft/rafttest/network.go
@@ -100,8 +100,20 @@ func (rn *raftNetwork) send(m raftpb.Message) {
time.Sleep(time.Duration(rd))
}

// use marshal/unmarshal to copy message to avoid data race.
b, err := m.Marshal()
if err != nil {
panic(err)
}

var cm raftpb.Message
err = cm.Unmarshal(b)
if err != nil {
panic(err)
}

select {
case to <- m:
case to <- cm:
default:
// drop messages when the receiver queue is full.
}
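
The change above makes the simulated network deliver a deep copy of each message rather than the original: the raftpb.Message is round-tripped through its protobuf encoding, so the copy shares no backing memory (entries, context bytes) with the sender's value, which removes the data race between the sending and receiving node goroutines. A minimal sketch of the same idiom, using a hypothetical copyMessage helper that is not part of this commit:

package rafttest

import "github.com/coreos/etcd/raft/raftpb"

// copyMessage round-trips m through Marshal/Unmarshal so the returned message
// shares no memory with the original. Test-only code, so it panics on encoding
// errors just as the send path above does.
func copyMessage(m raftpb.Message) raftpb.Message {
	b, err := m.Marshal()
	if err != nil {
		panic(err)
	}
	var cm raftpb.Message
	if err := cm.Unmarshal(b); err != nil {
		panic(err)
	}
	return cm
}
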
9 changes: 7 additions & 2 deletions raft/rafttest/node.go
@@ -16,6 +16,7 @@ package rafttest

import (
"log"
"sync"
"time"

"github.com/coreos/etcd/raft"
@@ -32,7 +33,9 @@ type node struct {

// stable
storage *raft.MemoryStorage
state raftpb.HardState

mu sync.Mutex // guards state
state raftpb.HardState
}

func startNode(id uint64, peers []raft.Peer, iface iface) *node {
@@ -68,7 +71,9 @@ func (n *node) start() {
n.Tick()
case rd := <-n.Ready():
if !raft.IsEmptyHardState(rd.HardState) {
n.mu.Lock()
n.state = rd.HardState
n.mu.Unlock()
n.storage.SetHardState(n.state)
}
n.storage.Append(rd.Entries)
@@ -79,7 +84,7 @@
}
n.Advance()
case m := <-n.iface.recv():
n.Step(context.TODO(), m)
go n.Step(context.TODO(), m)
case <-n.stopc:
n.Stop()
log.Printf("raft.%d: stop", n.id)
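
Two changes above address concurrency problems in the test node: the HardState written by the Ready loop is now guarded by a mutex (per the "guards state" comment), and incoming messages are stepped in their own goroutine so the node's run loop is not blocked if raft cannot accept the message immediately. A minimal sketch of the locking pattern, with a hypothetical hardStateBox type that is not part of the commit:

package rafttest

import (
	"sync"

	"github.com/coreos/etcd/raft/raftpb"
)

// hardStateBox illustrates the pattern the node now follows: writes from the
// Ready loop and any concurrent reads take the same mutex, so the
// raftpb.HardState is never accessed by two goroutines at once.
type hardStateBox struct {
	mu    sync.Mutex
	state raftpb.HardState
}

func (b *hardStateBox) set(s raftpb.HardState) {
	b.mu.Lock()
	b.state = s
	b.mu.Unlock()
}

func (b *hardStateBox) get() raftpb.HardState {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.state
}
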
114 changes: 81 additions & 33 deletions raft/rafttest/node_test.go
@@ -33,18 +33,18 @@ func TestBasicProgress(t *testing.T) {
nodes = append(nodes, n)
}

time.Sleep(10 * time.Millisecond)
waitLeader(nodes)

for i := 0; i < 10000; i++ {
for i := 0; i < 100; i++ {
nodes[0].Propose(context.TODO(), []byte("somedata"))
}

time.Sleep(500 * time.Millisecond)
if !waitCommitConverge(nodes, 100) {
t.Errorf("commits failed to converge!")
}

for _, n := range nodes {
n.stop()
if n.state.Commit != 10006 {
t.Errorf("commit = %d, want = 10006", n.state.Commit)
}
}
}

@@ -59,31 +59,32 @@ func TestRestart(t *testing.T) {
nodes = append(nodes, n)
}

time.Sleep(50 * time.Millisecond)
for i := 0; i < 300; i++ {
nodes[0].Propose(context.TODO(), []byte("somedata"))
l := waitLeader(nodes)
k1, k2 := (l+1)%5, (l+2)%5

for i := 0; i < 30; i++ {
nodes[l].Propose(context.TODO(), []byte("somedata"))
}
nodes[1].stop()
for i := 0; i < 300; i++ {
nodes[0].Propose(context.TODO(), []byte("somedata"))
nodes[k1].stop()
for i := 0; i < 30; i++ {
nodes[(l+3)%5].Propose(context.TODO(), []byte("somedata"))
}
nodes[2].stop()
for i := 0; i < 300; i++ {
nodes[0].Propose(context.TODO(), []byte("somedata"))
nodes[k2].stop()
for i := 0; i < 30; i++ {
nodes[(l+4)%5].Propose(context.TODO(), []byte("somedata"))
}
nodes[2].restart()
for i := 0; i < 300; i++ {
nodes[0].Propose(context.TODO(), []byte("somedata"))
nodes[k2].restart()
for i := 0; i < 30; i++ {
nodes[l].Propose(context.TODO(), []byte("somedata"))
}
nodes[k1].restart()

if !waitCommitConverge(nodes, 120) {
t.Errorf("commits failed to converge!")
}
nodes[1].restart()

// give some time for nodes to catch up with the raft leader
time.Sleep(500 * time.Millisecond)
for _, n := range nodes {
n.stop()
if n.state.Commit != 1206 {
t.Errorf("commit = %d, want = 1206", n.state.Commit)
}
}
}

@@ -98,30 +99,77 @@ func TestPause(t *testing.T) {
nodes = append(nodes, n)
}

time.Sleep(50 * time.Millisecond)
for i := 0; i < 300; i++ {
waitLeader(nodes)

for i := 0; i < 30; i++ {
nodes[0].Propose(context.TODO(), []byte("somedata"))
}
nodes[1].pause()
for i := 0; i < 300; i++ {
for i := 0; i < 30; i++ {
nodes[0].Propose(context.TODO(), []byte("somedata"))
}
nodes[2].pause()
for i := 0; i < 300; i++ {
for i := 0; i < 30; i++ {
nodes[0].Propose(context.TODO(), []byte("somedata"))
}
nodes[2].resume()
for i := 0; i < 300; i++ {
for i := 0; i < 30; i++ {
nodes[0].Propose(context.TODO(), []byte("somedata"))
}
nodes[1].resume()

// give some time for nodes to catch up with the raft leader
time.Sleep(300 * time.Millisecond)
if !waitCommitConverge(nodes, 120) {
t.Errorf("commits failed to converge!")
}

for _, n := range nodes {
n.stop()
if n.state.Commit != 1206 {
t.Errorf("commit = %d, want = 1206", n.state.Commit)
}
}

func waitLeader(ns []*node) int {
var l map[uint64]struct{}
var lindex int

for {
l = make(map[uint64]struct{})

for i, n := range ns {
lead := n.Status().SoftState.Lead
if lead != 0 {
l[lead] = struct{}{}
if n.id == lead {
lindex = i
}
}
}

if len(l) == 1 {
return lindex
}
}
}

func waitCommitConverge(ns []*node, target uint64) bool {
var c map[uint64]struct{}

for i := 0; i < 50; i++ {
c = make(map[uint64]struct{})
var good int

for _, n := range ns {
commit := n.Node.Status().HardState.Commit
c[commit] = struct{}{}
if commit > target {
good++
}
}

if len(c) == 1 && good == len(ns) {
return true
}
time.Sleep(100 * time.Millisecond)
}

return false
}
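
The rewritten tests replace fixed sleeps and exact commit-index assertions with the two polling helpers above: waitLeader spins until every node that reports a leader agrees on the same one and returns that leader's index in the slice, while waitCommitConverge polls for up to roughly five seconds (50 iterations of 100ms) until all nodes report an identical commit index above the given target. A hedged sketch of how a new test in this package might use them; the test name is hypothetical, the constructors (newRaftNetwork, startNode) are the ones already used by the tests above, and the context import path is an assumption (newer code can use the standard library "context"):

package rafttest

import (
	"testing"

	"github.com/coreos/etcd/raft"
	"golang.org/x/net/context" // assumed import path; newer code can use "context"
)

// testConvergenceSketch is a hypothetical example, not part of this commit.
func testConvergenceSketch(t *testing.T) {
	peers := []raft.Peer{{ID: 1}, {ID: 2}, {ID: 3}}
	nt := newRaftNetwork(1, 2, 3)

	nodes := make([]*node, 0)
	for i := 1; i <= 3; i++ {
		nodes = append(nodes, startNode(uint64(i), peers, nt.nodeNetwork(uint64(i))))
	}

	// Block until all nodes agree on a single leader, then propose to it.
	l := waitLeader(nodes)
	for i := 0; i < 10; i++ {
		nodes[l].Propose(context.TODO(), []byte("somedata"))
	}

	// Poll instead of sleeping: every node must report the same commit index
	// above the target before the helper returns true.
	if !waitCommitConverge(nodes, 10) {
		t.Errorf("commits failed to converge")
	}

	for _, n := range nodes {
		n.stop()
	}
}
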
