Skip to content
This repository was archived by the owner on Jan 21, 2020. It is now read-only.

Commit 208d114

Browse files
author
David Chung
authored
Use Finite State Machines to manage instance/provisioning lifecycle (#427)
Signed-off-by: David Chung <david.chung@docker.com>
1 parent 7f33cd9 commit 208d114

23 files changed

+4183
-6
lines changed

pkg/broker/client/sse.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ func Subscribe(url, topic string, opt Options) (<-chan *types.Any, <-chan error,
158158
} else {
159159

160160
select {
161-
case errCh <- fmt.Errorf("no data: %v", line):
161+
case errCh <- fmt.Errorf("no data: %s", string(line)):
162162
default:
163163
}
164164

pkg/broker/client/sse_test.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -230,9 +230,13 @@ func TestBrokerMultiSubscriberPartialMatchTopic(t *testing.T) {
230230

231231
opts := Options{SocketDir: filepath.Dir(socketFile)}
232232

233-
topic1, errs1, err := Subscribe(socket, "local/instance", opts)
234-
require.NoError(t, err)
233+
start := make(chan struct{})
235234
go func() {
235+
<-start
236+
237+
topic1, errs1, err := Subscribe(socket, "local/instance", opts)
238+
require.NoError(t, err)
239+
236240
for {
237241
select {
238242
case e := <-errs1:
@@ -249,9 +253,12 @@ func TestBrokerMultiSubscriberPartialMatchTopic(t *testing.T) {
249253
}
250254
}()
251255

252-
topic2, errs2, err := Subscribe(socket, "local/instancetest", opts)
253-
require.NoError(t, err)
254256
go func() {
257+
<-start
258+
259+
topic2, errs2, err := Subscribe(socket, "local/instancetest", opts)
260+
require.NoError(t, err)
261+
255262
for {
256263
select {
257264
case e := <-errs2:
@@ -269,8 +276,9 @@ func TestBrokerMultiSubscriberPartialMatchTopic(t *testing.T) {
269276
}()
270277

271278
go func() {
279+
<-start
280+
272281
for {
273-
<-time.After(10 * time.Millisecond)
274282
now := time.Now()
275283
evt := event{Time: now.UnixNano(), Message: fmt.Sprintf("Now is %v", now)}
276284
require.NoError(t, broker.Publish("local/instance", evt))
@@ -279,6 +287,8 @@ func TestBrokerMultiSubscriberPartialMatchTopic(t *testing.T) {
279287
}
280288
}()
281289

290+
close(start)
291+
282292
// Test a few rounds to make sure all subscribers get the same messages each round.
283293
for i := 0; i < 5; i++ {
284294
b := <-received2

pkg/fsm/clock.go

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package fsm
2+
3+
import (
4+
"time"
5+
)
6+
7+
// Clock adapts a timer tick
8+
type Clock struct {
9+
C <-chan Tick
10+
c chan<- Tick
11+
stop chan struct{}
12+
start chan struct{}
13+
driver func()
14+
running bool
15+
}
16+
17+
// NewClock returns a clock
18+
func NewClock() *Clock {
19+
c := make(chan Tick)
20+
stop := make(chan struct{})
21+
clock := &Clock{
22+
C: c,
23+
c: c,
24+
stop: stop,
25+
start: make(chan struct{}),
26+
}
27+
clock.driver = func() {
28+
<-clock.start
29+
clock.start = nil
30+
31+
for {
32+
select {
33+
case <-clock.stop:
34+
close(clock.c)
35+
return
36+
}
37+
}
38+
}
39+
return clock.run()
40+
}
41+
42+
// Start starts the clock
43+
func (t *Clock) Start() {
44+
if t.start != nil {
45+
close(t.start)
46+
}
47+
}
48+
49+
// Tick makes one tick of the clock
50+
func (t *Clock) Tick() {
51+
t.c <- Tick(1)
52+
}
53+
54+
func (t *Clock) run() *Clock {
55+
if t.driver != nil {
56+
go t.driver()
57+
}
58+
t.running = true
59+
return t
60+
}
61+
62+
// Stop stops the ticks
63+
func (t *Clock) Stop() {
64+
if t.running {
65+
close(t.stop)
66+
t.running = false
67+
}
68+
}
69+
70+
// Wall adapts a regular time.Tick to return a clock
71+
func Wall(tick <-chan time.Time) *Clock {
72+
out := make(chan Tick)
73+
stop := make(chan struct{})
74+
clock := &Clock{
75+
C: out,
76+
c: out,
77+
stop: stop,
78+
start: make(chan struct{}),
79+
}
80+
81+
clock.driver = func() {
82+
<-clock.start
83+
clock.start = nil
84+
85+
for {
86+
select {
87+
case <-clock.stop:
88+
close(clock.c)
89+
return
90+
case <-tick:
91+
// note that golang's time ticker won't close the channel when stopped.
92+
// so we will do the closing ourselves to avoid leaking the goroutine
93+
clock.c <- Tick(1)
94+
}
95+
}
96+
}
97+
98+
return clock.run()
99+
}

pkg/fsm/clock_test.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package fsm
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
log "github.com/golang/glog"
8+
"github.com/stretchr/testify/require"
9+
)
10+
11+
func TestClock(t *testing.T) {
12+
clock := NewClock()
13+
14+
done := make(chan struct{})
15+
start := make(chan struct{})
16+
go func() {
17+
defer close(done)
18+
<-start
19+
20+
for {
21+
_, open := <-clock.C
22+
if !open {
23+
return // we expect this to be run
24+
}
25+
}
26+
}()
27+
28+
log.Infoln("Start")
29+
close(start)
30+
clock.Start()
31+
clock.Tick()
32+
clock.Tick()
33+
clock.Tick()
34+
35+
log.Infoln("Stopping")
36+
clock.Stop()
37+
38+
log.Infoln("waiting for done")
39+
<-done
40+
log.Infoln("done")
41+
}
42+
43+
func TestWallClock(t *testing.T) {
44+
45+
ticker := time.After(100 * time.Millisecond)
46+
clock := Wall(ticker)
47+
48+
start := make(chan struct{})
49+
go func() {
50+
<-start
51+
52+
<-clock.C
53+
54+
clock.Stop()
55+
}()
56+
57+
close(start) // from here receive just 1 tick
58+
clock.Start()
59+
<-clock.C
60+
}
61+
62+
func TestWallClock2(t *testing.T) {
63+
64+
ticker := time.Tick(100 * time.Millisecond)
65+
clock := Wall(ticker)
66+
67+
start := make(chan struct{})
68+
69+
ticks := make(chan int, 1000)
70+
go func() {
71+
72+
defer close(ticks)
73+
74+
<-start
75+
76+
for {
77+
_, open := <-clock.C
78+
if !open {
79+
return
80+
}
81+
log.Infoln("tick")
82+
ticks <- 1
83+
}
84+
}()
85+
86+
close(start)
87+
clock.Start()
88+
log.Infoln("starting")
89+
90+
time.Sleep(1 * time.Second)
91+
92+
log.Infoln("Stopping")
93+
clock.Stop()
94+
log.Infoln("Stopped")
95+
96+
total := 0
97+
for i := range ticks {
98+
total += i
99+
}
100+
log.Infoln("count=", total)
101+
require.Equal(t, 10, total)
102+
}

pkg/fsm/errors.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package fsm
2+
3+
import (
4+
"fmt"
5+
)
6+
7+
type errDuplicateState Index
8+
9+
func (e errDuplicateState) Error() string {
10+
return fmt.Sprintf("duplicated state index: %d", e)
11+
}
12+
13+
type unknownState Index
14+
15+
func (e unknownState) Error() string {
16+
return fmt.Sprintf("unknown state index: %d", e)
17+
}
18+
19+
type unknownTransition Signal
20+
21+
func (e unknownTransition) Error() string {
22+
return fmt.Sprintf("no transition defined for signal %d", e)
23+
}
24+
25+
type unknownSignal Signal
26+
27+
func (e unknownSignal) Error() string {
28+
return fmt.Sprintf("signal in action not found in transitions %d", e)
29+
}
30+
31+
type unknownInstance ID
32+
33+
func (e unknownInstance) Error() string {
34+
return fmt.Sprintf("unknown instance %d", e)
35+
}
36+
37+
type nilAction Signal
38+
39+
func (e nilAction) Error() string {
40+
return fmt.Sprintf("nil action corresponding to signal %d", e)
41+
}
42+
43+
type noTransitions Spec
44+
45+
func (e noTransitions) Error() string {
46+
return fmt.Sprintf("no transitions defined: count(states)=%d", len(e.states))
47+
}

pkg/fsm/fifo.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package fsm
2+
3+
func newFifo(size int) *fifo {
4+
return &fifo{
5+
nodes: make([]*event, size),
6+
size: size,
7+
}
8+
}
9+
10+
// fifo is a basic FIFO queue of events based on a circular list that resizes as needed.
11+
type fifo struct {
12+
nodes []*event
13+
size int
14+
head int
15+
tail int
16+
count int
17+
}
18+
19+
// Len returns the size of the fifo
20+
func (q *fifo) Len() int {
21+
return q.count
22+
}
23+
24+
// push adds a node to the queue.
25+
func (q *fifo) push(n *event) {
26+
if q.head == q.tail && q.count > 0 {
27+
nodes := make([]*event, len(q.nodes)+q.size)
28+
copy(nodes, q.nodes[q.head:])
29+
copy(nodes[len(q.nodes)-q.head:], q.nodes[:q.head])
30+
q.head = 0
31+
q.tail = len(q.nodes)
32+
q.nodes = nodes
33+
}
34+
q.nodes[q.tail] = n
35+
q.tail = (q.tail + 1) % len(q.nodes)
36+
q.count++
37+
}
38+
39+
// pop removes and returns a node from the queue in first to last order.
40+
func (q *fifo) pop() *event {
41+
if q.count == 0 {
42+
return nil
43+
}
44+
node := q.nodes[q.head]
45+
q.head = (q.head + 1) % len(q.nodes)
46+
q.count--
47+
return node
48+
}

pkg/fsm/fifo_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package fsm
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
)
8+
9+
func TestFifo(t *testing.T) {
10+
11+
q := newFifo(10)
12+
13+
for i := 0; i < 100; i++ {
14+
q.push(&event{instance: ID(i)})
15+
}
16+
17+
require.Equal(t, 100, q.Len())
18+
19+
ids := []ID{}
20+
21+
for q.Len() > 0 {
22+
event := q.pop()
23+
require.NotNil(t, event)
24+
ids = append(ids, event.instance)
25+
}
26+
27+
require.Equal(t, 100, len(ids))
28+
for i := 0; i < 100; i++ {
29+
require.Equal(t, ID(i), ids[i])
30+
}
31+
}

0 commit comments

Comments
 (0)