-
Notifications
You must be signed in to change notification settings - Fork 3
/
routingwatcher.go
99 lines (81 loc) · 2.11 KB
/
routingwatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package gocbcoreps
import (
"context"
"time"
"go.uber.org/zap"
"github.com/couchbase/goprotostellar/genproto/routing_v1"
)
// routingWatcherOptions bundles the values newRoutingWatcher copies into a
// new routingWatcher.
type routingWatcherOptions struct {
	// RoutingClient is the routing service client whose WatchRouting call
	// the watcher uses to open its stream.
	RoutingClient routing_v1.RoutingServiceClient

	// BucketName is sent as the BucketName of every WatchRoutingRequest.
	BucketName string

	// RoutingTable is stored on the watcher as-is.
	RoutingTable *atomicRoutingTable

	// Logger receives the watcher's error logs.
	Logger *zap.Logger
}
// routingWatcher runs a background goroutine (procThread) that keeps a
// WatchRouting stream open for a single bucket and passes each received
// response to handleTopologyResponse.
type routingWatcher struct {
	// Values copied from routingWatcherOptions at construction time.
	routingClient routing_v1.RoutingServiceClient
	bucketName    string
	routingTable  *atomicRoutingTable
	logger        *zap.Logger

	// ctx is handed to WatchRouting; ctxCancel cancels it and is invoked by
	// Close to ask the background goroutine to stop.
	ctx       context.Context
	ctxCancel func()

	// closeCh is closed by procThread when it exits; Close blocks on it.
	closeCh chan struct{}
}
// newRoutingWatcher creates a routingWatcher from opts, gives it a fresh
// cancellable context and completion channel, and starts its background
// goroutine via init before returning it.
func newRoutingWatcher(opts *routingWatcherOptions) *routingWatcher {
	watcher := new(routingWatcher)

	// Caller-supplied configuration.
	watcher.routingClient = opts.RoutingClient
	watcher.bucketName = opts.BucketName
	watcher.routingTable = opts.RoutingTable
	watcher.logger = opts.Logger

	// Lifecycle plumbing shared between procThread and Close.
	watcher.ctx, watcher.ctxCancel = context.WithCancel(context.Background())
	watcher.closeCh = make(chan struct{})

	watcher.init()

	return watcher
}
// init starts the watcher's processing loop (procThread) on a new goroutine
// and returns immediately.
func (w *routingWatcher) init() {
	go func() {
		w.procThread()
	}()
}
// procThread is the watcher's background loop. It opens a WatchRouting stream
// for the configured bucket, passes every received response to
// handleTopologyResponse, and re-opens the stream whenever opening it or
// receiving from it fails. The loop ends once the watcher's context has been
// cancelled; closeCh is closed on the way out so that Close can return.
func (w *routingWatcher) procThread() {
	// Close is blocked on closeCh, so release it on every exit path.
	defer close(w.closeCh)

	// We just use the default values for back off.
	b := exponentialBackoff(0, 0, 0)
	var numRetries uint32

	for {
		topologyCh, err := w.routingClient.WatchRouting(w.ctx, &routing_v1.WatchRoutingRequest{
			BucketName: &w.bucketName,
		})
		if err != nil {
			// A failure caused by our own cancellation is an expected part of
			// shutdown, not something worth reporting.
			if w.ctx.Err() == nil {
				w.logger.Error("failed to watch routing", zap.Error(err))
			}
		} else {
			for {
				topologyData, recvErr := topologyCh.Recv()
				if recvErr != nil {
					if w.ctx.Err() == nil {
						w.logger.Error("failed to recv updated topology", zap.Error(recvErr))
					}
					break
				}

				// Only restart the back-off strategy once the stream has
				// actually delivered data. Merely opening the stream is not
				// proof of health: if it then fails on its first Recv, an
				// early reset would make us reconnect in a tight loop.
				numRetries = 0
				w.handleTopologyResponse(topologyData)
			}
		}

		if w.ctx.Err() != nil {
			return
		}

		// Whether the open or the receive failed, wait before reconnecting,
		// and abandon the wait as soon as the watcher is closed.
		numRetries++
		select {
		case <-time.After(b(numRetries)):
		case <-w.ctx.Done():
			return
		}
	}
}
// Close cancels the watcher's context and then blocks until closeCh has been
// closed, which procThread does when it exits.
func (w *routingWatcher) Close() {
	stop, done := w.ctxCancel, w.closeCh

	// Ask the background goroutine to stop...
	stop()

	// ...and wait until it reports that it has finished.
	<-done
}
// handleTopologyResponse is called by procThread with each response received
// from the WatchRouting stream. It is currently a no-op: the response is
// ignored until the TODO below is implemented.
func (w *routingWatcher) handleTopologyResponse(topology *routing_v1.WatchRoutingResponse) {
	// TODO(brett19): Implement handling protostellar topologies received.
}