-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathsubscription.go
116 lines (90 loc) · 2.31 KB
/
subscription.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package gilmour
import (
"sync"
)
//Every subscriptionManager should implement this interface.
type subscriber interface {
get(topic string) ([]*Subscription, bool)
add(t string, h handler, o *HandlerOpts) *Subscription
delete(topic string, s *Subscription)
deleteAll(topic string)
getAll() map[string][]*Subscription
}
//Returned for every topic that you subscribe to.
type Subscription struct {
handler handler
handlerOpts *HandlerOpts
}
func (s *Subscription) GetOpts() *HandlerOpts {
return s.handlerOpts
}
func (s *Subscription) GetHandler() handler {
return s.handler
}
// Subscriber that maintains a registry of subscriptions for topics.
// It is thread safe.
type subscriptionManager struct {
sync.RWMutex
hash map[string][]*Subscription
}
//get all active subscriptions. Returned format is map{topic: [Subscription]}
func (s *subscriptionManager) getAll() map[string][]*Subscription {
s.RLock()
defer s.RUnlock()
return s.hash
}
//get all subscriptions for a particular topic.
func (s *subscriptionManager) get(topic string) ([]*Subscription, bool) {
s.RLock()
defer s.RUnlock()
list, ok := s.hash[topic]
return list, ok
}
//add a new subscription for a topic. If this topic is being subscribed for
//the first time, initialize an array of subscriptions.
func (s *subscriptionManager) add(t string, h handler, o *HandlerOpts) *Subscription {
s.Lock()
defer s.Unlock()
if _, ok := s.hash[t]; !ok {
s.hash[t] = []*Subscription{}
}
sub := &Subscription{h, o}
arr := s.hash[t]
arr = append(arr, sub)
s.hash[t] = arr
return sub
}
//Remove a combination of topic & subscription from the manager.
func (s *subscriptionManager) delete(topic string, sub *Subscription) {
s.Lock()
defer s.Unlock()
list, ok := s.hash[topic]
if !ok {
return
}
new_list := []*Subscription{}
for _, elem := range list {
if elem == sub {
//Do nothing
continue
}
new_list = append(new_list, elem)
}
s.hash[topic] = new_list
if len(new_list) == 0 {
delete(s.hash, topic)
}
return
}
//Delete all subscriptions corresponding to a topic.
func (s *subscriptionManager) deleteAll(topic string) {
s.Lock()
defer s.Unlock()
s.hash[topic] = []*Subscription{}
}
//Constructor
func newSubscriptionManager() *subscriptionManager {
x := &subscriptionManager{}
x.hash = map[string][]*Subscription{}
return x
}