forked from tailscale/tailscale
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprober.go
386 lines (338 loc) · 9.74 KB
/
prober.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
// Package prober implements a simple blackbox prober. Each probe runs
// in its own goroutine, and run results are recorded as Prometheus
// metrics.
package prober
import (
"context"
"errors"
"fmt"
"hash/fnv"
"log"
"math/rand"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
)
// ProbeFunc is the signature of a probe body. It checks something and
// returns nil when the check succeeded, or a non-nil error describing
// the failure. Implementations must obey ctx's deadline; correct probe
// scheduling depends on it.
type ProbeFunc func(ctx context.Context) error
// Prober manages a set of probes and keeps track of their results.
type Prober struct {
	// Whether to spread probe execution over time by introducing a
	// random delay before the first probe run.
	spread bool

	// Whether to run all probes once instead of running them in a loop.
	once bool

	// Time-related functions that get faked out during tests.
	now       func() time.Time            // clock used to timestamp probe start/end
	newTicker func(time.Duration) ticker // factory for the tickers that pace probe runs

	mu sync.Mutex // protects all following fields
	// probes holds every registered probe, keyed by probe name.
	probes map[string]*Probe
	// namespace is the metric name prefix; Run appends "_" to it when
	// registering a probe's metrics.
	namespace string
	// metrics is the registry that per-probe registries are registered into.
	metrics *prometheus.Registry
}
// New returns a new Prober that uses the real clock (time.Now) and
// tickers backed by time.Ticker.
func New() *Prober {
	return newForTest(time.Now, newRealTicker)
}
// newForTest builds a Prober whose clock and ticker factory are supplied
// by the caller, which lets tests substitute fakes. The returned Prober
// starts with no probes, the metric namespace "prober", and a fresh
// metrics registry that is registered with prometheus.DefaultRegisterer.
func newForTest(now func() time.Time, newTicker func(time.Duration) ticker) *Prober {
	registry := prometheus.NewRegistry()
	prober := &Prober{
		now:       now,
		newTicker: newTicker,
		probes:    make(map[string]*Probe),
		metrics:   registry,
		namespace: "prober",
	}
	prometheus.DefaultRegisterer.MustRegister(registry)
	return prober
}
// Run registers a probe called name, starts it in its own goroutine, and
// returns it. The probe invokes fun every interval and exports its
// results as metrics carrying a "name" label plus every entry of labels.
// An entry of labels keyed "name" replaces the automatic name label.
//
// Registering a probe under an already-registered name panics.
func (p *Prober) Run(name string, interval time.Duration, labels map[string]string, fun ProbeFunc) *Probe {
	p.mu.Lock()
	defer p.mu.Unlock()

	if _, dup := p.probes[name]; dup {
		panic(fmt.Sprintf("probe named %q already registered", name))
	}

	constLabels := make(prometheus.Labels, len(labels)+1)
	constLabels["name"] = name
	for key, val := range labels {
		constLabels[key] = val
	}
	// desc builds a metric descriptor that carries this probe's labels.
	desc := func(metric, help string) *prometheus.Desc {
		return prometheus.NewDesc(metric, help, nil, constLabels)
	}

	probeCtx, stop := context.WithCancel(context.Background())
	pr := &Probe{
		prober:  p,
		ctx:     probeCtx,
		cancel:  stop,
		stopped: make(chan struct{}),

		name:         name,
		doProbe:      fun,
		interval:     interval,
		initialDelay: initialDelay(name, interval),

		metrics:    prometheus.NewRegistry(),
		mInterval:  desc("interval_secs", "Probe interval in seconds"),
		mStartTime: desc("start_secs", "Latest probe start time (seconds since epoch)"),
		mEndTime:   desc("end_secs", "Latest probe end time (seconds since epoch)"),
		mLatency:   desc("latency_millis", "Latest probe latency (ms)"),
		mResult:    desc("result", "Latest probe result (1 = success, 0 = failure)"),
	}

	// The probe's own registry is attached to the Prober's registry through
	// a wrapper that prefixes metric names with the namespace; the probe
	// then registers itself as the collector inside its own registry.
	prefixed := prometheus.WrapRegistererWithPrefix(p.namespace+"_", p.metrics)
	prefixed.MustRegister(pr.metrics)
	pr.metrics.MustRegister(pr)

	p.probes[name] = pr
	go pr.loop()
	return pr
}
// unregister removes probe's metrics from the registries and drops the
// probe from the Prober's set of registered probes.
//
// The map entry is only deleted when it still refers to this exact probe.
// Close documents that a new probe may be Run under the same name once it
// returns; without the identity check, a repeated Close on the old probe
// would remove the new probe's entry.
func (p *Prober) unregister(probe *Probe) {
	p.mu.Lock()
	defer p.mu.Unlock()

	probe.metrics.Unregister(probe)
	p.metrics.Unregister(probe.metrics)

	if current, ok := p.probes[probe.name]; ok && current == probe {
		delete(p.probes, probe.name)
	}
}
// WithSpread is used to enable random delay before the first run of
// each added probe. It returns p so that calls can be chained.
//
// The setting is written without holding a lock and is read by each
// probe's goroutine, so configure it before starting probes with Run.
func (p *Prober) WithSpread(s bool) *Prober {
	p.spread = s
	return p
}
// WithOnce mode can be used if you want to run all configured probes once
// rather than on a schedule. It returns p so that calls can be chained.
//
// The setting is written without holding a lock and is read by each
// probe's goroutine, so configure it before starting probes with Run.
func (p *Prober) WithOnce(s bool) *Prober {
	p.once = s
	return p
}
// WithMetricNamespace allows changing metric name prefix from the default `prober`.
// It returns p so that calls can be chained.
//
// Run reads the namespace when a probe is registered, so the new prefix
// only applies to probes started after this call. The namespace field is
// guarded by p.mu (Run reads it under that lock), so the write takes the
// lock as well.
func (p *Prober) WithMetricNamespace(n string) *Prober {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.namespace = n
	return p
}
// Wait blocks until all probes have finished execution. It should typically
// be used with the `once` mode to wait for probes to finish before collecting
// their results.
func (p *Prober) Wait() {
	for {
		// Snapshot the stop channels under the lock, then wait on them
		// without holding it.
		p.mu.Lock()
		pending := make([]chan struct{}, 0, len(p.probes))
		for _, probe := range p.probes {
			pending = append(pending, probe.stopped)
		}
		p.mu.Unlock()

		for _, done := range pending {
			<-done
		}

		// Since probes can add other probes, go around again if the number
		// of registered probes no longer matches the snapshot.
		if p.activeProbes() == len(pending) {
			return
		}
	}
}
// activeProbes reports the number of registered probes.
func (p *Prober) activeProbes() int {
	p.mu.Lock()
	count := len(p.probes)
	p.mu.Unlock()
	return count
}
// Probe is a probe that healthchecks something and updates Prometheus
// metrics with the results.
type Probe struct {
	prober  *Prober            // owning Prober; supplies the clock, ticker factory and mode flags
	ctx     context.Context    // parent context of every doProbe call; cancelled by cancel
	cancel  context.CancelFunc // run to initiate shutdown
	stopped chan struct{}      // closed when shutdown is complete

	name         string        // name the probe was registered under
	doProbe      ProbeFunc     // the probe body
	interval     time.Duration // time between runs; also the basis for the per-run timeout
	initialDelay time.Duration // delay before the first run when spreading is enabled
	tick         ticker        // ticker pacing repeated runs; set by loop

	// metrics is a Prometheus metrics registry for metrics exported by this probe.
	// Using a separate registry allows cleanly removing metrics exported by this
	// probe when it gets unregistered.
	metrics *prometheus.Registry

	// Descriptors for the metrics emitted by Collect.
	mInterval  *prometheus.Desc
	mStartTime *prometheus.Desc
	mEndTime   *prometheus.Desc
	mLatency   *prometheus.Desc
	mResult    *prometheus.Desc

	mu        sync.Mutex    // guards the run-state fields below
	start     time.Time     // last time doProbe started
	end       time.Time     // last time doProbe returned
	latency   time.Duration // latency of the last run if it succeeded; 0 after a failure
	succeeded bool          // whether the last doProbe call succeeded
	lastErr   error         // error from the last run; nil if it succeeded
}
// Close shuts down the Probe and unregisters it from its Prober.
// It blocks until the probe's goroutine has exited and always returns nil.
// It is safe to Run a new probe of the same name after Close returns.
func (p *Probe) Close() error {
	// Cancel the probe context; loop watches it, and every doProbe call
	// receives a context derived from it.
	p.cancel()
	// Wait for loop to return before touching the registries.
	<-p.stopped
	p.prober.unregister(p)
	return nil
}
// loop is the probe's goroutine body. It performs the first run either
// immediately or, when spreading is enabled and the probe has a non-zero
// initial delay, after that delay. Unless the Prober is in once mode it
// then keeps running the probe every interval until the probe's context
// is cancelled. The stopped channel is closed when loop returns.
func (p *Probe) loop() {
	defer close(p.stopped)

	if !p.prober.spread || p.initialDelay <= 0 {
		p.run()
	} else {
		delay := p.prober.newTicker(p.initialDelay)
		select {
		case <-p.ctx.Done():
			// Shut down before the first run ever happened.
			delay.Stop()
			return
		case <-delay.Chan():
			p.run()
		}
		delay.Stop()
	}

	if p.prober.once {
		return
	}

	p.tick = p.prober.newTicker(p.interval)
	defer p.tick.Stop()
	for {
		select {
		case <-p.ctx.Done():
			return
		case <-p.tick.Chan():
			p.run()
		}
	}
}
// run performs a single probe run and records the outcome.
//
// The probe function is called with a timeout of 80% of the interval, so
// that the run either succeeds or fails before the next cycle is scheduled
// to start. Errors are logged. A panic inside the probe function is
// recovered, logged, and recorded as a failed run.
func (p *Probe) run() {
	started := p.recordStart()

	defer func() {
		// A panic in one probe function must not take down the whole
		// prober, or a single buggy probe would destroy our ability to
		// monitor anything. The panic is recorded as a probe failure, so
		// panicking probes still trigger an alert for debugging.
		r := recover()
		if r == nil {
			return
		}
		log.Printf("probe %s panicked: %v", p.name, r)
		p.recordEnd(started, errors.New("panic"))
	}()

	budget := time.Duration(float64(p.interval) * 0.8)
	runCtx, done := context.WithTimeout(p.ctx, budget)
	defer done()

	err := p.doProbe(runCtx)
	p.recordEnd(started, err)
	if err == nil {
		return
	}
	log.Printf("probe %s: %v", p.name, err)
}
// recordStart stores the current time, as reported by the Prober's clock,
// as the probe's latest start time and returns it.
func (p *Probe) recordStart() time.Time {
	now := p.prober.now()

	p.mu.Lock()
	p.start = now
	p.mu.Unlock()

	return now
}
// recordEnd stores the outcome of a probe run: the end time (taken from the
// Prober's clock), whether the run succeeded (err == nil), and err itself.
// On success the latency is the time elapsed since the recorded start
// time; on failure it is reset to zero.
//
// The start parameter is not read; the latency is computed from the start
// time stored on the Probe.
func (p *Probe) recordEnd(start time.Time, err error) {
	finished := p.prober.now()
	ok := err == nil

	p.mu.Lock()
	defer p.mu.Unlock()

	p.end = finished
	p.succeeded = ok
	p.lastErr = err
	p.latency = 0
	if ok {
		p.latency = finished.Sub(p.start)
	}
}
// ProbeInfo is a snapshot of the state of a Probe.
type ProbeInfo struct {
	// Start and End are the times the latest run started and returned.
	Start, End time.Time
	// Latency is the formatted latency of the latest run; it is empty
	// when no positive latency is recorded.
	Latency string
	// Result reports whether the latest run succeeded.
	Result bool
	// Error is the message of the latest run's error; it is empty when
	// there is none.
	Error string
}
// ProbeInfo returns a snapshot of the state of every registered probe,
// keyed by probe name. Latency is only set for probes with a positive
// recorded latency, and Error only for probes whose last run returned an
// error.
func (p *Prober) ProbeInfo() map[string]ProbeInfo {
	// Copy the probe list first so the Prober lock is not held while the
	// individual probe locks are taken.
	p.mu.Lock()
	snapshot := make([]*Probe, 0, len(p.probes))
	for _, pr := range p.probes {
		snapshot = append(snapshot, pr)
	}
	p.mu.Unlock()

	result := map[string]ProbeInfo{}
	for _, pr := range snapshot {
		pr.mu.Lock()
		startedAt, endedAt := pr.start, pr.end
		ok, latency, lastErr := pr.succeeded, pr.latency, pr.lastErr
		pr.mu.Unlock()

		info := ProbeInfo{Start: startedAt, End: endedAt, Result: ok}
		if lastErr != nil {
			info.Error = lastErr.Error()
		}
		if latency > 0 {
			info.Latency = latency.String()
		}
		result[pr.name] = info
	}
	return result
}
// Describe implements prometheus.Collector. It sends the descriptors of
// all five probe metrics on ch.
func (p *Probe) Describe(ch chan<- *prometheus.Desc) {
	descs := []*prometheus.Desc{
		p.mInterval,
		p.mStartTime,
		p.mEndTime,
		p.mResult,
		p.mLatency,
	}
	for _, d := range descs {
		ch <- d
	}
}
// Collect implements prometheus.Collector. All metrics are gauges.
//
// The interval is always emitted. The start time is emitted once a run has
// started. The end time and result (1 = success, 0 = failure) are emitted
// once a run has finished, and the latency only while a positive latency
// is recorded.
func (p *Probe) Collect(ch chan<- prometheus.Metric) {
	p.mu.Lock()
	defer p.mu.Unlock()

	gauge := func(desc *prometheus.Desc, value float64) {
		ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, value)
	}

	gauge(p.mInterval, p.interval.Seconds())
	if !p.start.IsZero() {
		gauge(p.mStartTime, float64(p.start.Unix()))
	}
	if p.end.IsZero() {
		// No run has completed yet, so there is nothing more to report.
		return
	}
	gauge(p.mEndTime, float64(p.end.Unix()))

	var result float64
	if p.succeeded {
		result = 1
	}
	gauge(p.mResult, result)

	if p.latency > 0 {
		gauge(p.mLatency, float64(p.latency.Milliseconds()))
	}
}
// ticker is the slice of time.Ticker behavior that the prober relies on,
// expressed as an interface so that it can be faked for tests.
type ticker interface {
	// Stop turns the ticker off.
	Stop()
	// Chan returns the channel on which ticks are delivered.
	Chan() <-chan time.Time
}

// realTicker adapts a *time.Ticker to the ticker interface. Stop is
// promoted from the embedded Ticker.
type realTicker struct {
	*time.Ticker
}

// Chan returns the embedded Ticker's tick channel.
func (t *realTicker) Chan() <-chan time.Time {
	return t.C
}

// newRealTicker returns a ticker backed by time.NewTicker(d).
func newRealTicker(d time.Duration) ticker {
	rt := new(realTicker)
	rt.Ticker = time.NewTicker(d)
	return rt
}
// initialDelay derives a pseudorandom duration in [0, interval) from seed.
// The seed string is hashed with 64-bit FNV and the hash seeds a private
// random source, so the same seed and interval always give the same delay.
func initialDelay(seed string, interval time.Duration) time.Duration {
	hasher := fnv.New64()
	fmt.Fprint(hasher, seed)

	source := rand.NewSource(int64(hasher.Sum64()))
	fraction := rand.New(source).Float64()

	return time.Duration(fraction * float64(interval))
}