forked from hashicorp/raft
-
Notifications
You must be signed in to change notification settings - Fork 0
/
observer.go
120 lines (104 loc) · 3.45 KB
/
observer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package raft
import (
"sync/atomic"
)
// Observation is sent along the given channel to observers when an event occurs.
type Observation struct {
// Raft holds the Raft instance generating the observation.
Raft *Raft
// Data holds observation-specific data. Possible types are
// *RequestVoteRequest, RaftState and LeaderObservation.
Data interface{}
}
// LeaderObservation is used in Observation.Data when leadership changes.
type LeaderObservation struct {
Leader string
}
// nextObserverId is used to provide a unique ID for each observer to aid in
// deregistration.
var nextObserverID uint64
// FilterFn is a function that can be registered in order to filter observations.
// The function reports whether the observation should be included - if
// it returns false, the observation will be filtered out.
type FilterFn func(o *Observation) bool
// Observer describes what to do with a given observation.
type Observer struct {
// channel receives observations.
channel chan Observation
// blocking, if true, will cause Raft to block when sending an observation
// to this observer. This should generally be set to false.
blocking bool
// filter will be called to determine if an observation should be sent to
// the channel.
filter FilterFn
// id is the ID of this observer in the Raft map.
id uint64
// numObserved and numDropped are performance counters for this observer.
numObserved uint64
numDropped uint64
}
// NewObserver creates a new observer that can be registered
// to make observations on a Raft instance. Observations
// will be sent on the given channel if they satisfy the
// given filter.
//
// If blocking is true, the observer will block when it can't
// send on the channel, otherwise it may discard events.
func NewObserver(channel chan Observation, blocking bool, filter FilterFn) *Observer {
return &Observer{
channel: channel,
blocking: blocking,
filter: filter,
id: atomic.AddUint64(&nextObserverID, 1),
}
}
// GetNumObserved returns the number of observations.
func (or *Observer) GetNumObserved() uint64 {
return atomic.LoadUint64(&or.numObserved)
}
// GetNumDropped returns the number of dropped observations due to blocking.
func (or *Observer) GetNumDropped() uint64 {
return atomic.LoadUint64(&or.numDropped)
}
// RegisterObserver registers a new observer.
func (r *Raft) RegisterObserver(or *Observer) {
r.observersLock.Lock()
defer r.observersLock.Unlock()
r.observers[or.id] = or
}
// DeregisterObserver deregisters an observer.
func (r *Raft) DeregisterObserver(or *Observer) {
r.observersLock.Lock()
defer r.observersLock.Unlock()
delete(r.observers, or.id)
}
// observe sends an observation to every observer.
func (r *Raft) observe(o interface{}) {
// In general observers should not block. But in any case this isn't
// disastrous as we only hold a read lock, which merely prevents
// registration / deregistration of observers.
r.observersLock.RLock()
defer r.observersLock.RUnlock()
for _, or := range r.observers {
// It's wasteful to do this in the loop, but for the common case
// where there are no observers we won't create any objects.
ob := Observation{Raft: r, Data: o}
if or.filter != nil && !or.filter(&ob) {
continue
}
if or.channel == nil {
continue
}
if or.blocking {
or.channel <- ob
atomic.AddUint64(&or.numObserved, 1)
} else {
select {
case or.channel <- ob:
atomic.AddUint64(&or.numObserved, 1)
default:
atomic.AddUint64(&or.numDropped, 1)
}
}
}
}