Skip to content

Commit b47d36e

Browse files
committed
Liking this better
1 parent 115925c commit b47d36e

File tree

5 files changed

+121
-100
lines changed

5 files changed

+121
-100
lines changed

cmd/event_handler/main.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,17 @@ func main() {
3030
case "repmgrd_failover_promote", "standby_promote":
3131
// TODO - Need to figure out what to do when success == 0.
3232

33-
consul, err := state.NewStore()
33+
cs, err := state.NewClusterState()
3434
if err != nil {
35-
fmt.Printf("failed to initialize consul client: %s", err)
35+
fmt.Printf("failed initialize cluster state store. %v", err)
3636
}
3737

38-
member, err := consul.FindMember(int32(*nodeID))
38+
member, err := cs.FindMember(int32(*nodeID))
3939
if err != nil {
4040
fmt.Printf("failed to find member %v: %s", *nodeID, err)
4141
}
4242

43-
if err := consul.AssignPrimary(member.ID); err != nil {
43+
if err := cs.AssignPrimary(member.ID); err != nil {
4444
fmt.Printf("failed to register primary with consul: %s", err)
4545
}
4646

@@ -54,17 +54,17 @@ func main() {
5454
fmt.Printf("failed to reconfigure pgbouncer primary %s\n", err)
5555
}
5656
case "standby_follow":
57-
consul, err := state.NewStore()
57+
cs, err := state.NewClusterState()
5858
if err != nil {
59-
fmt.Printf("failed to initialize consul client: %s", err)
59+
fmt.Printf("failed initialize cluster state store. %v", err)
6060
}
6161

6262
newMemberID, err := strconv.Atoi(*newPrimary)
6363
if err != nil {
6464
fmt.Printf("failed to parse new member id: %s", err)
6565
}
6666

67-
member, err := consul.FindMember(int32(newMemberID))
67+
member, err := cs.FindMember(int32(newMemberID))
6868
if err != nil {
6969
fmt.Printf("failed to find member in consul: %s", err)
7070
}

cmd/standby_cleaner/main.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,9 @@ func main() {
4949
newConn, err := flypgNode.RepMgr.NewRemoteConnection(ctx, standby.Ip)
5050
if err != nil {
5151
if time.Now().Unix()-seenAt[standby.Id] >= 10*Minute {
52-
consul, err := state.NewStore()
52+
cs, err := state.NewClusterState()
5353
if err != nil {
54-
fmt.Printf("Failed to establish connection with consul.")
55-
continue
54+
fmt.Printf("failed initialize cluster state store. %v", err)
5655
}
5756

5857
err = flypgNode.RepMgr.UnregisterStandby(standby.Id)
@@ -63,7 +62,7 @@ func main() {
6362
delete(seenAt, standby.Id)
6463

6564
// Remove from Consul
66-
if err = consul.UnregisterMember(int32(standby.Id)); err != nil {
65+
if err = cs.UnregisterMember(int32(standby.Id)); err != nil {
6766
fmt.Printf("Failed to unregister %d from consul: %s", standby.Id, err)
6867
}
6968
}

pkg/flypg/node.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -114,12 +114,12 @@ func (n *Node) Init(ctx context.Context) error {
114114
return err
115115
}
116116

117-
consul, err := state.NewStore()
117+
cs, err := state.NewClusterState()
118118
if err != nil {
119-
return err
119+
return fmt.Errorf("failed initialize cluster state store. %v", err)
120120
}
121121

122-
primary, err := consul.PrimaryMember()
122+
primary, err := cs.PrimaryMember()
123123
if err != nil {
124124
return fmt.Errorf("failed to query current primary: %s", err)
125125
}
@@ -210,12 +210,12 @@ func (n *Node) PostInit(ctx context.Context) error {
210210
}
211211
defer conn.Close(ctx)
212212

213-
consul, err := state.NewStore()
213+
cs, err := state.NewClusterState()
214214
if err != nil {
215-
return err
215+
return fmt.Errorf("failed initialize cluster state store. %v", err)
216216
}
217217

218-
primary, err := consul.PrimaryMember()
218+
primary, err := cs.PrimaryMember()
219219
if err != nil {
220220
return fmt.Errorf("failed to query current primary: %s", err)
221221
}
@@ -242,7 +242,7 @@ func (n *Node) PostInit(ctx context.Context) error {
242242
}
243243

244244
// Register primary member with consul
245-
if err := consul.RegisterMember(repmgr.ID, n.PrivateIP, repmgr.Region, true); err != nil {
245+
if err := cs.RegisterMember(repmgr.ID, n.PrivateIP, repmgr.Region, true); err != nil {
246246
return fmt.Errorf("failed to register member with consul: %s", err)
247247
}
248248

@@ -284,13 +284,13 @@ func (n *Node) PostInit(ctx context.Context) error {
284284
}
285285

286286
// Register member with consul if it hasn't been already
287-
if err := consul.RegisterMember(repmgr.ID, n.PrivateIP, repmgr.Region, false); err != nil {
287+
if err := cs.RegisterMember(repmgr.ID, n.PrivateIP, repmgr.Region, false); err != nil {
288288
return fmt.Errorf("failed to register member with consul: %s", err)
289289
}
290290
}
291291

292292
// Requery the primaryIP from consul in case the primary was assigned above.
293-
primary, err = consul.PrimaryMember()
293+
primary, err = cs.PrimaryMember()
294294
if err != nil {
295295
return fmt.Errorf("failed to query current primary: %s", err)
296296
}

pkg/flypg/state/state.go renamed to pkg/flypg/state/cluster.go

Lines changed: 19 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@ import (
44
"encoding/json"
55
"errors"
66
"fmt"
7-
"net/url"
8-
"os"
97

108
"github.com/hashicorp/consul/api"
119
)
1210

11+
type ClusterState struct {
12+
store *Store
13+
}
14+
1315
type ClusterData struct {
1416
Members []*Member `json:"members"`
1517
}
@@ -30,34 +32,18 @@ var (
3032
ErrMemberNotFound = errors.New("member not found")
3133
)
3234

33-
type Store struct {
34-
client *api.Client
35-
prefix string
36-
}
37-
38-
func NewStore() (*Store, error) {
39-
conf, err := clientConfig()
40-
if err != nil {
41-
return nil, err
42-
}
43-
44-
client, err := api.NewClient(conf)
45-
if err != nil {
46-
return nil, err
47-
}
48-
49-
prefix, err := pathPrefix()
35+
func NewClusterState() (*ClusterState, error) {
36+
store, err := NewStore()
5037
if err != nil {
5138
return nil, err
5239
}
5340

54-
return &Store{
55-
client: client,
56-
prefix: prefix,
41+
return &ClusterState{
42+
store: store,
5743
}, nil
5844
}
5945

60-
func (c *Store) RegisterMember(id int32, hostname string, region string, primary bool) error {
46+
func (c *ClusterState) RegisterMember(id int32, hostname string, region string, primary bool) error {
6147
cluster, modifyIndex, err := c.clusterData()
6248
if err != nil {
6349
return err
@@ -86,7 +72,7 @@ func (c *Store) RegisterMember(id int32, hostname string, region string, primary
8672
return nil
8773
}
8874

89-
func (c *Store) UnregisterMember(id int32) error {
75+
func (c *ClusterState) UnregisterMember(id int32) error {
9076
cluster, modifyIndex, err := c.clusterData()
9177
if err != nil {
9278
return err
@@ -111,7 +97,7 @@ func (c *Store) UnregisterMember(id int32) error {
11197
return nil
11298
}
11399

114-
func (c *Store) AssignPrimary(id int32) error {
100+
func (c *ClusterState) AssignPrimary(id int32) error {
115101
cluster, modifyIndex, err := c.clusterData()
116102
if err != nil {
117103
return err
@@ -141,7 +127,7 @@ func (c *Store) AssignPrimary(id int32) error {
141127
return nil
142128
}
143129

144-
func (c *Store) PrimaryMember() (*Member, error) {
130+
func (c *ClusterState) PrimaryMember() (*Member, error) {
145131
cluster, _, err := c.clusterData()
146132
if err != nil {
147133
return nil, err
@@ -156,7 +142,7 @@ func (c *Store) PrimaryMember() (*Member, error) {
156142
return nil, nil
157143
}
158144

159-
func (c *Store) FindMember(id int32) (*Member, error) {
145+
func (c *ClusterState) FindMember(id int32) (*Member, error) {
160146
cluster, _, err := c.clusterData()
161147
if err != nil {
162148
return nil, err
@@ -171,13 +157,13 @@ func (c *Store) FindMember(id int32) (*Member, error) {
171157
return nil, nil
172158
}
173159

174-
func (c *Store) clusterData() (*ClusterData, uint64, error) {
160+
func (c *ClusterState) clusterData() (*ClusterData, uint64, error) {
175161
var (
176162
cluster ClusterData
177-
key = c.targetKey(stateKey)
163+
key = c.store.targetKey(stateKey)
178164
)
179165

180-
result, _, err := c.client.KV().Get(key, nil)
166+
result, _, err := c.store.Client.KV().Get(key, nil)
181167
if err != nil {
182168
return nil, 0, err
183169
}
@@ -193,18 +179,18 @@ func (c *Store) clusterData() (*ClusterData, uint64, error) {
193179
return &cluster, result.ModifyIndex, nil
194180
}
195181

196-
func (c *Store) updateClusterState(modifyIndex uint64, cluster *ClusterData) error {
182+
func (c *ClusterState) updateClusterState(modifyIndex uint64, cluster *ClusterData) error {
197183
clusterJSON, err := json.Marshal(c)
198184
if err != nil {
199185
return err
200186
}
201187

202188
kv := &api.KVPair{
203-
Key: c.targetKey(stateKey),
189+
Key: c.store.targetKey(stateKey),
204190
Value: clusterJSON,
205191
ModifyIndex: modifyIndex,
206192
}
207-
succ, _, err := c.client.KV().CAS(kv, nil)
193+
succ, _, err := c.store.Client.KV().CAS(kv, nil)
208194
if err != nil {
209195
return err
210196
}
@@ -216,50 +202,3 @@ func (c *Store) updateClusterState(modifyIndex uint64, cluster *ClusterData) err
216202

217203
return nil
218204
}
219-
220-
func (c *Store) PushUserConfig(key string, config []byte) error {
221-
kv := &api.KVPair{Key: c.targetKey(key), Value: config}
222-
_, err := c.client.KV().Put(kv, nil)
223-
return err
224-
}
225-
226-
func (c *Store) PullUserConfig(key string) ([]byte, error) {
227-
pair, _, err := c.client.KV().Get(c.targetKey(key), nil)
228-
if err != nil {
229-
return nil, err
230-
}
231-
return pair.Value, nil
232-
}
233-
234-
func (c *Store) targetKey(key string) string {
235-
return c.prefix + key
236-
}
237-
238-
func clientConfig() (*api.Config, error) {
239-
u, err := url.Parse(os.Getenv("FLY_CONSUL_URL"))
240-
if err != nil {
241-
panic(err)
242-
}
243-
244-
token, set := u.User.Password()
245-
if !set {
246-
return nil, fmt.Errorf("token not set")
247-
}
248-
249-
u.User = nil
250-
251-
return &api.Config{
252-
Token: token,
253-
Scheme: u.Scheme,
254-
Address: u.Hostname(),
255-
}, nil
256-
}
257-
258-
func pathPrefix() (string, error) {
259-
u, err := url.Parse(os.Getenv("FLY_CONSUL_URL"))
260-
if err != nil {
261-
return "", err
262-
}
263-
264-
return u.Path[1:], nil
265-
}

pkg/flypg/state/store.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package state
2+
3+
import (
4+
"fmt"
5+
"net/url"
6+
"os"
7+
8+
"github.com/hashicorp/consul/api"
9+
)
10+
11+
type Store struct {
12+
Client *api.Client
13+
prefix string
14+
}
15+
16+
func NewStore() (*Store, error) {
17+
conf, err := clientConfig()
18+
if err != nil {
19+
return nil, err
20+
}
21+
22+
client, err := api.NewClient(conf)
23+
if err != nil {
24+
return nil, err
25+
}
26+
27+
prefix, err := pathPrefix()
28+
if err != nil {
29+
return nil, err
30+
}
31+
32+
return &Store{
33+
Client: client,
34+
prefix: prefix,
35+
}, nil
36+
}
37+
38+
func (c *Store) PushUserConfig(key string, config []byte) error {
39+
kv := &api.KVPair{Key: c.targetKey(key), Value: config}
40+
_, err := c.Client.KV().Put(kv, nil)
41+
return err
42+
}
43+
44+
func (c *Store) PullUserConfig(key string) ([]byte, error) {
45+
pair, _, err := c.Client.KV().Get(c.targetKey(key), nil)
46+
if err != nil {
47+
return nil, err
48+
}
49+
return pair.Value, nil
50+
}
51+
52+
func (c *Store) targetKey(key string) string {
53+
return c.prefix + key
54+
}
55+
56+
func clientConfig() (*api.Config, error) {
57+
u, err := url.Parse(os.Getenv("FLY_CONSUL_URL"))
58+
if err != nil {
59+
panic(err)
60+
}
61+
62+
token, set := u.User.Password()
63+
if !set {
64+
return nil, fmt.Errorf("token not set")
65+
}
66+
67+
u.User = nil
68+
69+
return &api.Config{
70+
Token: token,
71+
Scheme: u.Scheme,
72+
Address: u.Hostname(),
73+
}, nil
74+
}
75+
76+
func pathPrefix() (string, error) {
77+
u, err := url.Parse(os.Getenv("FLY_CONSUL_URL"))
78+
if err != nil {
79+
return "", err
80+
}
81+
82+
return u.Path[1:], nil
83+
}

0 commit comments

Comments
 (0)