-
Notifications
You must be signed in to change notification settings - Fork 313
/
completion_barriers_test.go
120 lines (113 loc) · 2.27 KB
/
completion_barriers_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package main
import (
"math"
"testing"
"time"
)
// TestCouintingCompletionBarrierWait checks that a counting barrier created
// for 1000 jobs becomes done once ten workers have drained it through
// tryGrabWork/jobDone, and fails if that does not happen within 100ms.
//
// NOTE(review): the name misspells "Counting"; it is kept so that existing
// -run filters keep matching.
func TestCouintingCompletionBarrierWait(t *testing.T) {
	const (
		parties = 10
		jobs    = 1000
		timeout = 100 * time.Millisecond
	)
	b := newCountingCompletionBarrier(jobs)
	for i := 0; i < parties; i++ {
		go func() {
			for b.tryGrabWork() {
				b.jobDone()
			}
		}()
	}
	// Wait on the barrier directly: a relay goroutine sending on an
	// unbuffered channel would stay blocked forever after a timeout.
	select {
	case <-b.done():
	case <-time.After(timeout):
		t.Fatalf("barrier did not complete %d jobs within %v", jobs, timeout)
	}
}
// TestTimedCompletionBarrierWait checks that a timed barrier created with a
// 100ms duration becomes done after approximately that long while ten workers
// keep grabbing and finishing 2ms jobs. It fails if the barrier is not done
// within twice the configured duration.
func TestTimedCompletionBarrierWait(t *testing.T) {
	const (
		parties       = 10
		duration      = 100 * time.Millisecond
		timeout       = 2 * duration
		sleepDuration = 2 * time.Millisecond
		// Extra slack allowed on top of one job's sleep time.
		tolerance = 15 * time.Millisecond
	)
	b := newTimedCompletionBarrier(duration)
	for i := 0; i < parties; i++ {
		go func() {
			for b.tryGrabWork() {
				time.Sleep(sleepDuration)
				b.jobDone()
			}
		}()
	}
	// Measure in the test goroutine: a relay goroutine sending on an
	// unbuffered channel would stay blocked forever after a timeout.
	start := time.Now()
	select {
	case <-b.done():
		actual := time.Since(start)
		if !approximatelyEqual(duration, actual, sleepDuration+tolerance) {
			t.Errorf("Expected to run %v, but ran %v instead", duration, actual)
		}
	case <-time.After(timeout):
		t.Error("Barrier hanged")
	}
}
// TestTimeBarrierCancel checks that cancelling a timed barrier well before
// its 9000s duration elapses makes done() ready, and that completed() then
// reports 1.0. cancel is issued after 100ms; the test gives up after 200ms.
func TestTimeBarrierCancel(t *testing.T) {
	b := newTimedCompletionBarrier(9000 * time.Second)
	sleepTime := 100 * time.Millisecond
	go func() {
		time.Sleep(sleepTime)
		b.cancel()
	}()
	select {
	case <-b.done():
		if c := b.completed(); c != 1.0 {
			t.Errorf("completed() after cancel = %v, want 1.0", c)
		}
	case <-time.After(sleepTime * 2):
		t.Errorf("barrier not done within %v (cancel was issued after %v)", sleepTime*2, sleepTime)
	}
}
// TestCountedBarrierCancel checks that cancelling a counting barrier sized at
// math.MaxUint64 jobs makes done() ready while ten workers are still grabbing
// work, and that completed() then reports 1.0. cancel is issued after 100ms;
// the test gives up after 5s.
func TestCountedBarrierCancel(t *testing.T) {
	const (
		parties = 10
		timeout = 5 * time.Second
	)
	b := newCountingCompletionBarrier(math.MaxUint64)
	sleepTime := 100 * time.Millisecond
	for i := 0; i < parties; i++ {
		go func() {
			for b.tryGrabWork() {
				b.jobDone()
			}
		}()
	}
	go func() {
		time.Sleep(sleepTime)
		b.cancel()
	}()
	select {
	case <-b.done():
		if c := b.completed(); c != 1.0 {
			t.Errorf("completed() after cancel = %v, want 1.0", c)
		}
	case <-time.After(timeout):
		t.Errorf("barrier not done within %v (cancel was issued after %v)", timeout, sleepTime)
	}
}
// TestTimeBarrierPanicOnBadDuration checks that newTimedCompletionBarrier
// panics when it is given a negative duration.
func TestTimeBarrierPanicOnBadDuration(t *testing.T) {
	defer func() {
		// recover returns nil when the constructor returned normally, so this
		// one check also covers the "call fell through" case. t.Error already
		// marks the test as failed; no separate t.Fail is needed.
		if r := recover(); r == nil {
			t.Error("expected newTimedCompletionBarrier to panic on a negative duration")
		}
	}()
	newTimedCompletionBarrier(-1 * time.Second)
}
// approximatelyEqual reports whether actual lies strictly inside the open
// interval (expected-err, expected+err). Both bounds are exclusive, so a
// zero err never matches.
func approximatelyEqual(expected, actual, err time.Duration) bool {
	low := expected - err
	high := expected + err
	return actual > low && actual < high
}