Skip to content

Commit

Permalink
remove custom retrying stuff in favor of aws impl
Browse files Browse the repository at this point in the history
  • Loading branch information
guregu committed May 4, 2024
1 parent a749b0c commit ec1651c
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 199 deletions.
70 changes: 4 additions & 66 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,18 @@ import (
"context"
"errors"
"fmt"
"os"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/aws/smithy-go"
"github.com/aws/smithy-go/logging"

"github.com/guregu/dynamo/v2/dynamodbiface"
)

// DB is a DynamoDB client.
type DB struct {
client dynamodbiface.DynamoDBAPI
logger logging.Logger
retryer func() aws.Retryer
retryMax int
client dynamodbiface.DynamoDBAPI
}

// New creates a new client with the given configuration.
Expand All @@ -29,45 +25,14 @@ type DB struct {
// (0 for no retrying, -1 for default behavior of unlimited retries).
func New(cfg aws.Config, options ...func(*dynamodb.Options)) *DB {
client := dynamodb.NewFromConfig(cfg, options...)
return newDB(client, cfg)
return NewFromIface(client)
}

// NewFromIface creates a new client with the given interface.
func NewFromIface(client dynamodbiface.DynamoDBAPI) *DB {
return newDB(client, aws.Config{})
}

func newDB(client dynamodbiface.DynamoDBAPI, cfg aws.Config) *DB {
db := &DB{
client: client,
logger: cfg.Logger,
retryMax: -1,
}

if db.logger == nil {
db.logger = logging.NewStandardLogger(os.Stdout)
client: client,
}

// TODO: replace all of this with AWS Retryer interface
/*
if real, ok := client.(*dynamodb.Client); ok {
if retryer := real.Options().Retryer; retryer != nil {
db.retryer = func() aws.Retryer { return retryer }
if cfg.Retryer != nil {
db.retryer = cfg.Retryer
}
} else if real.Options().RetryMaxAttempts > 0 {
db.retryMax = cfg.RetryMaxAttempts
}
} else {
*/
if cfg.Retryer != nil {
db.retryer = cfg.Retryer
} else if cfg.RetryMaxAttempts > 0 {
db.retryMax = cfg.RetryMaxAttempts
}
// }

return db
}

Expand All @@ -76,33 +41,6 @@ func (db *DB) Client() dynamodbiface.DynamoDBAPI {
return db.client
}

// TODO: should we expose these, or come up with a better interface?
// They could be useful in conjunction with NewFromIface, but SetRetryer would be misleading;
// dynamo expects it to be called from within the dynamodbapi interface.
// Probably best to create a forward-compatible (v2-friendly) configuration API instead.

// func (db *DB) SetRetryer(retryer request.Retryer) {
// db.retryer = retryer
// }

// func (db *DB) SetMaxRetries(max int) *DB {
// db.retryMax = max
// return db
// }

// func (db *DB) SetLogger(logger aws.Logger) *DB {
// if logger == nil {
// db.logger = noopLogger{}
// return db
// }
// db.logger = logger
// return db
// }

// func (db *DB) log(format string, v ...interface{}) {
// db.logger.Logf(logging.Debug, format, v...)
// }

// ListTables is a request to list tables.
// See: http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_ListTables.html
type ListTables struct {
Expand Down
81 changes: 3 additions & 78 deletions retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,85 +2,10 @@ package dynamo

import (
"context"
"errors"
"time"

"github.com/aws/aws-sdk-go-v2/aws/transport/http"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/aws/smithy-go"
awstime "github.com/aws/smithy-go/time"
"github.com/cenkalti/backoff/v4"
)

func (db *DB) retry(ctx context.Context, f func() error) error {
// if a custom retryer has been set, the SDK will retry for us
if db.retryer != nil {
return f()
}

var err error
var next time.Duration
b := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
for i := 0; db.retryMax < 0 || i <= db.retryMax; i++ {
if err = f(); err == nil {
return nil
}

if !canRetry(err) {
return err
}

if next = b.NextBackOff(); next == backoff.Stop {
return err
}
if err := awstime.SleepWithContext(ctx, next); err != nil {
return err
}
}
return err
}

// errRetry is a sentinel error to retry, should never be returned to user
var errRetry = errors.New("dynamo: retry")

func canRetry(err error) bool {
if errors.Is(err, errRetry) {
return true
}

var txe *types.TransactionCanceledException
if errors.As(err, &txe) {
retry := false
for _, reason := range txe.CancellationReasons {
if reason.Code == nil {
continue
}
switch *reason.Code {
case "ValidationError", "ConditionalCheckFailed", "ItemCollectionSizeLimitExceeded":
return false
case "ThrottlingError", "ProvisionedThroughputExceeded", "TransactionConflict":
retry = true
}
}
return retry
}

var aerr smithy.APIError
if errors.As(err, &aerr) {
switch aerr.ErrorCode() {
case "ProvisionedThroughputExceededException",
"ThrottlingException":
return true
}
}

var rerr *http.ResponseError
if errors.As(err, &rerr) {
switch rerr.HTTPStatusCode() {
case 500, 503:
return true
}
}
// TODO: delete this

return false
func (db *DB) retry(_ context.Context, f func() error) error {
return f()
}
34 changes: 0 additions & 34 deletions retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,13 @@ package dynamo

import (
"context"
"fmt"
"testing"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/retry"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

func TestRetryMax(t *testing.T) {
if testing.Short() {
t.SkipNow()
}

test := func(max int) (string, func(t *testing.T)) {
name := fmt.Sprintf("max(%d)", max)
return name, func(t *testing.T) {
t.Parallel()
t.Helper()
db := New(aws.Config{
RetryMaxAttempts: max,
Credentials: dummyCreds,
})

var runs int
err := db.retry(context.Background(), func() error {
runs++
return &types.ProvisionedThroughputExceededException{}
})
if err == nil {
t.Fatal("expected error, got nil")
}
if want := max + 1; runs != want {
t.Error("wrong number of runs. want:", want, "got:", runs)
}
}
}
// t.Run(test(0)) // behavior changed from v1
t.Run(test(1))
t.Run(test(3))
}

func TestRetryCustom(t *testing.T) {
t.Parallel()
retryer := func() aws.Retryer {
Expand Down
46 changes: 25 additions & 21 deletions table.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ package dynamo

import (
"context"
"errors"
"fmt"
"sync/atomic"
"time"

"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/aws/smithy-go"
)

// Status is an enumeration of table and index statuses.
Expand Down Expand Up @@ -65,29 +64,34 @@ func (table Table) Wait(ctx context.Context, want ...Status) error {
}
}

err := table.db.retry(ctx, func() error {
desc, err := table.Describe().Run(ctx)
var aerr smithy.APIError
if errors.As(err, &aerr) {
if aerr.ErrorCode() == "ResourceNotFoundException" {
if wantGone {
return nil
}
return errRetry
}
}
if err != nil {
return err
}
// I don't know why AWS wants a context _and_ a duration param.
// Infer it from context; if it's indefinite then set it to something really high (1 day)
deadline, ok := ctx.Deadline()
if !ok {
deadline = time.Now().Add(24 * time.Hour)
}
maxDur := time.Until(deadline)

for _, status := range want {
if status == desc.Status {
return nil
if wantGone {
waiter := dynamodb.NewTableNotExistsWaiter(table.db.client)
return waiter.Wait(ctx, table.Describe().input(), maxDur)
}

waiter := dynamodb.NewTableExistsWaiter(table.db.client, func(opts *dynamodb.TableExistsWaiterOptions) {
fallback := opts.Retryable
opts.Retryable = func(ctx context.Context, in *dynamodb.DescribeTableInput, out *dynamodb.DescribeTableOutput, err error) (bool, error) {
if err == nil && out != nil && out.Table != nil {
status := string(out.Table.TableStatus)
for _, wantStatus := range want {
if status == string(wantStatus) {
return false, nil
}
}
}
return fallback(ctx, in, out, err)
}
return errRetry
})
return err
return waiter.Wait(ctx, table.Describe().input(), maxDur)
}

// primaryKeys attempts to determine this table's primary keys.
Expand Down

0 comments on commit ec1651c

Please sign in to comment.