@@ -7,10 +7,15 @@ import (
77 "time"
88
99 "github.com/fly-apps/postgres-flex/pkg/flypg"
10- "github.com/fly-apps/postgres-flex/pkg/flypg/state"
10+ "github.com/fly-apps/postgres-flex/pkg/flypg/admin"
11+ "github.com/jackc/pgx/v4"
1112)
1213
13- var Minute int64 = 60
14+ var (
15+ monitorFrequency = time .Minute * 5
16+ // TODO - Make this configurable and/or extend this to 12-24 hours.
17+ deadMemberRemovalThreshold = time .Hour * 1
18+ )
1419
1520func main () {
1621 ctx := context .Background ()
@@ -20,55 +25,95 @@ func main() {
2025 os .Exit (1 )
2126 }
2227
28+ // TODO - We should connect using the flypgadmin user so we can differentiate between
29+ // internal admin connection usage and the actual repmgr process.
2330 conn , err := flypgNode .RepMgr .NewLocalConnection (ctx )
2431 if err != nil {
2532 fmt .Printf ("failed to open local connection: %s\n " , err )
2633 os .Exit (1 )
2734 }
2835
29- ticker := time .NewTicker (5 * time .Second )
36+ seenAt := map [int ]time.Time {}
37+
38+ ticker := time .NewTicker (monitorFrequency )
3039 defer ticker .Stop ()
3140
32- seenAt := map [int ]int64 {}
41+ for {
42+ select {
43+ case <- ticker .C :
44+ role , err := flypgNode .RepMgr .CurrentRole (ctx , conn )
45+ if err != nil {
46+ fmt .Printf ("Failed to check role: %s\n " , err )
47+ continue
48+ }
3349
34- for _ = range ticker .C {
35- role , err := flypgNode .RepMgr .CurrentRole (ctx , conn )
36- if err != nil {
37- fmt .Printf ("Failed to check role: %s" , err )
38- continue
39- }
40- if role != "primary" {
41- continue
42- }
43- standbys , err := flypgNode .RepMgr .Standbys (ctx , conn )
44- if err != nil {
45- fmt .Printf ("Failed to get standbys: %s" , err )
46- continue
47- }
48- for _ , standby := range standbys {
49- newConn , err := flypgNode .RepMgr .NewRemoteConnection (ctx , standby .Ip )
50+ if role != flypg .PrimaryRoleName {
51+ continue
52+ }
53+
54+ standbys , err := flypgNode .RepMgr .Standbys (ctx , conn )
5055 if err != nil {
51- if time .Now ().Unix ()- seenAt [standby .Id ] >= 10 * Minute {
52- cs , err := state .NewClusterState ()
53- if err != nil {
54- fmt .Printf ("failed initialize cluster state store. %v" , err )
55- }
56+ fmt .Printf ("Failed to query standbys: %s\n " , err )
57+ continue
58+ }
5659
57- err = flypgNode .RepMgr .UnregisterStandby (standby .Id )
58- if err != nil {
59- fmt .Printf ("Failed to unregister %d: %s" , standby .Id , err )
60- continue
61- }
62- delete (seenAt , standby .Id )
60+ for _ , standby := range standbys {
61+ newConn , err := flypgNode .RepMgr .NewRemoteConnection (ctx , standby .Ip )
62+ defer newConn .Close (ctx )
63+ if err != nil {
64+ // TODO - Verify the exception that's getting thrown.
65+ if time .Now ().Sub (seenAt [standby .Id ]) >= deadMemberRemovalThreshold {
66+ if err := flypgNode .UnregisterMemberByID (ctx , int32 (standby .Id )); err != nil {
67+ fmt .Printf ("failed to unregister member %d: %v\n " , standby .Id , err .Error ())
68+ continue
69+ }
6370
64- // Remove from Consul
65- if err = cs .UnregisterMember (int32 (standby .Id )); err != nil {
66- fmt .Printf ("Failed to unregister %d from consul: %s" , standby .Id , err )
71+ delete (seenAt , standby .Id )
6772 }
73+
74+ continue
6875 }
69- } else {
70- seenAt [standby .Id ] = time .Now ().Unix ()
71- newConn .Close (ctx )
76+
77+ seenAt [standby .Id ] = time .Now ()
78+ }
79+
80+ removeOrphanedReplicationSlots (ctx , conn , standbys )
81+ }
82+ }
83+ }
84+
85+ func removeOrphanedReplicationSlots (ctx context.Context , conn * pgx.Conn , standbys []flypg.Standby ) {
86+ var orphanedSlots []admin.ReplicationSlot
87+
88+ slots , err := admin .ListReplicationSlots (ctx , conn )
89+ if err != nil {
90+ fmt .Printf ("failed to list replication slots: %s" , err )
91+ }
92+
93+ // An orphaned replication slot is defined as an inactive replication slot that is no longer tied to
94+ // and existing repmgr member.
95+ for _ , slot := range slots {
96+ matchFound := false
97+ for _ , standby := range standbys {
98+ if slot .MemberID == int32 (standby .Id ) {
99+ matchFound = true
100+ }
101+ }
102+
103+ if ! matchFound && ! slot .Active {
104+ orphanedSlots = append (orphanedSlots , slot )
105+ }
106+ }
107+
108+ if len (orphanedSlots ) > 0 {
109+ fmt .Printf ("%d orphaned replication slot(s) detected\n " , len (orphanedSlots ))
110+
111+ for _ , slot := range orphanedSlots {
112+ fmt .Printf ("Dropping replication slot: %s\n " , slot .Name )
113+
114+ if err := admin .DropReplicationSlot (ctx , conn , slot .Name ); err != nil {
115+ fmt .Printf ("failed to drop replication slot %s: %v\n " , slot .Name , err )
116+ continue
72117 }
73118 }
74119 }
0 commit comments