Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Automatically retry when encountering a connection reset by peer error from the AWS API #20300

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/10715.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
internal/tfresource/retry: Add retry handling when a request's connection is reset by peer
```
14 changes: 7 additions & 7 deletions .semgrep.yml
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ rules:

- id: helper-schema-resource-Retry-without-TimeoutError-check
languages: [go]
message: Check resource.Retry() errors with tfresource.TimedOut()
message: Check tfresource.RetryOnConnectionResetByPeer() errors with tfresource.TimedOut()
paths:
exclude:
- "*_test.go"
Expand All @@ -371,33 +371,33 @@ rules:
- patterns:
- pattern-either:
- pattern: |
$ERR := resource.Retry(...)
$ERR := tfresource.RetryOnConnectionResetByPeer(...)
...
return ...
- pattern: |
$ERR = resource.Retry(...)
$ERR = tfresource.RetryOnConnectionResetByPeer(...)
...
return ...
- pattern-not: |
$ERR := resource.Retry(...)
$ERR := tfresource.RetryOnConnectionResetByPeer(...)
...
if isResourceTimeoutError($ERR) { ... }
...
return ...
- pattern-not: |
$ERR = resource.Retry(...)
$ERR = tfresource.RetryOnConnectionResetByPeer(...)
...
if isResourceTimeoutError($ERR) { ... }
...
return ...
- pattern-not: |
$ERR := resource.Retry(...)
$ERR := tfresource.RetryOnConnectionResetByPeer(...)
...
if tfresource.TimedOut($ERR) { ... }
...
return ...
- pattern-not: |
$ERR = resource.Retry(...)
$ERR = tfresource.RetryOnConnectionResetByPeer(...)
...
if tfresource.TimedOut($ERR) { ... }
...
Expand Down
4 changes: 2 additions & 2 deletions aws/awserr.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func isAWSErrRequestFailureStatusCode(err error, statusCode int) bool {

func retryOnAwsCode(code string, f func() (interface{}, error)) (interface{}, error) {
var resp interface{}
err := resource.Retry(2*time.Minute, func() *resource.RetryError {
err := tfresource.RetryOnConnectionResetByPeer(2*time.Minute, func() *resource.RetryError {
var err error
resp, err = f()
if err != nil {
Expand All @@ -52,7 +52,7 @@ func retryOnAwsCode(code string, f func() (interface{}, error)) (interface{}, er
// Note: This function will be moved out of the aws package in the future.
func RetryOnAwsCodes(codes []string, f func() (interface{}, error)) (interface{}, error) {
var resp interface{}
err := resource.Retry(1*time.Minute, func() *resource.RetryError {
err := tfresource.RetryOnConnectionResetByPeer(1*time.Minute, func() *resource.RetryError {
var err error
resp, err = f()
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions aws/data_source_aws_iam_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func dataSourceAwsIAMPolicyRead(d *schema.ResourceData, meta interface{}) error
var results []*iam.Policy

// Handle IAM eventual consistency
err := resource.Retry(waiter.PropagationTimeout, func() *resource.RetryError {
err := tfresource.RetryOnConnectionResetByPeer(waiter.PropagationTimeout, func() *resource.RetryError {
var err error
results, err = finder.Policies(conn, arn, name, pathPrefix)

Expand Down Expand Up @@ -138,7 +138,7 @@ func dataSourceAwsIAMPolicyRead(d *schema.ResourceData, meta interface{}) error

// Handle IAM eventual consistency
var policyVersionOutput *iam.GetPolicyVersionOutput
err = resource.Retry(waiter.PropagationTimeout, func() *resource.RetryError {
err = tfresource.RetryOnConnectionResetByPeer(waiter.PropagationTimeout, func() *resource.RetryError {
var err error
policyVersionOutput, err = conn.GetPolicyVersion(policyVersionInput)

Expand Down
2 changes: 1 addition & 1 deletion aws/data_source_aws_iam_session_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func dataSourceAwsIAMSessionContextRead(d *schema.ResourceData, meta interface{}

var role *iam.Role

err = resource.Retry(waiter.PropagationTimeout, func() *resource.RetryError {
err = tfresource.RetryOnConnectionResetByPeer(waiter.PropagationTimeout, func() *resource.RetryError {
var err error

role, err = finder.Role(conn, roleName)
Expand Down
2 changes: 1 addition & 1 deletion aws/internal/keyvaluetags/create_tags_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion aws/internal/keyvaluetags/generators/createtags/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func {{ . | Title }}CreateTags(conn {{ . | ClientType }}, identifier string{{ if

{{- if . | RetryCreationOnResourceNotFound }}

err := resource.Retry(EventualConsistencyTimeout, func() *resource.RetryError {
err := tfresource.RetryOnConnectionResetByPeer(EventualConsistencyTimeout, func() *resource.RetryError {
_, err := conn.{{ . | TagFunction }}(input)

{{- if . | ResourceNotFoundErrorCodeContains }}
Expand Down
2 changes: 1 addition & 1 deletion aws/internal/keyvaluetags/s3_tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func S3ObjectListTags(conn *s3.S3, bucket, key string) (KeyValueTags, error) {

var output *s3.GetObjectTaggingOutput

err := resource.Retry(1*time.Minute, func() *resource.RetryError {
err := tfresource.RetryOnConnectionResetByPeer(1*time.Minute, func() *resource.RetryError {
var err error
output, err = conn.GetObjectTagging(input)
if awsErr, ok := err.(awserr.Error); ok {
Expand Down
2 changes: 1 addition & 1 deletion aws/internal/service/kinesisanalytics/waiter/waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func ApplicationUpdated(conn *kinesisanalytics.KinesisAnalytics, name string) (*
func IAMPropagation(f func() (interface{}, error)) (interface{}, error) {
var output interface{}

err := resource.Retry(iamwaiter.PropagationTimeout, func() *resource.RetryError {
err := tfresource.RetryOnConnectionResetByPeer(iamwaiter.PropagationTimeout, func() *resource.RetryError {
var err error

output, err = f()
Expand Down
2 changes: 1 addition & 1 deletion aws/internal/service/kinesisanalyticsv2/waiter/waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func ApplicationUpdated(conn *kinesisanalyticsv2.KinesisAnalyticsV2, name string
func IAMPropagation(f func() (interface{}, error)) (interface{}, error) {
var output interface{}

err := resource.Retry(iamwaiter.PropagationTimeout, func() *resource.RetryError {
err := tfresource.RetryOnConnectionResetByPeer(iamwaiter.PropagationTimeout, func() *resource.RetryError {
var err error

output, err = f()
Expand Down
6 changes: 3 additions & 3 deletions aws/internal/service/ram/finder/finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func ResourceShareInvitationByResourceShareArnAndStatus(conn *ram.RAM, resourceS
var invitation *ram.ResourceShareInvitation

// Retry for Ram resource share invitation eventual consistency
err := resource.Retry(FindInvitationTimeout, func() *resource.RetryError {
err := tfresource.RetryOnConnectionResetByPeer(FindInvitationTimeout, func() *resource.RetryError {
i, err := resourceShareInvitationByResourceShareArnAndStatus(conn, resourceShareArn, status)
invitation = i

Expand Down Expand Up @@ -79,7 +79,7 @@ func ResourceShareInvitationByArn(conn *ram.RAM, arn string) (*ram.ResourceShare
var invitation *ram.ResourceShareInvitation

// Retry for Ram resource share invitation eventual consistency
err := resource.Retry(FindInvitationTimeout, func() *resource.RetryError {
err := tfresource.RetryOnConnectionResetByPeer(FindInvitationTimeout, func() *resource.RetryError {
i, err := resourceShareInvitationByArn(conn, arn)
invitation = i

Expand Down Expand Up @@ -113,7 +113,7 @@ func resourceShare(conn *ram.RAM, input *ram.GetResourceSharesInput) (*ram.Resou
var shares *ram.GetResourceSharesOutput

// Retry for Ram resource share eventual consistency
err := resource.Retry(FindResourceShareTimeout, func() *resource.RetryError {
err := tfresource.RetryOnConnectionResetByPeer(FindResourceShareTimeout, func() *resource.RetryError {
ss, err := conn.GetResourceShares(input)
shares = ss

Expand Down
2 changes: 1 addition & 1 deletion aws/internal/service/sqs/waiter/waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func QueueAttributesPropagated(conn *sqs.SQS, url string, expected map[string]st
}

var got map[string]string
err := resource.Retry(QueueAttributePropagationTimeout, func() *resource.RetryError {
err := tfresource.RetryOnConnectionResetByPeer(QueueAttributePropagationTimeout, func() *resource.RetryError {
var err error

got, err = finder.QueueAttributesByURL(conn, url)
Expand Down
15 changes: 14 additions & 1 deletion aws/internal/tfresource/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

"github.com/aws/aws-sdk-go/aws/request"
"github.com/hashicorp/aws-sdk-go-base/tfawserr"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
)
Expand All @@ -14,7 +15,7 @@ import (
func RetryWhenAwsErrCodeEquals(timeout time.Duration, f func() (interface{}, error), codes ...string) (interface{}, error) {
var output interface{}

err := resource.Retry(timeout, func() *resource.RetryError {
err := RetryOnConnectionResetByPeer(timeout, func() *resource.RetryError {
var err error

output, err = f()
Expand Down Expand Up @@ -114,3 +115,15 @@ func RetryConfigContext(ctx context.Context, delay time.Duration, delayRand time
// more likely to be useful
return resultErr
}

// RetryOnConnectionResetByPeer runs f through resource.RetryContext using a
// background context and the supplied timeout.
//
// If f returns an error that is marked non-retryable but whose underlying
// error matches request.ErrCodeRequestError with a message containing
// "read: connection reset by peer" (as judged by tfawserr.ErrMessageContains),
// the error is re-wrapped with resource.RetryableError so the retry loop
// treats it as retryable. Every other result from f (nil, an already
// retryable error, or any other non-retryable error) is passed through
// unchanged.
func RetryOnConnectionResetByPeer(timeout time.Duration, f resource.RetryFunc) error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Main Change

return resource.RetryContext(context.Background(), timeout, func() *resource.RetryError {
err := f()

// Only upgrade errors the caller classified as non-retryable; errors that
// are already retryable are returned as-is below.
if err != nil && !err.Retryable && tfawserr.ErrMessageContains(err.Err, request.ErrCodeRequestError, "read: connection reset by peer") {
return resource.RetryableError(err.Err)
}

return err
})
}
53 changes: 53 additions & 0 deletions aws/internal/tfresource/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/tfresource"
)
Expand Down Expand Up @@ -97,3 +98,55 @@ func TestRetryConfigContext_error(t *testing.T) {
t.Fatal("timeout")
}
}

// TestRetryOnConnectionResetByPeer is a table-driven test covering three
// cases for tfresource.RetryOnConnectionResetByPeer:
//   - a retryable error on the first call followed by success (no error expected),
//   - a non-retryable "connection reset by peer" RequestError on the first call
//     followed by success (no error expected),
//   - a non-retryable RequestError with a different message (error expected).
func TestRetryOnConnectionResetByPeer(t *testing.T) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unit Test

// retryCount is shared by all cases; the first two cases fail only on the
// call that flips it from 0 to 1 and succeed on every later call.
var retryCount int32

testCases := []struct {
Name string
F func() *resource.RetryError
ExpectError bool
}{
{
Name: "retryable error",
F: func() *resource.RetryError {
if atomic.CompareAndSwapInt32(&retryCount, 0, 1) {
return resource.RetryableError(awserr.New(request.ErrCodeRequestError, "RequestError other", nil))
}
return nil
},
ExpectError: false,
},
{
Name: "non-retryable RequestError read: connection reset by peer should still be retried",
F: func() *resource.RetryError {
if atomic.CompareAndSwapInt32(&retryCount, 0, 1) {
return resource.NonRetryableError(awserr.New(request.ErrCodeRequestError, "RequestError read: connection reset by peer", nil))
}
return nil
},
ExpectError: false,
},
{
Name: "non-retryable other request error",
F: func() *resource.RetryError {
// Always fails; this case does not touch retryCount.
return resource.NonRetryableError(awserr.New(request.ErrCodeRequestError, "RequestError other", nil))
},
ExpectError: true,
},
}

for _, testCase := range testCases {
t.Run(testCase.Name, func(t *testing.T) {
// Reset the shared counter so each subtest starts with a first-call failure.
// NOTE(review): this is a plain write while the cases above use
// atomic.CompareAndSwapInt32 on the same variable; consider
// atomic.StoreInt32 for consistency — confirm whether F can run on
// another goroutine.
retryCount = 0

err := tfresource.RetryOnConnectionResetByPeer(5*time.Second, testCase.F)

if testCase.ExpectError && err == nil {
t.Fatal("expected error")
} else if !testCase.ExpectError && err != nil {
t.Fatalf("unexpected error: %s", err)
}
})
}
}
3 changes: 2 additions & 1 deletion aws/resource_aws_accessanalyzer_analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/keyvaluetags"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/tfresource"
)

const (
Expand Down Expand Up @@ -79,7 +80,7 @@ func resourceAwsAccessAnalyzerAnalyzerCreate(d *schema.ResourceData, meta interf
}

// Handle Organizations eventual consistency
err := resource.Retry(accessAnalyzerOrganizationCreationTimeout, func() *resource.RetryError {
err := tfresource.RetryOnConnectionResetByPeer(accessAnalyzerOrganizationCreationTimeout, func() *resource.RetryError {
_, err := conn.CreateAnalyzer(input)

if isAWSErr(err, accessanalyzer.ErrCodeValidationException, "You must create an organization") {
Expand Down
4 changes: 2 additions & 2 deletions aws/resource_aws_acm_certificate.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func resourceAwsAcmCertificateRead(d *schema.ResourceData, meta interface{}) err
CertificateArn: aws.String(d.Id()),
}

return resource.Retry(AcmCertificateDnsValidationAssignmentTimeout, func() *resource.RetryError {
return tfresource.RetryOnConnectionResetByPeer(AcmCertificateDnsValidationAssignmentTimeout, func() *resource.RetryError {
resp, err := acmconn.DescribeCertificate(params)

if !d.IsNewResource() && tfawserr.ErrCodeEquals(err, acm.ErrCodeResourceNotFoundException) {
Expand Down Expand Up @@ -492,7 +492,7 @@ func resourceAwsAcmCertificateDelete(d *schema.ResourceData, meta interface{}) e
CertificateArn: aws.String(d.Id()),
}

err := resource.Retry(AcmCertificateCrossServicePropagationTimeout, func() *resource.RetryError {
err := tfresource.RetryOnConnectionResetByPeer(AcmCertificateCrossServicePropagationTimeout, func() *resource.RetryError {
_, err := acmconn.DeleteCertificate(params)

if tfawserr.ErrCodeEquals(err, acm.ErrCodeResourceInUseException) {
Expand Down
5 changes: 3 additions & 2 deletions aws/resource_aws_acm_certificate_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/tfresource"
)

func resourceAwsAcmCertificateValidation() *schema.Resource {
Expand Down Expand Up @@ -71,7 +72,7 @@ func resourceAwsAcmCertificateValidationCreate(d *schema.ResourceData, meta inte
log.Printf("[INFO] No validation_record_fqdns set, skipping check")
}

err = resource.Retry(d.Timeout(schema.TimeoutCreate), func() *resource.RetryError {
err = tfresource.RetryOnConnectionResetByPeer(d.Timeout(schema.TimeoutCreate), func() *resource.RetryError {
resp, err := acmconn.DescribeCertificate(params)

if err != nil {
Expand Down Expand Up @@ -109,7 +110,7 @@ func resourceAwsAcmCertificateCheckValidationRecords(validationRecordFqdns []int
}
var err error
var output *acm.DescribeCertificateOutput
err = resource.Retry(1*time.Minute, func() *resource.RetryError {
err = tfresource.RetryOnConnectionResetByPeer(1*time.Minute, func() *resource.RetryError {
log.Printf("[DEBUG] Certificate domain validation options empty for %s, retrying", aws.StringValue(cert.CertificateArn))
output, err = conn.DescribeCertificate(input)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion aws/resource_aws_acmpca_certificate.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/service/acmpca/waiter"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/tfresource"
)

func resourceAwsAcmpcaCertificate() *schema.Resource {
Expand Down Expand Up @@ -124,7 +125,7 @@ func resourceAwsAcmpcaCertificateCreate(d *schema.ResourceData, meta interface{}
}

var output *acmpca.IssueCertificateOutput
err = resource.Retry(waiter.CertificateAuthorityActiveTimeout, func() *resource.RetryError {
err = tfresource.RetryOnConnectionResetByPeer(waiter.CertificateAuthorityActiveTimeout, func() *resource.RetryError {
var err error
output, err = conn.IssueCertificate(input)
if tfawserr.ErrMessageContains(err, acmpca.ErrCodeInvalidStateException, "The certificate authority is not in a valid state for issuing certificates") {
Expand Down
3 changes: 2 additions & 1 deletion aws/resource_aws_acmpca_certificate_authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/terraform-providers/terraform-provider-aws/aws/internal/keyvaluetags"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/service/acmpca/finder"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/service/acmpca/waiter"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/tfresource"
)

func resourceAwsAcmpcaCertificateAuthority() *schema.Resource {
Expand Down Expand Up @@ -298,7 +299,7 @@ func resourceAwsAcmpcaCertificateAuthorityCreate(d *schema.ResourceData, meta in

log.Printf("[DEBUG] Creating ACM PCA Certificate Authority: %s", input)
var output *acmpca.CreateCertificateAuthorityOutput
err := resource.Retry(1*time.Minute, func() *resource.RetryError {
err := tfresource.RetryOnConnectionResetByPeer(1*time.Minute, func() *resource.RetryError {
var err error
output, err = conn.CreateCertificateAuthority(input)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion aws/resource_aws_ami.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/hashcode"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/keyvaluetags"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/tfresource"
)

const (
Expand Down Expand Up @@ -338,7 +339,7 @@ func resourceAwsAmiRead(d *schema.ResourceData, meta interface{}) error {
}

var res *ec2.DescribeImagesOutput
err := resource.Retry(1*time.Minute, func() *resource.RetryError {
err := tfresource.RetryOnConnectionResetByPeer(1*time.Minute, func() *resource.RetryError {
var err error
res, err = client.DescribeImages(req)
if err != nil {
Expand Down
Loading