Skip to content

Commit

Permalink
feat: Aurora MySQL binding to reader endpoint
Browse files Browse the repository at this point in the history
Aurora has the concept of Writer nodes and Reader nodes. Reader nodes
are useful for High Availability, but they can also be used for
read-only workloads. This features exposes the ability to bind to the
reader endpoint for read-only workloads. This is achieved by passing the
parameters '{"reader":true}' to the 'cf bind-service' command.

[#183390030](https://www.pivotaltracker.com/story/show/183390030)
  • Loading branch information
blgm committed Oct 19, 2022
1 parent f43f12d commit b402882
Show file tree
Hide file tree
Showing 15 changed files with 292 additions and 106 deletions.
23 changes: 14 additions & 9 deletions acceptance-tests/apps/mysqlapp/internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"database/sql"
"fmt"
"log"
"mysqlapp/internal/connector"
"net/http"
"time"

Expand All @@ -17,13 +18,11 @@ const (
valueColumn = "valuedata"
)

func App(config *mysql.Config) *mux.Router {
db := connect(config)

func App(conn *connector.Connector) *mux.Router {
r := mux.NewRouter()
r.HandleFunc("/", aliveness).Methods("HEAD", "GET")
r.HandleFunc("/{key}", handleSet(db)).Methods("PUT")
r.HandleFunc("/{key}", handleGet(db)).Methods("GET")
r.HandleFunc("/{key}", handleSet(conn)).Methods("PUT")
r.HandleFunc("/{key}", handleGet(conn)).Methods("GET")

return r
}
Expand All @@ -33,19 +32,25 @@ func aliveness(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}

func connect(config *mysql.Config) *sql.DB {
func connect(config *mysql.Config) (*sql.DB, error) {
db, err := sql.Open("mysql", config.FormatDSN())
if err != nil {
log.Fatalf("failed to connect to database: %s", err)
return nil, fmt.Errorf("failed to connect to database: %s", err)
}
db.SetConnMaxLifetime(time.Minute * 3)
db.SetMaxOpenConns(10)
db.SetMaxIdleConns(10)

_, err = db.Exec(fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (%s VARCHAR(255) NOT NULL, %s VARCHAR(255) NOT NULL)`, tableName, keyColumn, valueColumn))
if err != nil {
log.Fatalf("failed to create test table: %s", err)
return nil, fmt.Errorf("failed to create test table: %s", err)
}

return db
return db, nil
}

func fail(w http.ResponseWriter, code int, format string, a ...any) {
msg := fmt.Sprintf(format, a...)
log.Println(msg)
http.Error(w, msg, code)
}
28 changes: 13 additions & 15 deletions acceptance-tests/apps/mysqlapp/internal/app/get.go
Original file line number Diff line number Diff line change
@@ -1,59 +1,57 @@
package app

import (
"database/sql"
"fmt"
"log"
"mysqlapp/internal/connector"
"net/http"

"github.com/gorilla/mux"
)

func handleGet(db *sql.DB) func(w http.ResponseWriter, r *http.Request) {
func handleGet(conn *connector.Connector) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
log.Println("Handling get.")

db, err := conn.Connect()
if err != nil {
fail(w, http.StatusInternalServerError, "error connecting to database: %s", err)
}

key, ok := mux.Vars(r)["key"]
if !ok {
log.Println("Key missing.")
http.Error(w, "Key missing.", http.StatusBadRequest)
fail(w, http.StatusBadRequest, "key missing")
return
}

stmt, err := db.Prepare(fmt.Sprintf(`SELECT %s from %s WHERE %s = ?`, valueColumn, tableName, keyColumn))
if err != nil {
log.Printf("Error preparing statement: %s", err)
http.Error(w, "Failed to prepare statement.", http.StatusInternalServerError)
fail(w, http.StatusInternalServerError, "error preparing statement: %s", err)
return
}
defer stmt.Close()

rows, err := stmt.Query(key)
if err != nil {
log.Printf("Error selecting value: %s", err)
http.Error(w, "Failed to select value.", http.StatusNotFound)
fail(w, http.StatusNotFound, "error selecting value: %s", err)
return
}
defer rows.Close()

if !rows.Next() {
log.Printf("Error finding value: %s", err)
http.Error(w, "Failed to find value.", http.StatusNotFound)
fail(w, http.StatusNotFound, "error finding value: %s", err)
return
}

var value string
if err := rows.Scan(&value); err != nil {
log.Printf("Error retrieving value: %s", err)
http.Error(w, "Failed to retrieve value.", http.StatusNotFound)
fail(w, http.StatusNotFound, "error retrieving value: %s", err)
return
}

w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "text/html")
_, err = w.Write([]byte(value))

if err != nil {
if _, err := w.Write([]byte(value)); err != nil {
log.Printf("Error writing value: %s", err)
return
}
Expand Down
30 changes: 18 additions & 12 deletions acceptance-tests/apps/mysqlapp/internal/app/set.go
Original file line number Diff line number Diff line change
@@ -1,45 +1,51 @@
package app

import (
"database/sql"
"fmt"
"io"
"log"
"mysqlapp/internal/connector"
"net/http"

"github.com/gorilla/mux"
)

func handleSet(db *sql.DB) func(w http.ResponseWriter, r *http.Request) {
func handleSet(conn *connector.Connector) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
log.Println("Handling set.")

db, err := conn.Connect()
if err != nil {
fail(w, http.StatusInternalServerError, "error connecting to database: %s", err)
}

_, err = db.Exec(fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (%s VARCHAR(255) NOT NULL, %s VARCHAR(255) NOT NULL)`, tableName, keyColumn, valueColumn))
if err != nil {
fail(w, http.StatusInternalServerError, "failed to create test table: %s", err)
return
}

key, ok := mux.Vars(r)["key"]
if !ok {
log.Println("Key missing.")
http.Error(w, "Key missing.", http.StatusBadRequest)
fail(w, http.StatusBadRequest, "key missing")
return
}

rawValue, err := io.ReadAll(r.Body)
if err != nil {
log.Printf("Error parsing value: %s", err)
http.Error(w, "Failed to parse value.", http.StatusBadRequest)
fail(w, http.StatusBadRequest, "error parsing value: %s", err)
return
}

stmt, err := db.Prepare(fmt.Sprintf(`INSERT INTO %s (%s, %s) VALUES (?, ?)`, tableName, keyColumn, valueColumn))
if err != nil {
log.Printf("Error preparing statement: %s", err)
http.Error(w, "Failed to prepare statement.", http.StatusInternalServerError)
fail(w, http.StatusInternalServerError, "error preparing statement: %s", err)
return
}
defer stmt.Close()

_, err = stmt.Exec(key, string(rawValue))
if err != nil {
log.Printf("Error inserting values: %s", err)
http.Error(w, "Failed to insert values.", http.StatusBadRequest)
if _, err := stmt.Exec(key, string(rawValue)); err != nil {
fail(w, http.StatusBadRequest, "error inserting values: %s", err)
return
}

Expand Down
53 changes: 53 additions & 0 deletions acceptance-tests/apps/mysqlapp/internal/connector/connect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package connector

import (
"database/sql"
"fmt"
"time"

"github.com/go-sql-driver/mysql"
)

type option func(*Connector, *mysql.Config) error

func (c *Connector) Connect(opts ...option) (*sql.DB, error) {
cfg := mysql.NewConfig()
cfg.Net = "tcp"
cfg.Addr = c.Host
cfg.User = c.Username
cfg.Passwd = c.Password
cfg.DBName = c.Database
withDefaults(opts...)(c, cfg)

db, err := sql.Open("mysql", cfg.FormatDSN())
if err != nil {
return nil, fmt.Errorf("failed to connect to database: %s", err)
}
db.SetConnMaxLifetime(time.Minute * 3)
db.SetMaxOpenConns(10)
db.SetMaxIdleConns(10)

return db, nil
}

func withOptions(opts ...option) option {
return func(conn *Connector, cfg *mysql.Config) error {
for _, o := range opts {
if err := o(conn, cfg); err != nil {
return err
}
}
return nil
}
}

func withDefaults(opts ...option) option {
return withOptions(append([]option{withTLS()}, opts...)...)
}

func withTLS() option {
return func(_ *Connector, cfg *mysql.Config) error {
cfg.TLSConfig = "true"
return nil
}
}
55 changes: 55 additions & 0 deletions acceptance-tests/apps/mysqlapp/internal/connector/connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package connector

import (
"fmt"

"github.com/cloudfoundry-community/go-cfenv"
"github.com/mitchellh/mapstructure"
)

type Connector struct {
Host string `mapstructure:"hostname"`
Database string `mapstructure:"name"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
Port int `mapstructure:"port"`
}

func New() (*Connector, error) {
app, err := cfenv.Current()
if err != nil {
return nil, fmt.Errorf("error reading app env: %w", err)
}
svs, err := app.Services.WithTag("mysql")
if err != nil {
return nil, fmt.Errorf("error reading MySQL service details")
}

var c Connector
if err := mapstructure.Decode(svs[0].Credentials, &c); err != nil {
return nil, fmt.Errorf("failed to decode credentials: %w", err)
}

if err := c.Valid(); err != nil {
return nil, err
}

return &c, nil
}

func (c *Connector) Valid() error {
switch {
case c.Host == "":
return fmt.Errorf("missing hostname")
case c.Username == "":
return fmt.Errorf("missing username")
case c.Password == "":
return fmt.Errorf("missing password")
case c.Database == "":
return fmt.Errorf("missing database name")
case c.Port == 0:
return fmt.Errorf("missing port")
}

return nil
}
46 changes: 0 additions & 46 deletions acceptance-tests/apps/mysqlapp/internal/credentials/credentials.go

This file was deleted.

6 changes: 3 additions & 3 deletions acceptance-tests/apps/mysqlapp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"fmt"
"log"
"mysqlapp/internal/app"
"mysqlapp/internal/credentials"
"mysqlapp/internal/connector"
"net/http"
"os"
)
Expand All @@ -13,14 +13,14 @@ func main() {
log.Println("Starting.")

log.Println("Reading credentials.")
creds, err := credentials.Read()
conn, err := connector.New()
if err != nil {
panic(err)
}

port := port()
log.Printf("Listening on port: %s", port)
http.Handle("/", app.App(creds))
http.Handle("/", app.App(conn))
http.ListenAndServe(port, nil)
}

Expand Down
Loading

0 comments on commit b402882

Please sign in to comment.