forked from Velocidex/velociraptor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnotifications.go
122 lines (100 loc) · 2.53 KB
/
notifications.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package notifications
import (
"strings"
"sync"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
notificationCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "frontend_notification_count",
Help: "Number of notifications we issue.",
})
)
type NotificationPool struct {
mu sync.Mutex
clients map[string]chan bool
done chan bool
}
func NewNotificationPool() *NotificationPool {
return &NotificationPool{
clients: make(map[string]chan bool),
done: make(chan bool),
}
}
func (self *NotificationPool) Count() uint64 {
self.mu.Lock()
defer self.mu.Unlock()
result := uint64(0)
for k := range self.clients {
// Only report real clients waiting for notifications.
if strings.HasPrefix(k, "C.") {
result++
}
}
return result
}
func (self *NotificationPool) ListClients() []string {
self.mu.Lock()
defer self.mu.Unlock()
result := make([]string, 0, len(self.clients))
for k := range self.clients {
result = append(result, k)
}
return result
}
func (self *NotificationPool) IsClientConnected(client_id string) bool {
self.mu.Lock()
_, pres := self.clients[client_id]
self.mu.Unlock()
return pres
}
func (self *NotificationPool) Listen(client_id string) (chan bool, func()) {
new_c := make(chan bool)
self.mu.Lock()
// Close any old channels and make a new one.
c, pres := self.clients[client_id]
if pres {
// This could happen because the client was connected,
// but the connection is dropped and the HTTP receiver
// is still blocked. This unblocks the old connection
// and returns an error on the new connection at the
// same time. If the old client is still connected, it
// will reconnect immediately but the new client will
// wait the full max poll to retry.
defer close(c)
delete(self.clients, client_id)
}
self.clients[client_id] = new_c
self.mu.Unlock()
return new_c, func() {
self.mu.Lock()
c, pres := self.clients[client_id]
if pres {
defer close(c)
delete(self.clients, client_id)
}
self.mu.Unlock()
}
}
func (self *NotificationPool) Notify(client_id string) {
self.mu.Lock()
c, pres := self.clients[client_id]
if pres {
notificationCounter.Inc()
defer close(c)
delete(self.clients, client_id)
}
self.mu.Unlock()
}
func (self *NotificationPool) Shutdown() {
self.mu.Lock()
defer self.mu.Unlock()
close(self.done)
// Send all the readers the quit signal and shut down the
// pool.
for _, c := range self.clients {
close(c)
}
self.clients = make(map[string]chan bool)
}