This repository has been archived by the owner on Jun 30, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
registry.go
109 lines (96 loc) · 2.8 KB
/
registry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package main;
import (
"gopkg.in/redis.v5"
"log"
"time"
"encoding/json"
"github.com/rcrowley/go-metrics"
"errors"
)
var (
	// lifeline_gauge is created in initRegistry and registered under the
	// metric name "lifeline.lifelines"; registryKeepAlive updates it with
	// the current number of lifelines.
	lifeline_gauge metrics.Gauge

	// redisClient is the shared Redis handle. It stays nil until
	// initRegistry assigns either a single-node or a cluster client.
	redisClient redis.Cmdable
)
// initRegistry sets up the lifeline gauge, connects to Redis using
// Config.RedisUrl and starts the background keep-alive goroutine.
//
// It panics (via log.Panic) when the URL cannot be parsed or when the
// initial PING fails.
func initRegistry() {
	lifeline_gauge = metrics.NewGauge()
	metrics.Register("lifeline.lifelines", lifeline_gauge)

	opts, err := redis.ParseURL(Config.RedisUrl)
	if err != nil {
		log.Panic(err)
	}

	if Config.RedisCluster {
		log.Println("connecting to redis cluster", Config.RedisUrl)
		// NOTE(review): only the address from the parsed URL is forwarded to
		// the cluster client; any other option carried by the URL is not
		// passed on here.
		redisClient = redis.NewClusterClient(&redis.ClusterOptions{
			Addrs: []string{opts.Addr},
		})
	} else {
		log.Println("connecting to", Config.RedisUrl)
		redisClient = redis.NewClient(opts)
	}

	// Verify connectivity once, whichever client type was selected.
	if _, err := redisClient.Ping().Result(); err != nil {
		log.Panic(err)
	}

	go registryKeepAlive()
}
// REDIS_REG_EXPIRE is the expiry applied to a lifeline's registry key in
// Redis. registryKeepAlive sleeps for half of this duration between sweeps.
const REDIS_REG_EXPIRE = 20 * time.Second
// registryKeepAlive loops forever: it sleeps for half of REDIS_REG_EXPIRE,
// logs and publishes the current lifeline count, logs the TCP connection
// count when a counter is available, and then re-registers every lifeline
// without claiming it. It never returns, so it is meant to run in its own
// goroutine.
func registryKeepAlive() {
	const sweepInterval = REDIS_REG_EXPIRE / 2

	for {
		time.Sleep(sweepInterval)

		log.Println(len(lifelines), "lifeline connections on this instance")
		lifeline_gauge.Update(int64(len(lifelines)))

		if connection_counter != nil {
			log.Println(connection_counter.Count(), "tcp connections")
		}

		for _, lf := range lifelines {
			// NOTE(review): the original passed a bare 10, which
			// time.Sleep interprets as 10 nanoseconds. The value is kept
			// as is; if a real throttle (e.g. milliseconds) was intended,
			// confirm that a full sweep still finishes well within
			// REDIS_REG_EXPIRE before changing it.
			time.Sleep(10 * time.Nanosecond)
			registerLifeline(lf, false)
		}
	}
}
// registerLifeline writes (or refreshes) the registry entry for lf in Redis.
//
// The entry is a hash stored under lf.index with three fields:
//   - "lifelined": Config.WanAddr, identifying the instance holding the lifeline
//   - "headers":   lf.Properties encoded as JSON
//   - "origin":    the remote address of lf's connection
//
// If the hash is already owned by a different instance, the behaviour depends
// on claim: with claim set the entry is overwritten (a fresh connection takes
// over); without it the function logs and returns without touching Redis.
//
// Failures are logged rather than returned, since the function has no error
// result.
func registerLifeline(lf *Lifeline, claim bool) {
	owner, err := redisClient.HGet(lf.index, "lifelined").Result()
	// Any HGet error (including a missing key/field) is treated as "no
	// current owner", and registration proceeds.
	if err == nil && owner != Config.WanAddr {
		if !claim {
			log.Println(lf.index, "is locked by", owner, ". our connection is probably dead")
			return
		}
		log.Println(lf.index, "is locked by", owner, ". overtaking because our connection is new")
	}

	headers, err := json.Marshal(lf.Properties)
	if err != nil {
		// Keep registering: an entry with empty headers is still written,
		// but the encoding failure is now visible in the log.
		log.Println(lf.index, "cannot encode lifeline properties:", err)
	}

	fields := map[string]string{
		"lifelined": Config.WanAddr,
		"headers":   string(headers),
		"origin":    lf.conn.RemoteAddr().String(),
	}

	// Write the hash first and set the TTL afterwards. EXPIRE on a key that
	// does not exist yet is a no-op, so doing it before the write would leave
	// a freshly created entry without any expiry until the next refresh.
	if err := redisClient.HMSet(lf.index, fields).Err(); err != nil {
		log.Println(lf.index, "cannot write registry entry:", err)
		return
	}
	if err := redisClient.Expire(lf.index, REDIS_REG_EXPIRE).Err(); err != nil {
		log.Println(lf.index, "cannot set registry expiry:", err)
	}
}
// registryGet fetches every field of the Redis hash stored under name.
//
// Each field value that parses as a JSON object of string values is returned
// as a map[string]string; any value that does not parse that way is returned
// as the raw string. The Redis error is returned unchanged when the lookup
// fails, and a "not found" error is returned when the hash has no fields.
func registryGet(name string) (map[string]interface{}, error) {
	fields, err := redisClient.HGetAll(name).Result()
	if err != nil {
		return nil, err
	}
	if len(fields) == 0 {
		return nil, errors.New("not found")
	}

	result := make(map[string]interface{}, len(fields))
	for field, raw := range fields {
		decoded := make(map[string]string)
		if json.Unmarshal([]byte(raw), &decoded) == nil {
			result[field] = decoded
			continue
		}
		// Not a JSON string map: hand the stored value back verbatim.
		result[field] = raw
	}
	return result, nil
}