Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 31 additions & 11 deletions pkg/flypg/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,12 @@ func (n *Node) PostInit() error {
return fmt.Errorf("no primary to follow and can't configure self as primary because primary region is '%s' and we are in '%s'", n.Region, os.Getenv("PRIMARY_REGION"))
}

// Initialize ourselves as the primary.
conn, err := n.NewLocalConnection(context.TODO())
if err != nil {
return err
}

// Initialize ourselves as the primary.
if err := n.createRequiredUsers(conn); err != nil {
return fmt.Errorf("failed to create required users: %s", err)
}
Expand Down Expand Up @@ -211,15 +211,30 @@ func (n *Node) PostInit() error {
case n.PrivateIP:
// We are an already initialized primary.
default:
// If we are here, we are a standby or a demoted primary who needs
// If we are here, we are a new node, a standby or a demoted primary who needs
// to be reconfigured as a standby.

// TODO - This should probably be a bit more calculated with this call.
// We don't care if this fails against a standby, but do care if this
// fails against a demoted primary.
fmt.Println("Unregistering primary")
if err := unregisterPrimary(*n); err != nil {
fmt.Printf("failed to unregister primary ( ignore ): %s\n", err)
conn, err := n.NewRepLocalConnection(context.TODO())
if err != nil {
return err
}

role, err := n.currentRole(context.TODO(), conn)
if err != nil {
return err
}

if role == "" {
fmt.Printf("Configuring a new node\n")
} else {
fmt.Printf("Reconfiguring a %s node as healthy\n", role)
}

if role == "primary" {
fmt.Println("Unregistering primary")
if err := unregisterPrimary(*n); err != nil {
fmt.Printf("failed to unregister primary: %s\n", err)
}
}

// TODO - Verify if there are any issues with attempting to re-register
Expand Down Expand Up @@ -248,7 +263,12 @@ func (n *Node) PostInit() error {

func (n *Node) NewLocalConnection(ctx context.Context) (*pgx.Conn, error) {
host := net.JoinHostPort(n.PrivateIP, strconv.Itoa(n.PGPort))
return openConnection(ctx, host, n.OperatorCredentials)
return openConnection(ctx, host, "postgres", n.OperatorCredentials)
}

func (n *Node) NewRepLocalConnection(ctx context.Context) (*pgx.Conn, error) {
host := net.JoinHostPort(n.PrivateIP, strconv.Itoa(n.PGPort))
return openConnection(ctx, host, "repmgr", n.ManagerCredentials)
}

func (n *Node) createRequiredUsers(conn *pgx.Conn) error {
Expand Down Expand Up @@ -385,11 +405,11 @@ func (n *Node) setDefaultHBA() error {
return nil
}

func openConnection(ctx context.Context, host string, creds Credentials) (*pgx.Conn, error) {
func openConnection(ctx context.Context, host string, database string, creds Credentials) (*pgx.Conn, error) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

url := fmt.Sprintf("postgres://%s/postgres", host)
url := fmt.Sprintf("postgres://%s/%s", host, database)
conf, err := pgx.ParseConfig(url)
if err != nil {
return nil, err
Expand Down
15 changes: 15 additions & 0 deletions pkg/flypg/repmgr.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package flypg

import (
"context"
"fmt"
"github.com/jackc/pgx/v4"
"os"
)

Expand Down Expand Up @@ -134,3 +136,16 @@ func writePasswdConf(node Node) error {

return nil
}

func (n *Node) currentRole(ctx context.Context, pg *pgx.Conn) (string, error) {
sql := fmt.Sprintf("select n.type from repmgr.nodes n LEFT JOIN repmgr.nodes un ON un.node_id = n.upstream_node_id WHERE n.node_id = '%d';", n.ID)
var role string
err := pg.QueryRow(ctx, sql).Scan(&role)
if err != nil {
if err == pgx.ErrNoRows {
return "", nil
}
return "", err
}
return role, nil
}