Skip to content

Commit

Permalink
Return proper http response code based on retryable error
Browse files Browse the repository at this point in the history
  • Loading branch information
TylerHelmuth committed Jan 24, 2024
1 parent 9047c0e commit 20c5453
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 38 deletions.
21 changes: 21 additions & 0 deletions receiver/otlpreceiver/internal/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package errors // import "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/errors"

import (
"net/http"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand All @@ -23,3 +25,22 @@ func GetStatusFromError(err error) error {
}
return s.Err()
}

func GetHTTPStatusCodeFromStatus(err error) int {
s, ok := status.FromError(err)
if !ok {
return http.StatusInternalServerError
}
// See https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md#failures
// to see if a code is retryable.
// See https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md#failures-1
// to see a list of retryable http status codes.
switch s.Code() {
// Retryable
case codes.Canceled, codes.DeadlineExceeded, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss:
return http.StatusServiceUnavailable
// Not Retryable
default:
return http.StatusInternalServerError
}
}
31 changes: 31 additions & 0 deletions receiver/otlpreceiver/internal/errors/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package errors // import "go.opentelemetry.io/collector/receiver/otlpreceiver/in

import (
"fmt"
"net/http"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -43,3 +44,33 @@ func Test_GetStatusFromError(t *testing.T) {
})
}
}

func Test_GetHTTPStatusCodeFromStatus(t *testing.T) {
tests := []struct {
name string
input error
expected int
}{
{
name: "Not a Status",
input: fmt.Errorf("not a status error"),
expected: http.StatusInternalServerError,
},
{
name: "Retryable Status",
input: status.New(codes.Unavailable, "test").Err(),
expected: http.StatusServiceUnavailable,
},
{
name: "Non-retryable Status",
input: status.New(codes.InvalidArgument, "test").Err(),
expected: http.StatusInternalServerError,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := GetHTTPStatusCodeFromStatus(tt.input)
assert.Equal(t, tt.expected, result)
})
}
}
122 changes: 87 additions & 35 deletions receiver/otlpreceiver/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,12 @@ var otlpReceiverID = component.NewIDWithName("otlp", otlpReceiverName)

func TestJsonHttp(t *testing.T) {
tests := []struct {
name string
encoding string
contentType string
err error
name string
encoding string
contentType string
err error
expectedStatus *spb.Status
expectedStatusCode int
}{
{
name: "JSONUncompressed",
Expand Down Expand Up @@ -83,16 +85,36 @@ func TestJsonHttp(t *testing.T) {
contentType: "application/json",
},
{
name: "NotGRPCError",
encoding: "",
contentType: "application/json",
err: errors.New("my error"),
name: "Permanent NotGRPCError",
encoding: "",
contentType: "application/json",
err: consumererror.NewPermanent(errors.New("my error")),
expectedStatus: &spb.Status{Code: int32(codes.InvalidArgument), Message: "Permanent error: my error"},
expectedStatusCode: http.StatusInternalServerError,
},
{
name: "GRPCError",
encoding: "",
contentType: "application/json",
err: status.New(codes.Unavailable, "").Err(),
name: "Retryable NotGRPCError",
encoding: "",
contentType: "application/json",
err: errors.New("my error"),
expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: "my error"},
expectedStatusCode: http.StatusServiceUnavailable,
},
{
name: "Permanent GRPCError",
encoding: "",
contentType: "application/json",
err: status.New(codes.InvalidArgument, "").Err(),
expectedStatus: &spb.Status{Code: int32(codes.InvalidArgument), Message: ""},
expectedStatusCode: http.StatusInternalServerError,
},
{
name: "Retryable GRPCError",
encoding: "",
contentType: "application/json",
err: status.New(codes.Unavailable, "").Err(),
expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: ""},
expectedStatusCode: http.StatusServiceUnavailable,
},
}
addr := testutil.GetAvailableLocalAddress(t)
Expand All @@ -108,7 +130,7 @@ func TestJsonHttp(t *testing.T) {

for _, dr := range generateDataRequests(t) {
url := "http://" + addr + dr.path
respBytes := doHTTPRequest(t, url, tt.encoding, tt.contentType, dr.jsonBytes, tt.err != nil)
respBytes := doHTTPRequest(t, url, tt.encoding, tt.contentType, dr.jsonBytes, tt.expectedStatusCode)
if tt.err == nil {
tr := ptraceotlp.NewExportResponse()
assert.NoError(t, tr.UnmarshalJSON(respBytes), "Unable to unmarshal response to Response")
Expand All @@ -120,7 +142,7 @@ func TestJsonHttp(t *testing.T) {
assert.True(t, proto.Equal(errStatus, s.Proto()))
} else {
fmt.Println(errStatus)
assert.True(t, proto.Equal(errStatus, &spb.Status{Code: int32(codes.Unavailable), Message: "my error"}))
assert.True(t, proto.Equal(errStatus, tt.expectedStatus))
}
sink.checkData(t, dr.data, 0)
}
Expand Down Expand Up @@ -302,9 +324,11 @@ func TestHandleInvalidRequests(t *testing.T) {

func TestProtoHttp(t *testing.T) {
tests := []struct {
name string
encoding string
err error
name string
encoding string
err error
expectedStatus *spb.Status
expectedStatusCode int
}{
{
name: "ProtoUncompressed",
Expand All @@ -319,14 +343,32 @@ func TestProtoHttp(t *testing.T) {
encoding: "zstd",
},
{
name: "NotGRPCError",
encoding: "",
err: errors.New("my error"),
name: "Permanent NotGRPCError",
encoding: "",
err: consumererror.NewPermanent(errors.New("my error")),
expectedStatus: &spb.Status{Code: int32(codes.InvalidArgument), Message: "Permanent error: my error"},
expectedStatusCode: http.StatusInternalServerError,
},
{
name: "GRPCError",
encoding: "",
err: status.New(codes.Unavailable, "").Err(),
name: "Retryable NotGRPCError",
encoding: "",
err: errors.New("my error"),
expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: "my error"},
expectedStatusCode: http.StatusServiceUnavailable,
},
{
name: "Permanent GRPCError",
encoding: "",
err: status.New(codes.InvalidArgument, "").Err(),
expectedStatus: &spb.Status{Code: int32(codes.InvalidArgument), Message: ""},
expectedStatusCode: http.StatusInternalServerError,
},
{
name: "Retryable GRPCError",
encoding: "",
err: status.New(codes.Unavailable, "").Err(),
expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: ""},
expectedStatusCode: http.StatusServiceUnavailable,
},
}
addr := testutil.GetAvailableLocalAddress(t)
Expand All @@ -345,7 +387,7 @@ func TestProtoHttp(t *testing.T) {

for _, dr := range generateDataRequests(t) {
url := "http://" + addr + dr.path
respBytes := doHTTPRequest(t, url, tt.encoding, "application/x-protobuf", dr.protoBytes, tt.err != nil)
respBytes := doHTTPRequest(t, url, tt.encoding, "application/x-protobuf", dr.protoBytes, tt.expectedStatusCode)
if tt.err == nil {
tr := ptraceotlp.NewExportResponse()
assert.NoError(t, tr.UnmarshalProto(respBytes))
Expand All @@ -356,7 +398,7 @@ func TestProtoHttp(t *testing.T) {
if s, ok := status.FromError(tt.err); ok {
assert.True(t, proto.Equal(errStatus, s.Proto()))
} else {
assert.True(t, proto.Equal(errStatus, &spb.Status{Code: int32(codes.Unavailable), Message: "my error"}))
assert.True(t, proto.Equal(errStatus, tt.expectedStatus))
}
sink.checkData(t, dr.data, 0)
}
Expand Down Expand Up @@ -560,20 +602,30 @@ func TestOTLPReceiverGRPCTracesIngestTest(t *testing.T) {
// trace receiver.
func TestOTLPReceiverHTTPTracesIngestTest(t *testing.T) {
type ingestionStateTest struct {
okToIngest bool
expectedCode codes.Code
okToIngest bool
err error
expectedCode codes.Code
expectedStatusCode int
}

expectedReceivedBatches := 2
expectedIngestionBlockedRPCs := 1
expectedIngestionBlockedRPCs := 2
ingestionStates := []ingestionStateTest{
{
okToIngest: true,
expectedCode: codes.OK,
},
{
okToIngest: false,
expectedCode: codes.Unavailable,
okToIngest: false,
err: consumererror.NewPermanent(errors.New("consumer error")),
expectedCode: codes.InvalidArgument,
expectedStatusCode: http.StatusInternalServerError,
},
{
okToIngest: false,
err: errors.New("consumer error"),
expectedCode: codes.Unavailable,
expectedStatusCode: http.StatusServiceUnavailable,
},
{
okToIngest: true,
Expand All @@ -599,7 +651,7 @@ func TestOTLPReceiverHTTPTracesIngestTest(t *testing.T) {
if ingestionState.okToIngest {
sink.SetConsumeError(nil)
} else {
sink.SetConsumeError(errors.New("consumer error"))
sink.SetConsumeError(ingestionState.err)
}

pbMarshaler := ptrace.ProtoMarshaler{}
Expand All @@ -620,7 +672,7 @@ func TestOTLPReceiverHTTPTracesIngestTest(t *testing.T) {
} else {
errStatus := &spb.Status{}
assert.NoError(t, proto.Unmarshal(respBytes, errStatus))
assert.Equal(t, http.StatusInternalServerError, resp.StatusCode)
assert.Equal(t, ingestionState.expectedStatusCode, resp.StatusCode)
assert.Equal(t, ingestionState.expectedCode, codes.Code(errStatus.Code))
}
}
Expand Down Expand Up @@ -853,7 +905,7 @@ func doHTTPRequest(
encoding string,
contentType string,
data []byte,
expectErr bool,
expectStatusCode int,
) []byte {
req := createHTTPRequest(t, url, encoding, contentType, data)
resp, err := http.DefaultClient.Do(req)
Expand All @@ -866,10 +918,10 @@ func doHTTPRequest(
// For cases like "application/json; charset=utf-8", the response will be only "application/json"
require.True(t, strings.HasPrefix(strings.ToLower(contentType), resp.Header.Get("Content-Type")))

if !expectErr {
if expectStatusCode == 0 {
require.Equal(t, http.StatusOK, resp.StatusCode)
} else {
require.Equal(t, http.StatusInternalServerError, resp.StatusCode)
require.Equal(t, expectStatusCode, resp.StatusCode)
}

return respBytes
Expand Down
7 changes: 4 additions & 3 deletions receiver/otlpreceiver/otlphttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"mime"
"net/http"

"go.opentelemetry.io/collector/receiver/otlpreceiver/internal/errors"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -42,7 +43,7 @@ func handleTraces(resp http.ResponseWriter, req *http.Request, tracesReceiver *t

otlpResp, err := tracesReceiver.Export(req.Context(), otlpReq)
if err != nil {
writeError(resp, enc, err, http.StatusInternalServerError)
writeError(resp, enc, err, errors.GetHTTPStatusCodeFromStatus(err))
return
}

Expand Down Expand Up @@ -73,7 +74,7 @@ func handleMetrics(resp http.ResponseWriter, req *http.Request, metricsReceiver

otlpResp, err := metricsReceiver.Export(req.Context(), otlpReq)
if err != nil {
writeError(resp, enc, err, http.StatusInternalServerError)
writeError(resp, enc, err, errors.GetHTTPStatusCodeFromStatus(err))
return
}

Expand Down Expand Up @@ -104,7 +105,7 @@ func handleLogs(resp http.ResponseWriter, req *http.Request, logsReceiver *logs.

otlpResp, err := logsReceiver.Export(req.Context(), otlpReq)
if err != nil {
writeError(resp, enc, err, http.StatusInternalServerError)
writeError(resp, enc, err, errors.GetHTTPStatusCodeFromStatus(err))
return
}

Expand Down

0 comments on commit 20c5453

Please sign in to comment.