Skip to content

Commit

Permalink
Parse errormsgs in retryable status codes (#5541)
Browse files Browse the repository at this point in the history
Fixes #5536

---------

Co-authored-by: Damien Mathieu <42@dmathieu.com>
  • Loading branch information
pree-dew and dmathieu authored Jul 18, 2024
1 parent 30cc379 commit 23abb5a
Show file tree
Hide file tree
Showing 7 changed files with 190 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Correct the `Tracer` names used in `go.opentelemetry.io/otel/example/passthrough`. (#5612)
- Correct the `Meter` name used in `go.opentelemetry.io/otel/example/prometheus`. (#5612)
- Correct the `Tracer` names used in `go.opentelemetry.io/otel/example/zipkin`. (#5612)
- Pass the underlying error rather than a generic retry-able failure in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`, `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp` and `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#5541)

<!-- Released section -->
<!-- Don't change this section unless doing release -->
Expand Down
51 changes: 45 additions & 6 deletions exporters/otlp/otlplog/otlploghttp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -143,7 +144,7 @@ func (c *httpClient) uploadLogs(ctx context.Context, data []*logpb.ResourceLogs)
resp, err := c.client.Do(request.Request)
var urlErr *url.Error
if errors.As(err, &urlErr) && urlErr.Temporary() {
return newResponseError(http.Header{})
return newResponseError(http.Header{}, err)
}
if err != nil {
return err
Expand Down Expand Up @@ -184,13 +185,25 @@ func (c *httpClient) uploadLogs(ctx context.Context, data []*logpb.ResourceLogs)
sc == http.StatusServiceUnavailable,
sc == http.StatusGatewayTimeout:
// Retry-able failure.
rErr = newResponseError(resp.Header)
rErr = newResponseError(resp.Header, nil)

// Going to retry, drain the body to reuse the connection.
if _, err := io.Copy(io.Discard, resp.Body); err != nil {
// server may return a message with the response
// body, so we read it to include in the error
// message to be returned. It will help in
// debugging the actual issue.
var respData bytes.Buffer
if _, err := io.Copy(&respData, resp.Body); err != nil {
_ = resp.Body.Close()
return err
}

// overwrite the error message with the response body
// if it is not empty
if respStr := strings.TrimSpace(respData.String()); respStr != "" {
// Include response for context.
e := errors.New(respStr)
rErr = newResponseError(resp.Header, e)
}
default:
rErr = fmt.Errorf("failed to send logs to %s: %s", request.URL, resp.Status)
}
Expand Down Expand Up @@ -266,24 +279,50 @@ func (r *request) reset(ctx context.Context) {
// retryableError represents a request failure that can be retried.
type retryableError struct {
throttle int64
err error
}

// newResponseError returns a retryableError and will extract any explicit
// throttle delay contained in headers.
func newResponseError(header http.Header) error {
// throttle delay contained in headers. The returned error wraps wrapped
// if it is not nil.
func newResponseError(header http.Header, wrapped error) error {
var rErr retryableError
if v := header.Get("Retry-After"); v != "" {
if t, err := strconv.ParseInt(v, 10, 64); err == nil {
rErr.throttle = t
}
}

rErr.err = wrapped
return rErr
}

func (e retryableError) Error() string {
if e.err != nil {
return fmt.Sprintf("retry-able request failure: %v", e.err.Error())
}

return "retry-able request failure"
}

func (e retryableError) Unwrap() error {
return e.err
}

func (e retryableError) As(target interface{}) bool {
if e.err == nil {
return false
}

switch v := target.(type) {
case **retryableError:
*v = &e
return true
default:
return false
}
}

// evaluate returns if err is retry-able. If it is and it includes an explicit
// throttling delay, that delay is also returned.
func evaluate(err error) (bool, time.Duration) {
Expand Down
24 changes: 24 additions & 0 deletions exporters/otlp/otlplog/otlploghttp/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,30 @@ func TestConfig(t *testing.T) {
assert.Len(t, rCh, 0, "failed HTTP responses did not occur")
})

t.Run("WithRetryAndExporterErr", func(t *testing.T) {
exporterErr := errors.New("rpc error: code = Unavailable desc = service.name not found in resource attributes")
rCh := make(chan exportResult, 1)
rCh <- exportResult{Err: &httpResponseError{
Status: http.StatusTooManyRequests,
Err: exporterErr,
}}
exp, coll := factoryFunc("", rCh, WithRetry(RetryConfig{
Enabled: false,
}))
ctx := context.Background()
t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) })
// Push this after Shutdown so the HTTP server doesn't hang.
t.Cleanup(func() { close(rCh) })
t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
err := exp.Export(ctx, make([]log.Record, 1))
assert.ErrorContains(t, err, exporterErr.Error())

// To test the `Unwrap` and `As` function of retryable error
var retryErr *retryableError
assert.ErrorAs(t, err, &retryErr)
assert.ErrorIs(t, err, *retryErr)
})

t.Run("WithURLPath", func(t *testing.T) {
path := "/prefix/v2/logs"
ePt := fmt.Sprintf("http://localhost:0%s", path)
Expand Down
51 changes: 45 additions & 6 deletions exporters/otlp/otlpmetric/otlpmetrichttp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -146,7 +147,7 @@ func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.Resou
resp, err := c.httpClient.Do(request.Request)
var urlErr *url.Error
if errors.As(err, &urlErr) && urlErr.Temporary() {
return newResponseError(http.Header{})
return newResponseError(http.Header{}, err)
}
if err != nil {
return err
Expand Down Expand Up @@ -187,13 +188,25 @@ func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.Resou
sc == http.StatusServiceUnavailable,
sc == http.StatusGatewayTimeout:
// Retry-able failure.
rErr = newResponseError(resp.Header)
rErr = newResponseError(resp.Header, nil)

// Going to retry, drain the body to reuse the connection.
if _, err := io.Copy(io.Discard, resp.Body); err != nil {
// server may return a message with the response
// body, so we read it to include in the error
// message to be returned. It will help in
// debugging the actual issue.
var respData bytes.Buffer
if _, err := io.Copy(&respData, resp.Body); err != nil {
_ = resp.Body.Close()
return err
}

// overwrite the error message with the response body
// if it is not empty
if respStr := strings.TrimSpace(respData.String()); respStr != "" {
// Include response for context.
e := errors.New(respStr)
rErr = newResponseError(resp.Header, e)
}
default:
rErr = fmt.Errorf("failed to send metrics to %s: %s", request.URL, resp.Status)
}
Expand Down Expand Up @@ -269,24 +282,50 @@ func (r *request) reset(ctx context.Context) {
// retryableError represents a request failure that can be retried.
type retryableError struct {
throttle int64
err error
}

// newResponseError returns a retryableError and will extract any explicit
// throttle delay contained in headers.
func newResponseError(header http.Header) error {
// throttle delay contained in headers. The returned error wraps wrapped
// if it is not nil.
func newResponseError(header http.Header, wrapped error) error {
var rErr retryableError
if v := header.Get("Retry-After"); v != "" {
if t, err := strconv.ParseInt(v, 10, 64); err == nil {
rErr.throttle = t
}
}

rErr.err = wrapped
return rErr
}

func (e retryableError) Error() string {
if e.err != nil {
return fmt.Sprintf("retry-able request failure: %s", e.err.Error())
}

return "retry-able request failure"
}

func (e retryableError) Unwrap() error {
return e.err
}

func (e retryableError) As(target interface{}) bool {
if e.err == nil {
return false
}

switch v := target.(type) {
case **retryableError:
*v = &e
return true
default:
return false
}
}

// evaluate returns if err is retry-able. If it is and it includes an explicit
// throttling delay, that delay is also returned.
func evaluate(err error) (bool, time.Duration) {
Expand Down
24 changes: 24 additions & 0 deletions exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,30 @@ func TestConfig(t *testing.T) {
assert.Len(t, rCh, 0, "failed HTTP responses did not occur")
})

t.Run("WithRetryAndExporterErr", func(t *testing.T) {
exporterErr := errors.New("rpc error: code = Unavailable desc = service.name not found in resource attributes")
rCh := make(chan otest.ExportResult, 1)
rCh <- otest.ExportResult{Err: &otest.HTTPResponseError{
Status: http.StatusTooManyRequests,
Err: exporterErr,
}}
exp, coll := factoryFunc("", rCh, WithRetry(RetryConfig{
Enabled: false,
}))
ctx := context.Background()
t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) })
// Push this after Shutdown so the HTTP server doesn't hang.
t.Cleanup(func() { close(rCh) })
t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
err := exp.Export(ctx, &metricdata.ResourceMetrics{})
assert.ErrorContains(t, err, exporterErr.Error())

// To test the `Unwrap` and `As` function of retryable error
var retryErr *retryableError
assert.ErrorAs(t, err, &retryErr)
assert.ErrorIs(t, err, *retryErr)
})

t.Run("WithURLPath", func(t *testing.T) {
path := "/prefix/v2/metrics"
ePt := fmt.Sprintf("http://localhost:0%s", path)
Expand Down
57 changes: 50 additions & 7 deletions exporters/otlp/otlptrace/otlptracehttp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -151,7 +152,7 @@ func (d *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc
resp, err := d.client.Do(request.Request)
var urlErr *url.Error
if errors.As(err, &urlErr) && urlErr.Temporary() {
return newResponseError(http.Header{})
return newResponseError(http.Header{}, err)
}
if err != nil {
return err
Expand Down Expand Up @@ -198,11 +199,27 @@ func (d *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc
sc == http.StatusBadGateway,
sc == http.StatusServiceUnavailable,
sc == http.StatusGatewayTimeout:
// Retry-able failures. Drain the body to reuse the connection.
if _, err := io.Copy(io.Discard, resp.Body); err != nil {
otel.Handle(err)
// Retry-able failures.
rErr := newResponseError(resp.Header, nil)

// server may return a message with the response
// body, so we read it to include in the error
// message to be returned. It will help in
// debugging the actual issue.
var respData bytes.Buffer
if _, err := io.Copy(&respData, resp.Body); err != nil {
_ = resp.Body.Close()
return err
}
return newResponseError(resp.Header)

// overwrite the error message with the response body
// if it is not empty
if respStr := strings.TrimSpace(respData.String()); respStr != "" {
// Include response for context.
e := errors.New(respStr)
rErr = newResponseError(resp.Header, e)
}
return rErr
default:
return fmt.Errorf("failed to send to %s: %s", request.URL, resp.Status)
}
Expand Down Expand Up @@ -291,24 +308,50 @@ func (r *request) reset(ctx context.Context) {
// retryableError represents a request failure that can be retried.
type retryableError struct {
throttle int64
err error
}

// newResponseError returns a retryableError and will extract any explicit
// throttle delay contained in headers.
func newResponseError(header http.Header) error {
// throttle delay contained in headers. The returned error wraps wrapped
// if it is not nil.
func newResponseError(header http.Header, wrapped error) error {
var rErr retryableError
if s, ok := header["Retry-After"]; ok {
if t, err := strconv.ParseInt(s[0], 10, 64); err == nil {
rErr.throttle = t
}
}

rErr.err = wrapped
return rErr
}

func (e retryableError) Error() string {
if e.err != nil {
return fmt.Sprintf("retry-able request failure: %s", e.err.Error())
}

return "retry-able request failure"
}

func (e retryableError) Unwrap() error {
return e.err
}

func (e retryableError) As(target interface{}) bool {
if e.err == nil {
return false
}

switch v := target.(type) {
case **retryableError:
*v = &e
return true
default:
return false
}
}

// evaluate returns if err is retry-able. If it is and it includes an explicit
// throttling delay, that delay is also returned.
func evaluate(err error) (bool, time.Duration) {
Expand Down
2 changes: 1 addition & 1 deletion exporters/otlp/otlptrace/otlptracehttp/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func TestTimeout(t *testing.T) {
assert.NoError(t, exporter.Shutdown(ctx))
}()
err = exporter.ExportSpans(ctx, otlptracetest.SingleReadOnlySpan())
assert.ErrorContains(t, err, "retry-able request failure")
assert.ErrorContains(t, err, "context deadline exceeded")
}

func TestNoRetry(t *testing.T) {
Expand Down

0 comments on commit 23abb5a

Please sign in to comment.