-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathsubscriber.go
96 lines (83 loc) · 2.23 KB
/
subscriber.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package etcd
import (
"context"
"sync"
"github.com/devopsfaith/krakend/config"
"github.com/devopsfaith/krakend/sd"
)
var (
	// subscribers caches the subscribers already built by SubscriberFactory,
	// keyed by the first host entry of the backend configuration.
	subscribers = make(map[string]sd.Subscriber)

	// subscribersMutex serializes every access to the subscribers map.
	subscribersMutex = new(sync.Mutex)

	// fallbackSubscriberFactory is used whenever an etcd subscriber cannot be
	// provided (no hosts configured or the etcd subscriber failed to start).
	fallbackSubscriberFactory = sd.FixedSubscriberFactory
)
// SubscriberFactory returns an sd.SubscriberFactory that creates etcd
// subscribers using the received context and etcd client.
//
// The first entry of cfg.Host is used as the etcd prefix to watch. Subscribers
// are cached per prefix, so asking twice for the same prefix yields the same
// subscriber. When the backend declares no hosts, or the etcd subscriber
// cannot be created, the fixed subscriber factory is used instead.
func SubscriberFactory(ctx context.Context, c Client) sd.SubscriberFactory {
	return func(cfg *config.Backend) sd.Subscriber {
		if len(cfg.Host) == 0 {
			return fallbackSubscriberFactory(cfg)
		}
		prefix := cfg.Host[0]

		// The lock is held for the whole lookup-or-create sequence so a
		// prefix is never registered twice.
		subscribersMutex.Lock()
		defer subscribersMutex.Unlock()

		cached, found := subscribers[prefix]
		if found {
			return cached
		}

		created, err := NewSubscriber(ctx, c, prefix)
		if err != nil {
			return fallbackSubscriberFactory(cfg)
		}
		subscribers[prefix] = created
		return created
	}
}
// Code taken from https://github.com/go-kit/kit/blob/master/sd/etcd/instancer.go

// Subscriber keeps instances stored in a certain etcd keyspace cached in a fixed subscriber. Any kind of
// change in that keyspace is watched and will update the Subscriber's list of hosts.
type Subscriber struct {
	// cache holds the most recently fetched list of hosts; it is overwritten
	// in place every time the watched prefix is refreshed.
	cache *sd.FixedSubscriber

	// mutex guards cache: Hosts takes the read lock, the refresh loop takes
	// the write lock.
	mutex *sync.RWMutex

	// client is the etcd client used to list the entries and to watch the prefix.
	client Client

	// prefix is the etcd key prefix whose entries are listed and watched.
	prefix string

	// ctx stops the refresh loop once it is done.
	ctx context.Context
}
// NewSubscriber builds an etcd subscriber for the given prefix.
//
// The current entries under prefix are fetched synchronously; if that fails
// the error is returned and no subscriber is created. On success a background
// goroutine is started that refreshes the cached hosts on every notified
// change, until ctx is done.
func NewSubscriber(ctx context.Context, c Client, prefix string) (*Subscriber, error) {
	entries, err := c.GetEntries(prefix)
	if err != nil {
		return nil, err
	}

	hosts := sd.FixedSubscriber(entries)
	sub := &Subscriber{
		cache:  &hosts,
		mutex:  &sync.RWMutex{},
		client: c,
		prefix: prefix,
		ctx:    ctx,
	}

	go sub.loop()
	return sub, nil
}
// Hosts implements the subscriber interface. It returns whatever the cached
// fixed subscriber reports, reading it under the read lock so it does not race
// with a refresh of the cache.
func (s Subscriber) Hosts() ([]string, error) {
	guard := s.mutex
	guard.RLock()
	defer guard.RUnlock()

	current := s.cache
	return current.Hosts()
}
// loop keeps the cached hosts up to date. It starts the client's prefix
// watcher in its own goroutine and, on every signal received from it, reloads
// the entries under the prefix and replaces the cache. It returns when the
// subscriber's context is done.
func (s *Subscriber) loop() {
	changes := make(chan struct{})
	go s.client.WatchPrefix(s.prefix, changes)

	for {
		select {
		case <-s.ctx.Done():
			return
		case <-changes:
			entries, err := s.client.GetEntries(s.prefix)
			if err == nil {
				s.mutex.Lock()
				*(s.cache) = sd.FixedSubscriber(entries)
				s.mutex.Unlock()
			}
			// On error the previous list of hosts is kept and the loop
			// waits for the next signal.
		}
	}
}