Skip to content

Commit 191cb8b

Browse files
author
Dov Alperin
committed
Rip out haproxy and replace it with pgbouncer
1 parent 904d023 commit 191cb8b

File tree

8 files changed

+141
-56
lines changed

8 files changed

+141
-56
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
.idea/

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ LABEL fly.version=${VERSION}
2020
LABEL fly.pg-version=${PG_VERSION}
2121

2222
RUN apt-get update && apt-get install --no-install-recommends -y \
23-
ca-certificates iproute2 haproxy postgresql-14-repmgr curl bash dnsutils vim procps jq \
23+
ca-certificates iproute2 haproxy postgresql-14-repmgr curl bash dnsutils vim procps jq pgbouncer \
2424
&& apt autoremove -y
2525

2626
COPY --from=0 /fly/bin/* /usr/local/bin

cmd/event_handler/main.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@ package main
33
import (
44
"flag"
55
"fmt"
6+
"github.com/fly-apps/postgres-flex/pkg/flypg"
67

78
"github.com/fly-apps/postgres-flex/pkg/flypg/state"
89
)
910

1011
func main() {
1112
event := flag.String("event", "", "event type")
1213
nodeID := flag.Int("node-id", 0, "the node id")
14+
newPrimary := flag.Int("new-node-id", 0, "the new primary node id")
1315
success := flag.String("success", "", "success (1) failure (0)")
1416
details := flag.String("details", "", "details")
1517
flag.Parse()
@@ -33,6 +35,33 @@ func main() {
3335
if err := client.RegisterPrimary(string(node.Value)); err != nil {
3436
fmt.Printf("failed to register primary: %s", err)
3537
}
38+
39+
flypgNode, err := flypg.NewNode()
40+
if err != nil {
41+
fmt.Printf("failed to reconfigure pgbouncer primary %s\n", err)
42+
}
43+
44+
fmt.Println("Reconfiguring pgbouncer primary")
45+
if err := flypgNode.ConfigurePGBouncerPrimary(string(node.Value), true); err != nil {
46+
fmt.Printf("failed to reconfigure pgbouncer primary %s\n", err)
47+
}
48+
case "standby_follow":
49+
client, err := state.NewConsulClient()
50+
if err != nil {
51+
fmt.Printf("failed to initialize consul client: %s", err)
52+
}
53+
node, err := client.Node(int32(*newPrimary))
54+
if err != nil {
55+
fmt.Printf("failed to find node: %s", err)
56+
}
57+
flypgNode, err := flypg.NewNode()
58+
if err != nil {
59+
fmt.Printf("failed to reconfigure pgbouncer primary %s\n", err)
60+
}
61+
fmt.Println("Reconfiguring pgbouncer primary")
62+
if err := flypgNode.ConfigurePGBouncerPrimary(string(node.Value), true); err != nil {
63+
fmt.Printf("failed to reconfigure pgbouncer primary %s\n", err)
64+
}
3665
default:
3766
// noop
3867
}

cmd/start/main.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func main() {
3333
for range t.C {
3434

3535
if err := node.PostInit(); err != nil {
36-
fmt.Printf("failed post-init: %s. Retrying...", err)
36+
fmt.Printf("failed post-init: %s. Retrying...\n", err)
3737
continue
3838
}
3939

@@ -43,12 +43,7 @@ func main() {
4343

4444
svisor := supervisor.New("flypg", 5*time.Minute)
4545

46-
proxyEnv := map[string]string{
47-
"FLY_APP_NAME": os.Getenv("FLY_APP_NAME"),
48-
"PRIMARY_REGION": os.Getenv("PRIMARY_REGION"),
49-
"PG_LISTEN_ADDRESS": node.PrivateIP,
50-
}
51-
svisor.AddProcess("proxy", "/usr/sbin/haproxy -W -db -f /fly/haproxy.cfg", supervisor.WithEnv(proxyEnv), supervisor.WithRestart(0, 1*time.Second))
46+
svisor.AddProcess("pgbouncer", "/usr/sbin/pgbouncer /fly/pgbouncer.ini", supervisor.WithRestart(0, 1*time.Second))
5247

5348
env := map[string]string{
5449
"PGDATA": os.Getenv("PGDATA"),

config/haproxy.cfg

Lines changed: 0 additions & 38 deletions
This file was deleted.

config/pgbouncer.ini

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
[pgbouncer]
2+
3+
listen_addr = *
4+
listen_port = 5432
5+
unix_socket_dir = /tmp
6+
7+
auth_user = postgres
8+
auth_file = /data/pgbouncer.auth
9+
10+
admin_users = postgres
11+
12+
user = postgres
13+
14+
pool_mode = transaction
15+
16+
max_client_conn = 100
17+
default_pool_size = 20
18+
min_pool_size = 5
19+
reserve_pool_size = 5
20+
reserve_pool_timeout = 3
21+
22+
log_connections = 1
23+
log_disconnections = 1
24+
log_pooler_errors = 1
25+
26+
%include /data/pgbouncer.database.ini

pkg/flypg/node.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,11 @@ func (n *Node) Init() error {
148148
return fmt.Errorf("failed to configure postgres %s", err)
149149
}
150150

151+
fmt.Println("Configuring pgbouncer auth")
152+
if err := n.ConfigurePGBouncerAuth(); err != nil {
153+
return fmt.Errorf("failed to configure pgbouncer auth %s", err)
154+
}
155+
151156
return nil
152157
}
153158

@@ -258,9 +263,24 @@ func (n *Node) PostInit() error {
258263
}
259264
}
260265

266+
primaryIP, err = client.CurrentPrimary()
267+
if err != nil {
268+
return fmt.Errorf("failed to query current primary: %s", err)
269+
}
270+
271+
fmt.Println("Configuring pgbouncer primary")
272+
if err := n.ConfigurePGBouncerPrimary(primaryIP, false); err != nil {
273+
return fmt.Errorf("failed to configure pgbouncer primary %s", err)
274+
}
275+
261276
return nil
262277
}
263278

279+
func (n *Node) NewPGBouncerConnection(ctx context.Context) (*pgx.Conn, error) {
280+
host := net.JoinHostPort(n.PrivateIP, strconv.Itoa(5432))
281+
return openConnection(ctx, host, "pgbouncer", n.OperatorCredentials)
282+
}
283+
264284
func (n *Node) NewLocalConnection(ctx context.Context) (*pgx.Conn, error) {
265285
host := net.JoinHostPort(n.PrivateIP, strconv.Itoa(n.PGPort))
266286
return openConnection(ctx, host, "postgres", n.OperatorCredentials)
@@ -325,6 +345,53 @@ func (n *Node) initializePostgres() error {
325345
return err
326346
}
327347

348+
func (n *Node) ConfigurePGBouncerAuth() error {
349+
path := fmt.Sprintf("%s/pgbouncer.auth", "/data")
350+
file, err := os.OpenFile(path, os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0644)
351+
if err != nil {
352+
return err
353+
}
354+
contents := fmt.Sprintf("\"%s\" \"%s\"", n.OperatorCredentials.Username, n.OperatorCredentials.Password)
355+
_, err = file.Write([]byte(contents))
356+
if err != nil {
357+
return err
358+
}
359+
return nil
360+
}
361+
362+
func (n *Node) ConfigurePGBouncerPrimary(primary string, reload bool) error {
363+
path := fmt.Sprintf("%s/pgbouncer.database.ini", "/data")
364+
file, err := os.OpenFile(path, os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0644)
365+
if err != nil {
366+
return err
367+
}
368+
contents := fmt.Sprintf("[databases]\n* = host=%s port=%d\n", primary, 5433)
369+
_, err = file.Write([]byte(contents))
370+
if err != nil {
371+
return err
372+
}
373+
374+
if reload {
375+
err = n.ReloadPGBouncerConfig()
376+
if err != nil {
377+
fmt.Printf("failed to reconfigure pgbouncer primary %s\n", err)
378+
}
379+
}
380+
return nil
381+
}
382+
383+
func (n *Node) ReloadPGBouncerConfig() error {
384+
conn, err := n.NewPGBouncerConnection(context.TODO())
385+
if err != nil {
386+
return err
387+
}
388+
_, err = conn.Exec(context.TODO(), "RELOAD;")
389+
if err != nil {
390+
return err
391+
}
392+
return nil
393+
}
394+
328395
func (n *Node) configurePostgres() error {
329396
cmdStr := fmt.Sprintf("sed -i \"s/#shared_preload_libraries.*/shared_preload_libraries = 'repmgr'/\" /data/postgresql/postgresql.conf")
330397
return runCommand(cmdStr)

pkg/flypg/repmgr.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func cloneFromPrimary(node Node, ipStr string) error {
7171
return err
7272
}
7373

74-
cmdStr = fmt.Sprintf("repmgr -h %s -d %s -U %s -f %s standby clone -F",
74+
cmdStr = fmt.Sprintf("repmgr -h %s -p 5433 -d %s -U %s -f %s standby clone -F",
7575
ipStr,
7676
node.ManagerDatabaseName,
7777
node.ManagerCredentials.Username,
@@ -92,18 +92,23 @@ func writeManagerConf(node Node) error {
9292
}
9393

9494
conf := map[string]interface{}{
95-
"node_id": fmt.Sprint(node.ID),
96-
"node_name": fmt.Sprintf("'%s'", node.PrivateIP),
97-
"conninfo": fmt.Sprintf("'host=%s port=%d user=%s dbname=%s connect_timeout=10'", node.PrivateIP, node.PGPort, node.ManagerCredentials.Username, node.ManagerDatabaseName),
98-
"data_directory": fmt.Sprintf("'%s'", node.DataDir),
99-
"failover": "'automatic'",
100-
"promote_command": fmt.Sprintf("'repmgr standby promote -f %s --log-to-file'", node.ManagerConfigPath),
101-
"follow_command": fmt.Sprintf("'repmgr standby follow -f %s --log-to-file --upstream-node-id=%%n'", node.ManagerConfigPath),
102-
"event_notification_command": fmt.Sprintf("'/usr/local/bin/event_handler -node-id %%n -event %%e -success %%s -details \"%%d\"'"),
103-
"event_notifications": "'repmgrd_failover_promote,standby_promote'",
95+
"node_id": fmt.Sprint(node.ID),
96+
"node_name": fmt.Sprintf("'%s'", node.PrivateIP),
97+
"conninfo": fmt.Sprintf("'host=%s port=%d user=%s dbname=%s connect_timeout=10'", node.PrivateIP, node.PGPort, node.ManagerCredentials.Username, node.ManagerDatabaseName),
98+
"data_directory": fmt.Sprintf("'%s'", node.DataDir),
99+
"failover": "'automatic'",
100+
"promote_command": fmt.Sprintf("'repmgr standby promote -f %s --log-to-file'", node.ManagerConfigPath),
101+
"follow_command": fmt.Sprintf("'repmgr standby follow -f %s --log-to-file --upstream-node-id=%%n'", node.ManagerConfigPath),
102+
// FIXME: %p does not always exist (usually on the new primary node), so event_handler throws an error; this doesn't break anything but is clunky
103+
"event_notification_command": fmt.Sprintf("'/usr/local/bin/event_handler -node-id %%n -event %%e -success %%s -details \"%%d\" -new-node-id %%p'"),
104+
"event_notifications": "'repmgrd_failover_promote,standby_promote,standby_follow'",
104105
"location": node.Region,
105106
}
106107

108+
if !node.ValidPrimary() {
109+
conf["priority"] = "0"
110+
}
111+
107112
for key, value := range conf {
108113
str := fmt.Sprintf("%s=%s\n", key, value)
109114
_, err := file.Write([]byte(str))

0 commit comments

Comments
 (0)