@@ -114,12 +114,12 @@ func (n *Node) Init(ctx context.Context) error {
114114 return err
115115 }
116116
117- consul , err := state .NewConsulClient ()
117+ cs , err := state .NewClusterState ()
118118 if err != nil {
119- return fmt .Errorf ("failed to establish connection with consul: %s " , err )
119+ return fmt .Errorf ("failed initialize cluster state store. %v " , err )
120120 }
121121
122- primaryIP , err := consul . CurrentPrimary ()
122+ primary , err := cs . PrimaryMember ()
123123 if err != nil {
124124 return fmt .Errorf ("failed to query current primary: %s" , err )
125125 }
@@ -133,7 +133,7 @@ func (n *Node) Init(ctx context.Context) error {
133133 fmt .Printf ("Failed to initialize repmgr: %s\n " , err .Error ())
134134 }
135135
136- err = SyncUserConfig (& repmgr , consul )
136+ err = SyncUserConfig (& repmgr , cs . Store )
137137 if err != nil {
138138 fmt .Printf ("Failed to sync user config from consul for repmgr: %s\n " , err .Error ())
139139 }
@@ -148,7 +148,7 @@ func (n *Node) Init(ctx context.Context) error {
148148 return err
149149 }
150150
151- err = SyncUserConfig (& pgbouncer , consul )
151+ err = SyncUserConfig (& pgbouncer , cs . Store )
152152 if err != nil {
153153 fmt .Printf ("Failed to sync user config from consul for pgbouncer: %s\n " , err .Error ())
154154 }
@@ -158,10 +158,8 @@ func (n *Node) Init(ctx context.Context) error {
158158 fmt .Printf ("Failed to write config files for pgbouncer: %s\n " , err .Error ())
159159 }
160160
161- switch primaryIP {
162- case n .PrivateIP :
163- // noop
164- case "" :
161+ switch {
162+ case primary == nil :
165163 // Initialize ourselves as the primary.
166164 fmt .Println ("Initializing postgres" )
167165 if err := n .initialize (); err != nil {
@@ -172,20 +170,22 @@ func (n *Node) Init(ctx context.Context) error {
172170 if err := n .setDefaultHBA (); err != nil {
173171 return fmt .Errorf ("failed updating pg_hba.conf: %s" , err )
174172 }
173+ case primary .Hostname == n .PrivateIP :
174+ // noop
175175 default :
176176 // If we are here we are either a standby, new node or primary coming back from the dead.
177177 clonePrimary := true
178178 if n .isInitialized () {
179179 // Attempt to resolve our role by querying the primary.
180- remoteConn , err := repmgr .NewRemoteConnection (ctx , primaryIP )
180+ remoteConn , err := repmgr .NewRemoteConnection (ctx , primary . Hostname )
181181 if err != nil {
182182 return fmt .Errorf ("failed to resolve my role according to the primary: %s" , err )
183183 }
184184 defer remoteConn .Close (ctx )
185185
186186 role , err := repmgr .memberRoleByHostname (ctx , remoteConn , n .PrivateIP )
187187 if err != nil {
188- return fmt .Errorf ("failed to resolve role for %s: %s" , primaryIP , err )
188+ return fmt .Errorf ("failed to resolve role for %s: %s" , primary . Hostname , err )
189189 }
190190
191191 fmt .Printf ("My role is: %s\n " , role )
@@ -196,7 +196,7 @@ func (n *Node) Init(ctx context.Context) error {
196196
197197 if clonePrimary {
198198 fmt .Println ("Cloning from primary" )
199- if err := repmgr .clonePrimary (primaryIP ); err != nil {
199+ if err := repmgr .clonePrimary (primary . Hostname ); err != nil {
200200 return fmt .Errorf ("failed to clone primary: %s" , err )
201201 }
202202 }
@@ -205,7 +205,7 @@ func (n *Node) Init(ctx context.Context) error {
205205 fmt .Println ("Resolving PG configuration settings." )
206206 PGConfig .Setup ()
207207
208- err = SyncUserConfig (PGConfig , consul )
208+ err = SyncUserConfig (PGConfig , cs . Store )
209209 if err != nil {
210210 fmt .Printf ("Failed to sync user config from consul for pgbouncer: %s\n " , err .Error ())
211211 }
@@ -226,27 +226,21 @@ func (n *Node) PostInit(ctx context.Context) error {
226226 }
227227 defer conn .Close (ctx )
228228
229- consul , err := state .NewConsulClient ()
229+ cs , err := state .NewClusterState ()
230230 if err != nil {
231- return fmt .Errorf ("failed to establish connection with consul: %s " , err )
231+ return fmt .Errorf ("failed initialize cluster state store. %v " , err )
232232 }
233233
234- primaryIP , err := consul . CurrentPrimary ()
234+ primary , err := cs . PrimaryMember ()
235235 if err != nil {
236236 return fmt .Errorf ("failed to query current primary: %s" , err )
237237 }
238238
239239 repmgr := n .RepMgr
240240 pgbouncer := n .PGBouncer
241241
242- switch primaryIP {
243- case n .PrivateIP :
244- // Re-register the primary in order to pick up any changes made to the configuration file.
245- fmt .Println ("Updating primary record" )
246- if err := repmgr .registerPrimary (); err != nil {
247- fmt .Printf ("failed to register primary with repmgr: %s" , err )
248- }
249- case "" :
242+ switch {
243+ case primary == nil :
250244 // Check if we can be a primary
251245 if ! repmgr .eligiblePrimary () {
252246 return fmt .Errorf ("no primary to follow and can't configure self as primary because primary region is '%s' and we are in '%s'" , os .Getenv ("PRIMARY_REGION" ), repmgr .Region )
@@ -258,17 +252,22 @@ func (n *Node) PostInit(ctx context.Context) error {
258252 }
259253
260254 // Setup repmgr database, extension, and register ourselves as the primary
261- fmt .Println ("Perform Repmgr setup" )
255+ fmt .Println ("Performing Repmgr setup" )
262256 if err := repmgr .setup (ctx , conn ); err != nil {
263257 fmt .Printf ("failed to setup repmgr: %s\n " , err )
264258 }
265259
266- if err := consul .RegisterPrimary (n .PrivateIP ); err != nil {
267- return fmt .Errorf ("failed to register primary with consul: %s" , err )
260+ // Register primary member with consul
261+ fmt .Println ("Registering member" )
262+ if err := cs .RegisterMember (repmgr .ID , n .PrivateIP , repmgr .Region , true ); err != nil {
263+ return fmt .Errorf ("failed to register member with consul: %s" , err )
268264 }
269265
270- if err := consul .RegisterNode (repmgr .ID , n .PrivateIP ); err != nil {
271- return fmt .Errorf ("failed to register member with consul: %s" , err )
266+ case primary .Hostname == n .PrivateIP :
267+ // Re-register the primary in order to pick up any changes made to the configuration file.
268+ fmt .Println ("Updating primary record" )
269+ if err := repmgr .registerPrimary (); err != nil {
270+ fmt .Printf ("failed to register primary with repmgr: %s" , err )
272271 }
273272 default :
274273 // If we are here we are a new node, standby or a demoted primary who needs to be reconfigured as a standby.
@@ -301,19 +300,19 @@ func (n *Node) PostInit(ctx context.Context) error {
301300 fmt .Printf ("failed to register standby: %s\n " , err )
302301 }
303302
304- fmt . Println ( "Registering Node with Consul" )
305- if err := consul . RegisterNode (repmgr .ID , n .PrivateIP ); err != nil {
303+ // Register member with consul if it hasn't been already
304+ if err := cs . RegisterMember (repmgr .ID , n .PrivateIP , repmgr . Region , false ); err != nil {
306305 return fmt .Errorf ("failed to register member with consul: %s" , err )
307306 }
308307 }
309308
310- // Requery the primaryIP in case a new primary was assigned above.
311- primaryIP , err = consul . CurrentPrimary ()
309+ // Requery the primaryIP from consul in case the primary was assigned above.
310+ primary , err = cs . PrimaryMember ()
312311 if err != nil {
313312 return fmt .Errorf ("failed to query current primary: %s" , err )
314313 }
315314
316- if err := pgbouncer .ConfigurePrimary (ctx , primaryIP , true ); err != nil {
315+ if err := pgbouncer .ConfigurePrimary (ctx , primary . Hostname , true ); err != nil {
317316 return fmt .Errorf ("failed to configure pgbouncer's primary: %s" , err )
318317 }
319318
0 commit comments