Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tx and retry #16

Merged
merged 24 commits into from
Apr 2, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
4db500c
- add retry capabilities
rocketlaunchr-cto Mar 22, 2020
dc1a78d
- basic functionality complete
rocketlaunchr-cto Mar 22, 2020
e1618e0
- implement retry to Tx function
rocketlaunchr-cto Mar 23, 2020
c89676c
- add convenient functions for retry policy
rocketlaunchr-cto Mar 23, 2020
92e23b7
- update readme
rocketlaunchr-cto Mar 23, 2020
286a400
- update readme
rocketlaunchr-cto Mar 23, 2020
2d2c918
- update readme
rocketlaunchr-cto Mar 23, 2020
dff01bb
- update readme and docs
rocketlaunchr-cto Mar 23, 2020
741fab9
- run go mod tidy
rocketlaunchr-cto Mar 23, 2020
b5ee06c
- update godoc
rocketlaunchr-cto Mar 23, 2020
036fa13
- update docs
rocketlaunchr-cto Mar 23, 2020
218a3a6
- check if arguments provided equal placeholders
rocketlaunchr-cto Mar 23, 2020
ebbaaa7
- update readme
rocketlaunchr-cto Mar 23, 2020
17da1c7
update README
propersam Mar 25, 2020
cfd8b40
revert readme update
propersam Mar 25, 2020
2789b14
- keep trying to rollback tx if it failed.
rocketlaunchr-cto Mar 26, 2020
5b40eae
- use go mod compatible pkg path
rocketlaunchr-cto Mar 26, 2020
3f93cb8
- deprecate INSERT function.
rocketlaunchr-cto Mar 27, 2020
7d6fab9
- update readme
rocketlaunchr-cto Mar 29, 2020
da3eaec
- add INSERTStmt function
rocketlaunchr-cto Mar 29, 2020
273fea4
Revert "- deprecate INSERT function."
rocketlaunchr-cto Mar 29, 2020
79ce80b
- add retry to BulkUpdate
rocketlaunchr-cto Mar 29, 2020
189a147
- fix up compile error
rocketlaunchr-cto Mar 29, 2020
a42b6cc
fix issue
propersam Mar 29, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
- implement retry to Tx function
  • Loading branch information
rocketlaunchr-cto committed Mar 23, 2020
commit e1618e0c8df1df26ac874425f88f2fd34377d4e0
16 changes: 8 additions & 8 deletions v2/gen_dbq.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,11 @@ type Options struct {
// MustE is a wrapper around the E function. It will panic upon encountering an error.
// This can erradicate boiler-plate error handing code.
func MustE(ctx context.Context, db ExecContexter, query string, options *Options, args ...interface{}) sql.Result {
EkXBAk, jQZLCt := E(ctx, db, query, options, args...)
if jQZLCt != nil {
panic(jQZLCt)
BmyArK, Ctzkjk := E(ctx, db, query, options, args...)
if Ctzkjk != nil {
panic(Ctzkjk)
}
return EkXBAk
return BmyArK
}

// E is a wrapper around the Q function. It is used for "Exec" queries such as insert, update and delete.
Expand All @@ -141,11 +141,11 @@ func E(ctx context.Context, db ExecContexter, query string, options *Options, ar
// MustQ is a wrapper around the Q function. It will panic upon encountering an error.
// This can erradicate boiler-plate error handing code.
func MustQ(ctx context.Context, db interface{}, query string, options *Options, args ...interface{}) interface{} {
TMtTCo, aNatyy := Q(ctx, db, query, options, args...)
if aNatyy != nil {
panic(aNatyy)
ZIvaBj, MkXVbW := Q(ctx, db, query, options, args...)
if MkXVbW != nil {
panic(MkXVbW)
}
return TMtTCo
return ZIvaBj
}

// Q is a convenience function that is used for inserting, updating, deleting, and querying a SQL database.
Expand Down
6 changes: 0 additions & 6 deletions v2/gen_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,6 @@ type SQLBasic interface {
QueryContexter
}

// Txer represents a transaction.
type Txer interface {
Commit() error
Rollback() error
}

// BeginTxer is an object than can begin a transaction.
type BeginTxer interface {
BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error)
Expand Down
64 changes: 56 additions & 8 deletions v2/gen_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,45 @@ import (
"fmt"

rlSql "github.com/rocketlaunchr/mysql-go"
"gopkg.in/cenkalti/backoff.v4"
// "github.com/cenkalti/backoff/v4"
)

type txer interface {
Commit() error
Rollback() error
}

// QFn is shorthand for Q. It will automatically use the appropriate transaction.
type QFn func(ctx context.Context, query string, options *Options, args ...interface{}) (interface{}, error)

// EFn is shorthand for E. It will automatically use the appropriate transaction.
type EFn func(ctx context.Context, query string, options *Options, args ...interface{}) (sql.Result, error)

// TxCommit will commit the transaction.
type TxCommit func() error

// Tx is used to perform an arbitrary operation and not have to worry about rolling back a transaction.
// The transaction is automatically rolled back unless it is explicitly committed.
// The transaction is automatically rolled back unless committed by calling txCommit.
// tx is only exposed for performance purposes. Do not use it to commit or rollback.
//
// NOTE: Until this note is removed, this function is not necessarily backward compatible.
func Tx(ctx context.Context, db interface{}, fn func(tx Txer, Q QFn, E EFn)) error {
//
// Example:
//
// ctx := context.Background()
// pool, _ := sql.Open("mysql", "user:password@tcp(localhost:3306)/db")
//
// dbq.Tx(ctx, pool, func(tx interface{}, Q dbq.QFn, E dbq.EFn, txCommit dbq.TxCommit) {
// stmt := dbq.INSERT("test", []string{"name", "age", "created_at"}, 1)
// res, err := E(ctx, stmt, nil, "test name", 34, time.Now())
// if err != nil {
// return // Automatic rollback
// }
// txCommit()
// })
//
func Tx(ctx context.Context, db interface{}, fn func(tx interface{}, Q QFn, E EFn, txCommit TxCommit), retryPolicy ...backoff.BackOff) error {

var (
alreadyTx bool
Expand Down Expand Up @@ -50,7 +76,7 @@ func Tx(ctx context.Context, db interface{}, fn func(tx Txer, Q QFn, E EFn)) err

defer func() {
if r := recover(); r != nil {
tx.(Txer).Rollback()
tx.(txer).Rollback()
panic(r)
}
}()
Expand All @@ -67,10 +93,32 @@ func Tx(ctx context.Context, db interface{}, fn func(tx Txer, Q QFn, E EFn)) err
return E(ctx, tx.(ExecContexter), query, options, args...)
}

fn(tx.(Txer), qFn, eFn)
err = tx.(Txer).Rollback()
if err == sql.ErrTxDone {
return nil
completed := false
txCommit := func() error {
err := tx.(txer).Commit()
if err == nil || err == sql.ErrTxDone {
completed = true
return nil
}
return err
}

operation := func() error {
fn(tx, qFn, eFn, txCommit)
if completed {
return nil
}
err = tx.(txer).Rollback()
if err == sql.ErrTxDone {
return nil
}
return err
}

if !(len(retryPolicy) > 0 && retryPolicy[0] != nil) {

return operation()
}
return err

return backoff.Retry(operation, backoff.WithContext(retryPolicy[0], ctx))
}
6 changes: 0 additions & 6 deletions v2/interfaces.igo
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,6 @@ type SQLBasic interface {
QueryContexter
}

// Txer represents a transaction.
type Txer interface {
Commit() error
Rollback() error
}

// BeginTxer is an object than can begin a transaction.
type BeginTxer interface {
BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error)
Expand Down
64 changes: 56 additions & 8 deletions v2/tx.igo
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,45 @@ import (
"fmt"

rlSql "github.com/rocketlaunchr/mysql-go"
"gopkg.in/cenkalti/backoff.v4"
// "github.com/cenkalti/backoff/v4"
)

type txer interface {
Commit() error
Rollback() error
}

// QFn is shorthand for Q. It will automatically use the appropriate transaction.
type QFn func(ctx context.Context, query string, options *Options, args ...interface{}) (interface{}, error)

// EFn is shorthand for E. It will automatically use the appropriate transaction.
type EFn func(ctx context.Context, query string, options *Options, args ...interface{}) (sql.Result, error)

// TxCommit will commit the transaction.
type TxCommit func() error

// Tx is used to perform an arbitrary operation and not have to worry about rolling back a transaction.
// The transaction is automatically rolled back unless it is explicitly committed.
// The transaction is automatically rolled back unless committed by calling txCommit.
// tx is only exposed for performance purposes. Do not use it to commit or rollback.
//
// NOTE: Until this note is removed, this function is not necessarily backward compatible.
func Tx(ctx context.Context, db interface{}, fn func(tx Txer, Q QFn, E EFn)) error {
//
// Example:
//
// ctx := context.Background()
// pool, _ := sql.Open("mysql", "user:password@tcp(localhost:3306)/db")
//
// dbq.Tx(ctx, pool, func(tx interface{}, Q dbq.QFn, E dbq.EFn, txCommit dbq.TxCommit) {
// stmt := dbq.INSERT("test", []string{"name", "age", "created_at"}, 1)
// res, err := E(ctx, stmt, nil, "test name", 34, time.Now())
// if err != nil {
// return // Automatic rollback
// }
// txCommit()
// })
//
func Tx(ctx context.Context, db interface{}, fn func(tx interface{}, Q QFn, E EFn, txCommit TxCommit), retryPolicy ...backoff.BackOff) error {

var (
alreadyTx bool
Expand Down Expand Up @@ -49,7 +75,7 @@ func Tx(ctx context.Context, db interface{}, fn func(tx Txer, Q QFn, E EFn)) err

defer func() {
if r := recover(); r != nil {
tx.(Txer).Rollback()
tx.(txer).Rollback()
panic(r)
}
}()
Expand All @@ -66,10 +92,32 @@ func Tx(ctx context.Context, db interface{}, fn func(tx Txer, Q QFn, E EFn)) err
return E(ctx, tx.(ExecContexter), query, options, args...)
}

fn(tx.(Txer), qFn, eFn)
err = tx.(Txer).Rollback()
if err == sql.ErrTxDone {
return nil
completed := false
txCommit := func() error {
err := tx.(txer).Commit()
if err == nil || err == sql.ErrTxDone {
completed = true
return nil
}
return err
}
return err

operation := func() error {
fn(tx, qFn, eFn, txCommit)
if completed {
return nil
}
err = tx.(txer).Rollback()
if err == sql.ErrTxDone {
return nil
}
return err
}

if !(len(retryPolicy) > 0 && retryPolicy[0] != nil) {
// No retry
return operation()
}

return backoff.Retry(operation, backoff.WithContext(retryPolicy[0], ctx))
}