From ec1651c49bd82ebbde825d242b42e71f3a05d722 Mon Sep 17 00:00:00 2001 From: Greg Date: Sun, 5 May 2024 05:05:11 +0900 Subject: [PATCH] remove custom retrying stuff in favor of aws impl --- db.go | 70 +++----------------------------------------- retry.go | 81 ++------------------------------------------------- retry_test.go | 34 --------------------- table.go | 46 ++++++++++++++++------------- 4 files changed, 32 insertions(+), 199 deletions(-) diff --git a/db.go b/db.go index 22dc5a6..b9fa812 100644 --- a/db.go +++ b/db.go @@ -5,22 +5,18 @@ import ( "context" "errors" "fmt" - "os" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/aws/smithy-go" - "github.com/aws/smithy-go/logging" + "github.com/guregu/dynamo/v2/dynamodbiface" ) // DB is a DynamoDB client. type DB struct { - client dynamodbiface.DynamoDBAPI - logger logging.Logger - retryer func() aws.Retryer - retryMax int + client dynamodbiface.DynamoDBAPI } // New creates a new client with the given configuration. @@ -29,45 +25,14 @@ type DB struct { // (0 for no retrying, -1 for default behavior of unlimited retries). func New(cfg aws.Config, options ...func(*dynamodb.Options)) *DB { client := dynamodb.NewFromConfig(cfg, options...) - return newDB(client, cfg) + return NewFromIface(client) } // NewFromIface creates a new client with the given interface. func NewFromIface(client dynamodbiface.DynamoDBAPI) *DB { - return newDB(client, aws.Config{}) -} - -func newDB(client dynamodbiface.DynamoDBAPI, cfg aws.Config) *DB { db := &DB{ - client: client, - logger: cfg.Logger, - retryMax: -1, - } - - if db.logger == nil { - db.logger = logging.NewStandardLogger(os.Stdout) + client: client, } - - // TODO: replace all of this with AWS Retryer interface - /* - if real, ok := client.(*dynamodb.Client); ok { - if retryer := real.Options().Retryer; retryer != nil { - db.retryer = func() aws.Retryer { return retryer } - if cfg.Retryer != nil { - db.retryer = cfg.Retryer - } - } else if real.Options().RetryMaxAttempts > 0 { - db.retryMax = cfg.RetryMaxAttempts - } - } else { - */ - if cfg.Retryer != nil { - db.retryer = cfg.Retryer - } else if cfg.RetryMaxAttempts > 0 { - db.retryMax = cfg.RetryMaxAttempts - } - // } - return db } @@ -76,33 +41,6 @@ func (db *DB) Client() dynamodbiface.DynamoDBAPI { return db.client } -// TODO: should we expose these, or come up with a better interface? -// They could be useful in conjunction with NewFromIface, but SetRetryer would be misleading; -// dynamo expects it to be called from within the dynamodbapi interface. -// Probably best to create a forward-compatible (v2-friendly) configuration API instead. - -// func (db *DB) SetRetryer(retryer request.Retryer) { -// db.retryer = retryer -// } - -// func (db *DB) SetMaxRetries(max int) *DB { -// db.retryMax = max -// return db -// } - -// func (db *DB) SetLogger(logger aws.Logger) *DB { -// if logger == nil { -// db.logger = noopLogger{} -// return db -// } -// db.logger = logger -// return db -// } - -// func (db *DB) log(format string, v ...interface{}) { -// db.logger.Logf(logging.Debug, format, v...) -// } - // ListTables is a request to list tables. // See: http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_ListTables.html type ListTables struct { diff --git a/retry.go b/retry.go index 43b8d6e..f4f0aa1 100644 --- a/retry.go +++ b/retry.go @@ -2,85 +2,10 @@ package dynamo import ( "context" - "errors" - "time" - - "github.com/aws/aws-sdk-go-v2/aws/transport/http" - "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" - "github.com/aws/smithy-go" - awstime "github.com/aws/smithy-go/time" - "github.com/cenkalti/backoff/v4" ) -func (db *DB) retry(ctx context.Context, f func() error) error { - // if a custom retryer has been set, the SDK will retry for us - if db.retryer != nil { - return f() - } - - var err error - var next time.Duration - b := backoff.WithContext(backoff.NewExponentialBackOff(), ctx) - for i := 0; db.retryMax < 0 || i <= db.retryMax; i++ { - if err = f(); err == nil { - return nil - } - - if !canRetry(err) { - return err - } - - if next = b.NextBackOff(); next == backoff.Stop { - return err - } - if err := awstime.SleepWithContext(ctx, next); err != nil { - return err - } - } - return err -} - -// errRetry is a sentinel error to retry, should never be returned to user -var errRetry = errors.New("dynamo: retry") - -func canRetry(err error) bool { - if errors.Is(err, errRetry) { - return true - } - - var txe *types.TransactionCanceledException - if errors.As(err, &txe) { - retry := false - for _, reason := range txe.CancellationReasons { - if reason.Code == nil { - continue - } - switch *reason.Code { - case "ValidationError", "ConditionalCheckFailed", "ItemCollectionSizeLimitExceeded": - return false - case "ThrottlingError", "ProvisionedThroughputExceeded", "TransactionConflict": - retry = true - } - } - return retry - } - - var aerr smithy.APIError - if errors.As(err, &aerr) { - switch aerr.ErrorCode() { - case "ProvisionedThroughputExceededException", - "ThrottlingException": - return true - } - } - - var rerr *http.ResponseError - if errors.As(err, &rerr) { - switch rerr.HTTPStatusCode() { - case 500, 503: - return true - } - } +// TODO: delete this - return false +func (db *DB) retry(_ context.Context, f func() error) error { + return f() } diff --git a/retry_test.go b/retry_test.go index 6dd6880..48e90f9 100644 --- a/retry_test.go +++ b/retry_test.go @@ -2,7 +2,6 @@ package dynamo import ( "context" - "fmt" "testing" "github.com/aws/aws-sdk-go-v2/aws" @@ -10,39 +9,6 @@ import ( "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" ) -func TestRetryMax(t *testing.T) { - if testing.Short() { - t.SkipNow() - } - - test := func(max int) (string, func(t *testing.T)) { - name := fmt.Sprintf("max(%d)", max) - return name, func(t *testing.T) { - t.Parallel() - t.Helper() - db := New(aws.Config{ - RetryMaxAttempts: max, - Credentials: dummyCreds, - }) - - var runs int - err := db.retry(context.Background(), func() error { - runs++ - return &types.ProvisionedThroughputExceededException{} - }) - if err == nil { - t.Fatal("expected error, got nil") - } - if want := max + 1; runs != want { - t.Error("wrong number of runs. want:", want, "got:", runs) - } - } - } - // t.Run(test(0)) // behavior changed from v1 - t.Run(test(1)) - t.Run(test(3)) -} - func TestRetryCustom(t *testing.T) { t.Parallel() retryer := func() aws.Retryer { diff --git a/table.go b/table.go index f3235f5..5faffdb 100644 --- a/table.go +++ b/table.go @@ -2,13 +2,12 @@ package dynamo import ( "context" - "errors" "fmt" "sync/atomic" + "time" "github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" - "github.com/aws/smithy-go" ) // Status is an enumeration of table and index statuses. @@ -65,29 +64,34 @@ func (table Table) Wait(ctx context.Context, want ...Status) error { } } - err := table.db.retry(ctx, func() error { - desc, err := table.Describe().Run(ctx) - var aerr smithy.APIError - if errors.As(err, &aerr) { - if aerr.ErrorCode() == "ResourceNotFoundException" { - if wantGone { - return nil - } - return errRetry - } - } - if err != nil { - return err - } + // I don't know why AWS wants a context _and_ a duration param. + // Infer it from context; if it's indefinite then set it to something really high (1 day) + deadline, ok := ctx.Deadline() + if !ok { + deadline = time.Now().Add(24 * time.Hour) + } + maxDur := time.Until(deadline) - for _, status := range want { - if status == desc.Status { - return nil + if wantGone { + waiter := dynamodb.NewTableNotExistsWaiter(table.db.client) + return waiter.Wait(ctx, table.Describe().input(), maxDur) + } + + waiter := dynamodb.NewTableExistsWaiter(table.db.client, func(opts *dynamodb.TableExistsWaiterOptions) { + fallback := opts.Retryable + opts.Retryable = func(ctx context.Context, in *dynamodb.DescribeTableInput, out *dynamodb.DescribeTableOutput, err error) (bool, error) { + if err == nil && out != nil && out.Table != nil { + status := string(out.Table.TableStatus) + for _, wantStatus := range want { + if status == string(wantStatus) { + return false, nil + } + } } + return fallback(ctx, in, out, err) } - return errRetry }) - return err + return waiter.Wait(ctx, table.Describe().input(), maxDur) } // primaryKeys attempts to determine this table's primary keys.