Skip to content

Commit

Permalink
Automatically retry when encounter connection reset by peer error fro…
Browse files Browse the repository at this point in the history
…m aws api
  • Loading branch information
Jiashu Chen committed Jul 25, 2021
1 parent 0a76c88 commit 03ea02c
Show file tree
Hide file tree
Showing 284 changed files with 711 additions and 484 deletions.
3 changes: 3 additions & 0 deletions .changelog/10715.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
internal/tfresource/retry: Add retry handling when a request's connection is reset by peer
```
14 changes: 7 additions & 7 deletions .semgrep.yml
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ rules:

- id: helper-schema-resource-Retry-without-TimeoutError-check
languages: [go]
message: Check resource.Retry() errors with tfresource.TimedOut()
message: Check tfresource.RetryOnConnectionResetByPeer() errors with tfresource.TimedOut()
paths:
exclude:
- "*_test.go"
Expand All @@ -371,33 +371,33 @@ rules:
- patterns:
- pattern-either:
- pattern: |
$ERR := resource.Retry(...)
$ERR := tfresource.RetryOnConnectionResetByPeer(...)
...
return ...
- pattern: |
$ERR = resource.Retry(...)
$ERR = tfresource.RetryOnConnectionResetByPeer(...)
...
return ...
- pattern-not: |
$ERR := resource.Retry(...)
$ERR := tfresource.RetryOnConnectionResetByPeer(...)
...
if isResourceTimeoutError($ERR) { ... }
...
return ...
- pattern-not: |
$ERR = resource.Retry(...)
$ERR = tfresource.RetryOnConnectionResetByPeer(...)
...
if isResourceTimeoutError($ERR) { ... }
...
return ...
- pattern-not: |
$ERR := resource.Retry(...)
$ERR := tfresource.RetryOnConnectionResetByPeer(...)
...
if tfresource.TimedOut($ERR) { ... }
...
return ...
- pattern-not: |
$ERR = resource.Retry(...)
$ERR = tfresource.RetryOnConnectionResetByPeer(...)
...
if tfresource.TimedOut($ERR) { ... }
...
Expand Down
4 changes: 2 additions & 2 deletions aws/awserr.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func isAWSErrRequestFailureStatusCode(err error, statusCode int) bool {

func retryOnAwsCode(code string, f func() (interface{}, error)) (interface{}, error) {
var resp interface{}
err := resource.Retry(2*time.Minute, func() *resource.RetryError {
err := tfresource.RetryOnConnectionResetByPeer(2*time.Minute, func() *resource.RetryError {
var err error
resp, err = f()
if err != nil {
Expand All @@ -52,7 +52,7 @@ func retryOnAwsCode(code string, f func() (interface{}, error)) (interface{}, er
// Note: This function will be moved out of the aws package in the future.
func RetryOnAwsCodes(codes []string, f func() (interface{}, error)) (interface{}, error) {
var resp interface{}
err := resource.Retry(1*time.Minute, func() *resource.RetryError {
err := tfresource.RetryOnConnectionResetByPeer(1*time.Minute, func() *resource.RetryError {
var err error
resp, err = f()
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions aws/data_source_aws_iam_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func dataSourceAwsIAMPolicyRead(d *schema.ResourceData, meta interface{}) error
var results []*iam.Policy

// Handle IAM eventual consistency
err := resource.Retry(waiter.PropagationTimeout, func() *resource.RetryError {
err := tfresource.RetryOnConnectionResetByPeer(waiter.PropagationTimeout, func() *resource.RetryError {
var err error
results, err = finder.Policies(conn, arn, name, pathPrefix)

Expand Down Expand Up @@ -138,7 +138,7 @@ func dataSourceAwsIAMPolicyRead(d *schema.ResourceData, meta interface{}) error

// Handle IAM eventual consistency
var policyVersionOutput *iam.GetPolicyVersionOutput
err = resource.Retry(waiter.PropagationTimeout, func() *resource.RetryError {
err = tfresource.RetryOnConnectionResetByPeer(waiter.PropagationTimeout, func() *resource.RetryError {
var err error
policyVersionOutput, err = conn.GetPolicyVersion(policyVersionInput)

Expand Down
2 changes: 1 addition & 1 deletion aws/data_source_aws_iam_session_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func dataSourceAwsIAMSessionContextRead(d *schema.ResourceData, meta interface{}

var role *iam.Role

err = resource.Retry(waiter.PropagationTimeout, func() *resource.RetryError {
err = tfresource.RetryOnConnectionResetByPeer(waiter.PropagationTimeout, func() *resource.RetryError {
var err error

role, err = finder.Role(conn, roleName)
Expand Down
2 changes: 1 addition & 1 deletion aws/internal/keyvaluetags/create_tags_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion aws/internal/keyvaluetags/generators/createtags/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func {{ . | Title }}CreateTags(conn {{ . | ClientType }}, identifier string{{ if
{{- if . | RetryCreationOnResourceNotFound }}
err := resource.Retry(EventualConsistencyTimeout, func() *resource.RetryError {
err := tfresource.RetryOnConnectionResetByPeer(EventualConsistencyTimeout, func() *resource.RetryError {
_, err := conn.{{ . | TagFunction }}(input)
{{- if . | ResourceNotFoundErrorCodeContains }}
Expand Down
2 changes: 1 addition & 1 deletion aws/internal/keyvaluetags/s3_tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func S3ObjectListTags(conn *s3.S3, bucket, key string) (KeyValueTags, error) {

var output *s3.GetObjectTaggingOutput

err := resource.Retry(1*time.Minute, func() *resource.RetryError {
err := tfresource.RetryOnConnectionResetByPeer(1*time.Minute, func() *resource.RetryError {
var err error
output, err = conn.GetObjectTagging(input)
if awsErr, ok := err.(awserr.Error); ok {
Expand Down
2 changes: 1 addition & 1 deletion aws/internal/service/kinesisanalytics/waiter/waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func ApplicationUpdated(conn *kinesisanalytics.KinesisAnalytics, name string) (*
func IAMPropagation(f func() (interface{}, error)) (interface{}, error) {
var output interface{}

err := resource.Retry(iamwaiter.PropagationTimeout, func() *resource.RetryError {
err := tfresource.RetryOnConnectionResetByPeer(iamwaiter.PropagationTimeout, func() *resource.RetryError {
var err error

output, err = f()
Expand Down
2 changes: 1 addition & 1 deletion aws/internal/service/kinesisanalyticsv2/waiter/waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func ApplicationUpdated(conn *kinesisanalyticsv2.KinesisAnalyticsV2, name string
func IAMPropagation(f func() (interface{}, error)) (interface{}, error) {
var output interface{}

err := resource.Retry(iamwaiter.PropagationTimeout, func() *resource.RetryError {
err := tfresource.RetryOnConnectionResetByPeer(iamwaiter.PropagationTimeout, func() *resource.RetryError {
var err error

output, err = f()
Expand Down
6 changes: 3 additions & 3 deletions aws/internal/service/ram/finder/finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func ResourceShareInvitationByResourceShareArnAndStatus(conn *ram.RAM, resourceS
var invitation *ram.ResourceShareInvitation

// Retry for Ram resource share invitation eventual consistency
err := resource.Retry(FindInvitationTimeout, func() *resource.RetryError {
err := tfresource.RetryOnConnectionResetByPeer(FindInvitationTimeout, func() *resource.RetryError {
i, err := resourceShareInvitationByResourceShareArnAndStatus(conn, resourceShareArn, status)
invitation = i

Expand Down Expand Up @@ -79,7 +79,7 @@ func ResourceShareInvitationByArn(conn *ram.RAM, arn string) (*ram.ResourceShare
var invitation *ram.ResourceShareInvitation

// Retry for Ram resource share invitation eventual consistency
err := resource.Retry(FindInvitationTimeout, func() *resource.RetryError {
err := tfresource.RetryOnConnectionResetByPeer(FindInvitationTimeout, func() *resource.RetryError {
i, err := resourceShareInvitationByArn(conn, arn)
invitation = i

Expand Down Expand Up @@ -113,7 +113,7 @@ func resourceShare(conn *ram.RAM, input *ram.GetResourceSharesInput) (*ram.Resou
var shares *ram.GetResourceSharesOutput

// Retry for Ram resource share eventual consistency
err := resource.Retry(FindResourceShareTimeout, func() *resource.RetryError {
err := tfresource.RetryOnConnectionResetByPeer(FindResourceShareTimeout, func() *resource.RetryError {
ss, err := conn.GetResourceShares(input)
shares = ss

Expand Down
2 changes: 1 addition & 1 deletion aws/internal/service/sqs/waiter/waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func QueueAttributesPropagated(conn *sqs.SQS, url string, expected map[string]st
}

var got map[string]string
err := resource.Retry(QueueAttributePropagationTimeout, func() *resource.RetryError {
err := tfresource.RetryOnConnectionResetByPeer(QueueAttributePropagationTimeout, func() *resource.RetryError {
var err error

got, err = finder.QueueAttributesByURL(conn, url)
Expand Down
15 changes: 14 additions & 1 deletion aws/internal/tfresource/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

"github.com/aws/aws-sdk-go/aws/request"
"github.com/hashicorp/aws-sdk-go-base/tfawserr"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
)
Expand All @@ -14,7 +15,7 @@ import (
func RetryWhenAwsErrCodeEquals(timeout time.Duration, f func() (interface{}, error), codes ...string) (interface{}, error) {
var output interface{}

err := resource.Retry(timeout, func() *resource.RetryError {
err := RetryOnConnectionResetByPeer(timeout, func() *resource.RetryError {
var err error

output, err = f()
Expand Down Expand Up @@ -114,3 +115,15 @@ func RetryConfigContext(ctx context.Context, delay time.Duration, delayRand time
// more likely to be useful
return resultErr
}

func RetryOnConnectionResetByPeer(timeout time.Duration, f resource.RetryFunc) error {
return resource.RetryContext(context.Background(), timeout, func() *resource.RetryError {
err := f()

if err != nil && !err.Retryable && tfawserr.ErrMessageContains(err.Err, request.ErrCodeRequestError, "read: connection reset by peer") {
return resource.RetryableError(err.Err)
}

return err
})
}
53 changes: 53 additions & 0 deletions aws/internal/tfresource/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/tfresource"
)
Expand Down Expand Up @@ -97,3 +98,55 @@ func TestRetryConfigContext_error(t *testing.T) {
t.Fatal("timeout")
}
}

func TestRetryOnConnectionResetByPeer(t *testing.T) {
var retryCount int32

testCases := []struct {
Name string
F func() *resource.RetryError
ExpectError bool
}{
{
Name: "retryable error",
F: func() *resource.RetryError {
if atomic.CompareAndSwapInt32(&retryCount, 0, 1) {
return resource.RetryableError(awserr.New(request.ErrCodeRequestError, "RequestError other", nil))
}
return nil
},
ExpectError: false,
},
{
Name: "non-retryable RequestError read: connection reset by peer should still be retried",
F: func() *resource.RetryError {
if atomic.CompareAndSwapInt32(&retryCount, 0, 1) {
return resource.NonRetryableError(awserr.New(request.ErrCodeRequestError, "RequestError read: connection reset by peer", nil))
}
return nil
},
ExpectError: false,
},
{
Name: "non-retryable other request error",
F: func() *resource.RetryError {
return resource.NonRetryableError(awserr.New(request.ErrCodeRequestError, "RequestError other", nil))
},
ExpectError: true,
},
}

for _, testCase := range testCases {
t.Run(testCase.Name, func(t *testing.T) {
retryCount = 0

err := tfresource.RetryOnConnectionResetByPeer(5*time.Second, testCase.F)

if testCase.ExpectError && err == nil {
t.Fatal("expected error")
} else if !testCase.ExpectError && err != nil {
t.Fatalf("unexpected error: %s", err)
}
})
}
}
3 changes: 2 additions & 1 deletion aws/resource_aws_accessanalyzer_analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/keyvaluetags"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/tfresource"
)

const (
Expand Down Expand Up @@ -79,7 +80,7 @@ func resourceAwsAccessAnalyzerAnalyzerCreate(d *schema.ResourceData, meta interf
}

// Handle Organizations eventual consistency
err := resource.Retry(accessAnalyzerOrganizationCreationTimeout, func() *resource.RetryError {
err := tfresource.RetryOnConnectionResetByPeer(accessAnalyzerOrganizationCreationTimeout, func() *resource.RetryError {
_, err := conn.CreateAnalyzer(input)

if isAWSErr(err, accessanalyzer.ErrCodeValidationException, "You must create an organization") {
Expand Down
4 changes: 2 additions & 2 deletions aws/resource_aws_acm_certificate.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func resourceAwsAcmCertificateRead(d *schema.ResourceData, meta interface{}) err
CertificateArn: aws.String(d.Id()),
}

return resource.Retry(AcmCertificateDnsValidationAssignmentTimeout, func() *resource.RetryError {
return tfresource.RetryOnConnectionResetByPeer(AcmCertificateDnsValidationAssignmentTimeout, func() *resource.RetryError {
resp, err := acmconn.DescribeCertificate(params)

if !d.IsNewResource() && tfawserr.ErrCodeEquals(err, acm.ErrCodeResourceNotFoundException) {
Expand Down Expand Up @@ -492,7 +492,7 @@ func resourceAwsAcmCertificateDelete(d *schema.ResourceData, meta interface{}) e
CertificateArn: aws.String(d.Id()),
}

err := resource.Retry(AcmCertificateCrossServicePropagationTimeout, func() *resource.RetryError {
err := tfresource.RetryOnConnectionResetByPeer(AcmCertificateCrossServicePropagationTimeout, func() *resource.RetryError {
_, err := acmconn.DeleteCertificate(params)

if tfawserr.ErrCodeEquals(err, acm.ErrCodeResourceInUseException) {
Expand Down
5 changes: 3 additions & 2 deletions aws/resource_aws_acm_certificate_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/tfresource"
)

func resourceAwsAcmCertificateValidation() *schema.Resource {
Expand Down Expand Up @@ -71,7 +72,7 @@ func resourceAwsAcmCertificateValidationCreate(d *schema.ResourceData, meta inte
log.Printf("[INFO] No validation_record_fqdns set, skipping check")
}

err = resource.Retry(d.Timeout(schema.TimeoutCreate), func() *resource.RetryError {
err = tfresource.RetryOnConnectionResetByPeer(d.Timeout(schema.TimeoutCreate), func() *resource.RetryError {
resp, err := acmconn.DescribeCertificate(params)

if err != nil {
Expand Down Expand Up @@ -109,7 +110,7 @@ func resourceAwsAcmCertificateCheckValidationRecords(validationRecordFqdns []int
}
var err error
var output *acm.DescribeCertificateOutput
err = resource.Retry(1*time.Minute, func() *resource.RetryError {
err = tfresource.RetryOnConnectionResetByPeer(1*time.Minute, func() *resource.RetryError {
log.Printf("[DEBUG] Certificate domain validation options empty for %s, retrying", aws.StringValue(cert.CertificateArn))
output, err = conn.DescribeCertificate(input)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion aws/resource_aws_acmpca_certificate.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/service/acmpca/waiter"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/tfresource"
)

func resourceAwsAcmpcaCertificate() *schema.Resource {
Expand Down Expand Up @@ -124,7 +125,7 @@ func resourceAwsAcmpcaCertificateCreate(d *schema.ResourceData, meta interface{}
}

var output *acmpca.IssueCertificateOutput
err = resource.Retry(waiter.CertificateAuthorityActiveTimeout, func() *resource.RetryError {
err = tfresource.RetryOnConnectionResetByPeer(waiter.CertificateAuthorityActiveTimeout, func() *resource.RetryError {
var err error
output, err = conn.IssueCertificate(input)
if tfawserr.ErrMessageContains(err, acmpca.ErrCodeInvalidStateException, "The certificate authority is not in a valid state for issuing certificates") {
Expand Down
3 changes: 2 additions & 1 deletion aws/resource_aws_acmpca_certificate_authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/terraform-providers/terraform-provider-aws/aws/internal/keyvaluetags"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/service/acmpca/finder"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/service/acmpca/waiter"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/tfresource"
)

func resourceAwsAcmpcaCertificateAuthority() *schema.Resource {
Expand Down Expand Up @@ -298,7 +299,7 @@ func resourceAwsAcmpcaCertificateAuthorityCreate(d *schema.ResourceData, meta in

log.Printf("[DEBUG] Creating ACM PCA Certificate Authority: %s", input)
var output *acmpca.CreateCertificateAuthorityOutput
err := resource.Retry(1*time.Minute, func() *resource.RetryError {
err := tfresource.RetryOnConnectionResetByPeer(1*time.Minute, func() *resource.RetryError {
var err error
output, err = conn.CreateCertificateAuthority(input)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion aws/resource_aws_ami.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/hashcode"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/keyvaluetags"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/tfresource"
)

const (
Expand Down Expand Up @@ -338,7 +339,7 @@ func resourceAwsAmiRead(d *schema.ResourceData, meta interface{}) error {
}

var res *ec2.DescribeImagesOutput
err := resource.Retry(1*time.Minute, func() *resource.RetryError {
err := tfresource.RetryOnConnectionResetByPeer(1*time.Minute, func() *resource.RetryError {
var err error
res, err = client.DescribeImages(req)
if err != nil {
Expand Down
Loading

0 comments on commit 03ea02c

Please sign in to comment.