Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 13 additions & 22 deletions pkg/clients/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"strconv"
"strings"

"github.com/crossplane-contrib/provider-sql/pkg/clients/xsql"
Expand All @@ -16,7 +17,6 @@ import (

const (
errNotSupported = "%s not supported by mysql client"
errSetSQLLogBin = "cannot set sql_log_bin = 0"
errFlushPriv = "cannot flush privileges"
)

Expand All @@ -28,7 +28,7 @@ type mySQLDB struct {
}

// New returns a new MySQL database client.
func New(creds map[string][]byte, tls *string) xsql.DB {
func New(creds map[string][]byte, tls *string, binlog *bool) xsql.DB {
// TODO(negz): Support alternative connection secret formats?
endpoint := string(creds[xpv1.ResourceCredentialsSecretEndpointKey])
port := string(creds[xpv1.ResourceCredentialsSecretPortKey])
Expand All @@ -38,7 +38,11 @@ func New(creds map[string][]byte, tls *string) xsql.DB {
defaultTLS := "preferred"
tls = &defaultTLS
}
dsn := DSN(username, password, endpoint, port, *tls)
if binlog == nil {
defaultBinlog := true
binlog = &defaultBinlog
}
dsn := DSN(username, password, endpoint, port, *tls, *binlog)

return mySQLDB{
dsn: dsn,
Expand All @@ -49,16 +53,17 @@ func New(creds map[string][]byte, tls *string) xsql.DB {
}

// DSN returns the DSN URL
func DSN(username, password, endpoint, port, tls string) string {
func DSN(username, password, endpoint, port, tls string, binlog bool) string {
// Use net/url UserPassword to encode the username and password
// This will ensure that any special characters in the username or password
// are percent-encoded for use in the user info portion of the DSN URL
return fmt.Sprintf("%s:%s@tcp(%s:%s)/?tls=%s",
return fmt.Sprintf("%s:%s@tcp(%s:%s)/?tls=%s&sql_log_bin=%s",
username,
password,
endpoint,
port,
tls)
tls,
strconv.FormatBool(binlog))
}

// ExecTx is unsupported in MySQL.
Expand Down Expand Up @@ -143,30 +148,16 @@ type ExecQuery struct {

// ExecOptions parametrizes which optional statements will be executed before or after ExecQuery.Query
type ExecOptions struct {
// Binlog defines whether storing binlogs will be disabled before executing the query. Defaults to true
Binlog *bool
// Flush defines whether privileges will be flushed after executing the query. Defaults to true
Flush *bool
}

// ExecWithBinlogAndFlush is a wrapper function for xsql.DB.Exec() that allows the execution of optional queries before and after the provided query
func ExecWithBinlogAndFlush(ctx context.Context, db xsql.DB, query ExecQuery, options ExecOptions) error {
if options.Binlog == nil {
options.Binlog = ptr.To(true)
}

// ExecWithFlush is a wrapper function for xsql.DB.Exec() that allows the execution of optional queries before and after the provided query
func ExecWithFlush(ctx context.Context, db xsql.DB, query ExecQuery, options ExecOptions) error {
if options.Flush == nil {
options.Flush = ptr.To(true)
}

if !*options.Binlog {
if err := db.Exec(ctx, xsql.Query{
String: "SET sql_log_bin = 0",
}); err != nil {
return errors.Wrap(err, errSetSQLLogBin)
}
}

if err := db.Exec(ctx, xsql.Query{
String: query.Query,
}); err != nil {
Expand Down
9 changes: 6 additions & 3 deletions pkg/clients/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mysql

import (
"fmt"
"strconv"
"testing"
)

Expand All @@ -11,13 +12,15 @@ func TestDSNURLEscaping(t *testing.T) {
user := "username"
rawPass := "password^"
tls := "true"
dsn := DSN(user, rawPass, endpoint, port, tls)
if dsn != fmt.Sprintf("%s:%s@tcp(%s:%s)/?tls=%s",
binlog := false
dsn := DSN(user, rawPass, endpoint, port, tls, binlog)
if dsn != fmt.Sprintf("%s:%s@tcp(%s:%s)/?tls=%s&sql_log_bin=%s",
user,
rawPass,
endpoint,
port,
tls) {
tls,
strconv.FormatBool(binlog)) {
t.Errorf("DSN string did not match expected output with URL encoded")
}
}
10 changes: 4 additions & 6 deletions pkg/controller/mysql/database/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func Setup(mgr ctrl.Manager, o xpcontroller.Options) error {
type connector struct {
kube client.Client
usage resource.Tracker
newDB func(creds map[string][]byte, tls *string) xsql.DB
newDB func(creds map[string][]byte, tls *string, binlog *bool) xsql.DB
}

func (c *connector) Connect(ctx context.Context, mg resource.Managed) (managed.ExternalClient, error) {
Expand Down Expand Up @@ -110,7 +110,7 @@ func (c *connector) Connect(ctx context.Context, mg resource.Managed) (managed.E
return nil, errors.Wrap(err, errGetSecret)
}

return &external{db: c.newDB(s.Data, pc.Spec.TLS)}, nil
return &external{db: c.newDB(s.Data, pc.Spec.TLS, cr.Spec.ForProvider.BinLog)}, nil
}

type external struct{ db xsql.DB }
Expand Down Expand Up @@ -148,10 +148,9 @@ func (c *external) Create(ctx context.Context, mg resource.Managed) (managed.Ext
return managed.ExternalCreation{}, errors.New(errNotDatabase)
}

binlog := cr.Spec.ForProvider.BinLog
query := "CREATE DATABASE " + mysql.QuoteIdentifier(meta.GetExternalName(cr))

if err := mysql.ExecWithBinlogAndFlush(ctx, c.db, mysql.ExecQuery{Query: query, ErrorValue: errCreateDB}, mysql.ExecOptions{Binlog: binlog, Flush: ptr.To(false)}); err != nil {
if err := mysql.ExecWithFlush(ctx, c.db, mysql.ExecQuery{Query: query, ErrorValue: errCreateDB}, mysql.ExecOptions{Flush: ptr.To(false)}); err != nil {
return managed.ExternalCreation{}, err
}

Expand All @@ -169,10 +168,9 @@ func (c *external) Delete(ctx context.Context, mg resource.Managed) error {
return errors.New(errNotDatabase)
}

binlog := cr.Spec.ForProvider.BinLog
query := "DROP DATABASE IF EXISTS " + mysql.QuoteIdentifier(meta.GetExternalName(cr))

if err := mysql.ExecWithBinlogAndFlush(ctx, c.db, mysql.ExecQuery{Query: query, ErrorValue: errDropDB}, mysql.ExecOptions{Binlog: binlog, Flush: ptr.To(false)}); err != nil {
if err := mysql.ExecWithFlush(ctx, c.db, mysql.ExecQuery{Query: query, ErrorValue: errDropDB}, mysql.ExecOptions{Flush: ptr.To(false)}); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/mysql/database/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestConnect(t *testing.T) {
type fields struct {
kube client.Client
usage resource.Tracker
newDB func(creds map[string][]byte, tls *string) xsql.DB
newDB func(creds map[string][]byte, tls *string, binlog *bool) xsql.DB
}

type args struct {
Expand Down
21 changes: 8 additions & 13 deletions pkg/controller/mysql/grant/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func Setup(mgr ctrl.Manager, o xpcontroller.Options) error {
type connector struct {
kube client.Client
usage resource.Tracker
newDB func(creds map[string][]byte, tls *string) xsql.DB
newDB func(creds map[string][]byte, tls *string, binlog *bool) xsql.DB
}

func (c *connector) Connect(ctx context.Context, mg resource.Managed) (managed.ExternalClient, error) {
Expand Down Expand Up @@ -121,7 +121,7 @@ func (c *connector) Connect(ctx context.Context, mg resource.Managed) (managed.E
}

return &external{
db: c.newDB(s.Data, pc.Spec.TLS),
db: c.newDB(s.Data, pc.Spec.TLS, cr.Spec.ForProvider.BinLog),
kube: c.kube,
}, nil
}
Expand Down Expand Up @@ -259,10 +259,9 @@ func (c *external) Create(ctx context.Context, mg resource.Managed) (managed.Ext
table := defaultIdentifier(cr.Spec.ForProvider.Table)

privileges, grantOption := getPrivilegesString(cr.Spec.ForProvider.Privileges.ToStringSlice())
binlog := cr.Spec.ForProvider.BinLog
query := createGrantQuery(privileges, dbname, username, table, grantOption)

if err := mysql.ExecWithBinlogAndFlush(ctx, c.db, mysql.ExecQuery{Query: query, ErrorValue: errCreateGrant}, mysql.ExecOptions{Binlog: binlog}); err != nil {
if err := mysql.ExecWithFlush(ctx, c.db, mysql.ExecQuery{Query: query, ErrorValue: errCreateGrant}, mysql.ExecOptions{}); err != nil {
return managed.ExternalCreation{}, err
}
return managed.ExternalCreation{}, nil
Expand All @@ -277,7 +276,6 @@ func (c *external) Update(ctx context.Context, mg resource.Managed) (managed.Ext
username := *cr.Spec.ForProvider.User
dbname := defaultIdentifier(cr.Spec.ForProvider.Database)
table := defaultIdentifier(cr.Spec.ForProvider.Table)
binlog := cr.Spec.ForProvider.BinLog

observed := cr.Status.AtProvider.Privileges
desired := cr.Spec.ForProvider.Privileges.ToStringSlice()
Expand All @@ -287,11 +285,10 @@ func (c *external) Update(ctx context.Context, mg resource.Managed) (managed.Ext
sort.Strings(toRevoke)
privileges, grantOption := getPrivilegesString(toRevoke)
query := createRevokeQuery(privileges, dbname, username, table, grantOption)
if err := mysql.ExecWithBinlogAndFlush(ctx, c.db,
if err := mysql.ExecWithFlush(ctx, c.db,
mysql.ExecQuery{
Query: query, ErrorValue: errRevokeGrant,
}, mysql.ExecOptions{
Binlog: binlog}); err != nil {
}, mysql.ExecOptions{}); err != nil {
return managed.ExternalUpdate{}, err
}
}
Expand All @@ -300,11 +297,10 @@ func (c *external) Update(ctx context.Context, mg resource.Managed) (managed.Ext
sort.Strings(toGrant)
privileges, grantOption := getPrivilegesString(toGrant)
query := createGrantQuery(privileges, dbname, username, table, grantOption)
if err := mysql.ExecWithBinlogAndFlush(ctx, c.db,
if err := mysql.ExecWithFlush(ctx, c.db,
mysql.ExecQuery{
Query: query, ErrorValue: errCreateGrant,
}, mysql.ExecOptions{
Binlog: binlog}); err != nil {
}, mysql.ExecOptions{}); err != nil {
return managed.ExternalUpdate{}, err
}
}
Expand Down Expand Up @@ -369,12 +365,11 @@ func (c *external) Delete(ctx context.Context, mg resource.Managed) error {
username := *cr.Spec.ForProvider.User
dbname := defaultIdentifier(cr.Spec.ForProvider.Database)
table := defaultIdentifier(cr.Spec.ForProvider.Table)
binlog := cr.Spec.ForProvider.BinLog

privileges, grantOption := getPrivilegesString(cr.Spec.ForProvider.Privileges.ToStringSlice())
query := createRevokeQuery(privileges, dbname, username, table, grantOption)

if err := mysql.ExecWithBinlogAndFlush(ctx, c.db, mysql.ExecQuery{Query: query, ErrorValue: errRevokeGrant}, mysql.ExecOptions{Binlog: binlog}); err != nil {
if err := mysql.ExecWithFlush(ctx, c.db, mysql.ExecQuery{Query: query, ErrorValue: errRevokeGrant}, mysql.ExecOptions{}); err != nil {
var myErr *mysqldriver.MySQLError
if errors.As(err, &myErr) && myErr.Number == errCodeNoSuchGrant {
// MySQL automatically deletes related grants if the user has been deleted
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/mysql/grant/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestConnect(t *testing.T) {
type fields struct {
kube client.Client
usage resource.Tracker
newDB func(creds map[string][]byte, tls *string) xsql.DB
newDB func(creds map[string][]byte, tls *string, binlog *bool) xsql.DB
}

type args struct {
Expand Down
20 changes: 8 additions & 12 deletions pkg/controller/mysql/user/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func Setup(mgr ctrl.Manager, o xpcontroller.Options) error {
type connector struct {
kube client.Client
usage resource.Tracker
newDB func(creds map[string][]byte, tls *string) xsql.DB
newDB func(creds map[string][]byte, tls *string, binlog *bool) xsql.DB
}

func (c *connector) Connect(ctx context.Context, mg resource.Managed) (managed.ExternalClient, error) {
Expand Down Expand Up @@ -116,7 +116,7 @@ func (c *connector) Connect(ctx context.Context, mg resource.Managed) (managed.E
}

return &external{
db: c.newDB(s.Data, pc.Spec.TLS),
db: c.newDB(s.Data, pc.Spec.TLS, cr.Spec.ForProvider.BinLog),
kube: c.kube,
}, nil
}
Expand Down Expand Up @@ -251,8 +251,7 @@ func (c *external) Create(ctx context.Context, mg resource.Managed) (managed.Ext
}

ro := resourceOptionsToClauses(cr.Spec.ForProvider.ResourceOptions)
binlog := cr.Spec.ForProvider.BinLog
if err := c.executeCreateUserQuery(ctx, username, host, ro, pw, binlog); err != nil {
if err := c.executeCreateUserQuery(ctx, username, host, ro, pw); err != nil {
return managed.ExternalCreation{}, err
}

Expand All @@ -265,7 +264,7 @@ func (c *external) Create(ctx context.Context, mg resource.Managed) (managed.Ext
}, nil
}

func (c *external) executeCreateUserQuery(ctx context.Context, username string, host string, resourceOptionsClauses []string, pw string, binlog *bool) error {
func (c *external) executeCreateUserQuery(ctx context.Context, username string, host string, resourceOptionsClauses []string, pw string) error {
resourceOptions := ""
if len(resourceOptionsClauses) != 0 {
resourceOptions = fmt.Sprintf(" WITH %s", strings.Join(resourceOptionsClauses, " "))
Expand All @@ -279,7 +278,7 @@ func (c *external) executeCreateUserQuery(ctx context.Context, username string,
resourceOptions,
)

if err := mysql.ExecWithBinlogAndFlush(ctx, c.db, mysql.ExecQuery{Query: query, ErrorValue: errCreateUser}, mysql.ExecOptions{Binlog: binlog}); err != nil {
if err := mysql.ExecWithFlush(ctx, c.db, mysql.ExecQuery{Query: query, ErrorValue: errCreateUser}, mysql.ExecOptions{}); err != nil {
return err
}

Expand All @@ -303,14 +302,13 @@ func (c *external) Update(ctx context.Context, mg resource.Managed) (managed.Ext
if len(rochanged) > 0 {
resourceOptions := fmt.Sprintf("WITH %s", strings.Join(ro, " "))

binlog := cr.Spec.ForProvider.BinLog
query := fmt.Sprintf(
"ALTER USER %s@%s %s",
mysql.QuoteValue(username),
mysql.QuoteValue(host),
resourceOptions,
)
if err := mysql.ExecWithBinlogAndFlush(ctx, c.db, mysql.ExecQuery{Query: query, ErrorValue: errUpdateUser}, mysql.ExecOptions{Binlog: binlog}); err != nil {
if err := mysql.ExecWithFlush(ctx, c.db, mysql.ExecQuery{Query: query, ErrorValue: errUpdateUser}, mysql.ExecOptions{}); err != nil {
return managed.ExternalUpdate{}, err
}

Expand All @@ -336,9 +334,8 @@ func (c *external) UpdatePassword(ctx context.Context, cr *v1alpha1.User, userna
}

if pwchanged {
binlog := cr.Spec.ForProvider.BinLog
query := fmt.Sprintf("ALTER USER %s@%s IDENTIFIED BY %s", mysql.QuoteValue(username), mysql.QuoteValue(host), mysql.QuoteValue(pw))
if err := mysql.ExecWithBinlogAndFlush(ctx, c.db, mysql.ExecQuery{Query: query, ErrorValue: errUpdateUser}, mysql.ExecOptions{Binlog: binlog}); err != nil {
if err := mysql.ExecWithFlush(ctx, c.db, mysql.ExecQuery{Query: query, ErrorValue: errUpdateUser}, mysql.ExecOptions{}); err != nil {
return managed.ConnectionDetails{}, err
}

Expand All @@ -358,9 +355,8 @@ func (c *external) Delete(ctx context.Context, mg resource.Managed) error {

username, host := mysql.SplitUserHost(meta.GetExternalName(cr))

binlog := cr.Spec.ForProvider.BinLog
query := fmt.Sprintf("DROP USER IF EXISTS %s@%s", mysql.QuoteValue(username), mysql.QuoteValue(host))
if err := mysql.ExecWithBinlogAndFlush(ctx, c.db, mysql.ExecQuery{Query: query, ErrorValue: errDropUser}, mysql.ExecOptions{Binlog: binlog}); err != nil {
if err := mysql.ExecWithFlush(ctx, c.db, mysql.ExecQuery{Query: query, ErrorValue: errDropUser}, mysql.ExecOptions{}); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/mysql/user/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestConnect(t *testing.T) {
type fields struct {
kube client.Client
usage resource.Tracker
newDB func(creds map[string][]byte, tls *string) xsql.DB
newDB func(creds map[string][]byte, tls *string, binlog *bool) xsql.DB
}

type args struct {
Expand Down