@@ -12,8 +12,9 @@ import (
1212)
1313
1414var (
15- monitorFrequency = time .Minute * 5
16- deadMemberRemovalThreshold = time .Hour * 24
15+ monitorFrequency = time .Minute * 5
16+ // TODO - Make this configurable and/or extend this to 12-24 hours.
17+ deadMemberRemovalThreshold = time .Hour * 1
1718)
1819
1920func main () {
@@ -37,44 +38,47 @@ func main() {
3738 ticker := time .NewTicker (monitorFrequency )
3839 defer ticker .Stop ()
3940
40- for range ticker .C {
41- role , err := flypgNode .RepMgr .CurrentRole (ctx , conn )
42- if err != nil {
43- fmt .Printf ("Failed to check role: %s\n " , err )
44- continue
45- }
46-
47- if role != flypg .PrimaryRoleName {
48- continue
49- }
41+ for {
42+ select {
43+ case <- ticker .C :
44+ role , err := flypgNode .RepMgr .CurrentRole (ctx , conn )
45+ if err != nil {
46+ fmt .Printf ("Failed to check role: %s\n " , err )
47+ continue
48+ }
5049
51- standbys , err := flypgNode .RepMgr .Standbys (ctx , conn )
52- if err != nil {
53- fmt .Printf ("Failed to query standbys: %s\n " , err )
54- continue
55- }
50+ if role != flypg .PrimaryRoleName {
51+ continue
52+ }
5653
57- for _ , standby := range standbys {
58- newConn , err := flypgNode .RepMgr .NewRemoteConnection (ctx , standby .Ip )
59- defer newConn .Close (ctx )
54+ standbys , err := flypgNode .RepMgr .Standbys (ctx , conn )
6055 if err != nil {
61- // TODO - Verify the exception that's getting thrown.
62- if time .Now ().Sub (seenAt [standby .Id ]) >= deadMemberRemovalThreshold {
63- if err := flypgNode .UnregisterMemberByID (ctx , int32 (standby .Id )); err != nil {
64- fmt .Printf ("failed to unregister member %d: %v\n " , standby .Id , err .Error ())
65- continue
56+ fmt .Printf ("Failed to query standbys: %s\n " , err )
57+ continue
58+ }
59+
60+ for _ , standby := range standbys {
61+ newConn , err := flypgNode .RepMgr .NewRemoteConnection (ctx , standby .Ip )
62+ defer newConn .Close (ctx )
63+ if err != nil {
64+ // TODO - Verify the exception that's getting thrown.
65+ if time .Now ().Sub (seenAt [standby .Id ]) >= deadMemberRemovalThreshold {
66+ if err := flypgNode .UnregisterMemberByID (ctx , int32 (standby .Id )); err != nil {
67+ fmt .Printf ("failed to unregister member %d: %v\n " , standby .Id , err .Error ())
68+ continue
69+ }
70+
71+ delete (seenAt , standby .Id )
6672 }
6773
68- delete ( seenAt , standby . Id )
74+ continue
6975 }
7076
71- continue
77+ seenAt [ standby . Id ] = time . Now ()
7278 }
7379
74- seenAt [ standby . Id ] = time . Now ( )
80+ removeOrphanedReplicationSlots ( ctx , conn , standbys )
7581 }
76-
77- removeOrphanedReplicationSlots (ctx , conn , standbys )
7882 }
7983}
8084
@@ -102,13 +106,13 @@ func removeOrphanedReplicationSlots(ctx context.Context, conn *pgx.Conn, standby
102106 }
103107
104108 if len (orphanedSlots ) > 0 {
105- fmt .Printf ("%d orphaned replication slots detected" , len (orphanedSlots ))
109+ fmt .Printf ("%d orphaned replication slot(s) detected\n " , len (orphanedSlots ))
106110
107111 for _ , slot := range orphanedSlots {
108- fmt .Printf ("dropping replication slot: %s" , slot .Name )
112+ fmt .Printf ("Dropping replication slot: %s\n " , slot .Name )
109113
110114 if err := admin .DropReplicationSlot (ctx , conn , slot .Name ); err != nil {
111- fmt .Printf ("failed to drop replication slot %s: %v" , slot .Name , err )
115+ fmt .Printf ("failed to drop replication slot %s: %v\n " , slot .Name , err )
112116 continue
113117 }
114118 }
0 commit comments