-
Notifications
You must be signed in to change notification settings - Fork 2k
/
Copy pathheartbeatstop.go
158 lines (134 loc) · 3.79 KB
/
heartbeatstop.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package client
import (
"sync"
"time"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/nomad/structs"
)
// heartbeatStop tracks allocations whose task group sets stop_after_client_disconnect
// and destroys them once too long has passed without a good heartbeat.
type heartbeatStop struct {
	// lastOk is the time of the last known good heartbeat; accessed via
	// getLastOk/setLastOk under lock.
	lastOk time.Time
	// startupGrace is the deadline shouldStopAfter uses while lastOk is still zero.
	startupGrace time.Time

	// allocInterval maps alloc ID to its stop_after_client_disconnect duration.
	// It is only read and written by the watch loop.
	allocInterval map[string]time.Duration
	// allocHookCh carries newly registered allocs from allocHook to watch.
	allocHookCh chan *structs.Allocation

	// getRunner looks up the AllocRunner for an alloc ID.
	getRunner func(string) (AllocRunner, error)
	logger    hclog.InterceptLogger
	// shutdownCh ends the watch loop when it yields a value (or is closed).
	shutdownCh chan struct{}

	// lock guards lastOk.
	lock *sync.RWMutex
}
// newHeartbeatStop constructs a heartbeatStop with an empty registration table.
// startupGrace is set to timeout from now; shouldStopAfter falls back to it while no
// good heartbeat (lastOk) has been recorded. allocHookCh is created unbuffered.
func newHeartbeatStop(
	getRunner func(string) (AllocRunner, error),
	timeout time.Duration,
	logger hclog.InterceptLogger,
	shutdownCh chan struct{}) *heartbeatStop {

	return &heartbeatStop{
		getRunner:     getRunner,
		logger:        logger,
		shutdownCh:    shutdownCh,
		startupGrace:  time.Now().Add(timeout),
		allocInterval: make(map[string]time.Duration),
		allocHookCh:   make(chan *structs.Allocation),
		lock:          new(sync.RWMutex),
	}
}
// allocHook is called after (re)storing a new AllocRunner in the client. It registers the
// allocation to be stopped if the taskgroup is configured appropriately, by handing it to
// the watch loop over allocHookCh.
//
// Allocs whose task group cannot be found (allocTaskGroup returns nil) or that do not set
// stop_after_client_disconnect are ignored.
//
// allocHookCh is unbuffered, so this blocks until watch receives the alloc. It also
// gives up if shutdownCh yields, because watch returns on shutdown and would otherwise
// never receive.
func (h *heartbeatStop) allocHook(alloc *structs.Allocation) {
	tg := allocTaskGroup(alloc)
	if tg == nil || tg.StopAfterClientDisconnect == nil {
		return
	}

	select {
	case h.allocHookCh <- alloc:
	case <-h.shutdownCh:
	}
}
// shouldStop is called on a restored alloc to determine if lastOk is sufficiently in the
// past that it should be prevented from restarting. Allocs whose task group cannot be
// found, or that do not set stop_after_client_disconnect, are never stopped.
func (h *heartbeatStop) shouldStop(alloc *structs.Allocation) bool {
	tg := allocTaskGroup(alloc)
	if tg == nil || tg.StopAfterClientDisconnect == nil {
		return false
	}
	return h.shouldStopAfter(time.Now(), *tg.StopAfterClientDisconnect)
}
// shouldStopAfter reports whether now is past the stop deadline. The deadline is
// computed as follows:
//   - Once a good heartbeat has been recorded, it is that time plus interval.
//   - While lastOk is still the zero time, it is startupGrace, and interval is ignored.
func (h *heartbeatStop) shouldStopAfter(now time.Time, interval time.Duration) bool {
	deadline := h.startupGrace
	if last := h.getLastOk(); !last.IsZero() {
		deadline = last.Add(interval)
	}
	return now.After(deadline)
}
// watch is a loop that checks for allocations that should be stopped. It also manages the
// registration of allocs to be stopped in a single thread: allocInterval is only touched
// from this goroutine, so it is not guarded by h.lock.
//
// Each pass waits for whichever comes first:
//   - a newly registered alloc on allocHookCh;
//   - a shutdown signal;
//   - a timeout equal to the smaller of 5 seconds and the shortest registered
//     stop_after_client_disconnect interval.
//
// On timeout, every registered alloc for which shouldStopAfter reports true is destroyed
// and unregistered. An alloc whose stop fails is logged, stays registered, and is retried
// on a later pass. The loop returns when shutdownCh yields a value (or is closed).
func (h *heartbeatStop) watch() {
	// If we never manage to successfully contact the server, we want to stop our allocs
	// after duration + start time. Go through setLastOk so the write is synchronized
	// with concurrent setLastOk/getLastOk callers instead of racing on h.lastOk.
	h.setLastOk(time.Now())

	for {
		// Minimize the interval: wake at least as often as the shortest registration.
		interval := 5 * time.Second
		for _, d := range h.allocInterval {
			if d < interval {
				interval = d
			}
		}
		timer := time.NewTimer(interval)

		select {
		case alloc := <-h.allocHookCh:
			if tg := allocTaskGroup(alloc); tg != nil && tg.StopAfterClientDisconnect != nil {
				h.allocInterval[alloc.ID] = *tg.StopAfterClientDisconnect
			}

		case <-timer.C:
			// Stop due allocs directly. Queueing them on a channel that only this
			// goroutine drains would block forever as soon as more allocs were due
			// at once than the channel could buffer.
			now := time.Now()
			for allocID, d := range h.allocInterval {
				if !h.shouldStopAfter(now, d) {
					continue
				}
				if err := h.stopAlloc(allocID); err != nil {
					h.logger.Warn("error stopping on heartbeat timeout", "alloc", allocID, "error", err)
					continue
				}
				// Deleting the current key while ranging over a map is safe in Go.
				delete(h.allocInterval, allocID)
			}

		case <-h.shutdownCh:
			timer.Stop()
			return
		}

		// Release the timer when another case won the select; a no-op if it already fired.
		timer.Stop()
	}
}
// setLastOk records the caller-supplied t as the time of the last known good heartbeat.
// The value is held in memory only (nothing is written to disk). The write happens under
// h.lock, so it is safe to call concurrently with getLastOk.
func (h *heartbeatStop) setLastOk(t time.Time) {
	h.lock.Lock()
	defer h.lock.Unlock()
	h.lastOk = t
}
// getLastOk returns the last known good heartbeat time, or the zero time if nothing has
// set it yet. The field is read under the read lock of h.lock.
func (h *heartbeatStop) getLastOk() time.Time {
	h.lock.RLock()
	last := h.lastOk
	h.lock.RUnlock()
	return last
}
// stopAlloc looks up the AllocRunner for allocID and destroys it. A lookup failure is
// returned unchanged and nothing is destroyed. Once the runner is found, the result is
// always nil.
func (h *heartbeatStop) stopAlloc(allocID string) error {
	ar, lookupErr := h.getRunner(allocID)
	if lookupErr != nil {
		return lookupErr
	}

	h.logger.Debug("stopping alloc for stop_after_client_disconnect", "alloc", allocID)
	ar.Destroy()
	return nil
}
// allocTaskGroup returns the task group in alloc's job whose Name equals alloc.TaskGroup.
// It returns nil in three cases instead of panicking: no such group exists, alloc is nil,
// or alloc has no Job. Callers must handle the nil result.
func allocTaskGroup(alloc *structs.Allocation) *structs.TaskGroup {
	if alloc == nil || alloc.Job == nil {
		return nil
	}
	for _, tg := range alloc.Job.TaskGroups {
		if tg.Name == alloc.TaskGroup {
			return tg
		}
	}
	return nil
}