diff --git a/receiver/otlpreceiver/internal/errors/errors.go b/receiver/otlpreceiver/internal/errors/errors.go index aa583ed23d6..eedb9381e53 100644 --- a/receiver/otlpreceiver/internal/errors/errors.go +++ b/receiver/otlpreceiver/internal/errors/errors.go @@ -4,6 +4,8 @@ package errors // import "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/errors" import ( + "net/http" + "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -23,3 +25,22 @@ func GetStatusFromError(err error) error { } return s.Err() } + +func GetHTTPStatusCodeFromStatus(err error) int { + s, ok := status.FromError(err) + if !ok { + return http.StatusInternalServerError + } + // See https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md#failures + // to see if a code is retryable. + // See https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md#failures-1 + // to see a list of retryable http status codes. + switch s.Code() { + // Retryable + case codes.Canceled, codes.DeadlineExceeded, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss: + return http.StatusServiceUnavailable + // Not Retryable + default: + return http.StatusInternalServerError + } +} diff --git a/receiver/otlpreceiver/internal/errors/errors_test.go b/receiver/otlpreceiver/internal/errors/errors_test.go index 1e52032c2b1..4b36effa276 100644 --- a/receiver/otlpreceiver/internal/errors/errors_test.go +++ b/receiver/otlpreceiver/internal/errors/errors_test.go @@ -5,6 +5,7 @@ package errors // import "go.opentelemetry.io/collector/receiver/otlpreceiver/in import ( "fmt" + "net/http" "testing" "github.com/stretchr/testify/assert" @@ -43,3 +44,33 @@ func Test_GetStatusFromError(t *testing.T) { }) } } + +func Test_GetHTTPStatusCodeFromStatus(t *testing.T) { + tests := []struct { + name string + input error + expected int + }{ + { + name: "Not a Status", + input: fmt.Errorf("not a status error"), + expected: http.StatusInternalServerError, + }, + { + name: "Retryable Status", + input: status.New(codes.Unavailable, "test").Err(), + expected: http.StatusServiceUnavailable, + }, + { + name: "Non-retryable Status", + input: status.New(codes.InvalidArgument, "test").Err(), + expected: http.StatusInternalServerError, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := GetHTTPStatusCodeFromStatus(tt.input) + assert.Equal(t, tt.expected, result) + }) + } +} diff --git a/receiver/otlpreceiver/otlp_test.go b/receiver/otlpreceiver/otlp_test.go index 20e00a38292..54fe9ad8f12 100644 --- a/receiver/otlpreceiver/otlp_test.go +++ b/receiver/otlpreceiver/otlp_test.go @@ -52,10 +52,12 @@ var otlpReceiverID = component.NewIDWithName("otlp", otlpReceiverName) func TestJsonHttp(t *testing.T) { tests := []struct { - name string - encoding string - contentType string - err error + name string + encoding string + contentType string + err error + expectedStatus *spb.Status + expectedStatusCode int }{ { name: "JSONUncompressed", @@ -83,16 +85,36 @@ func TestJsonHttp(t *testing.T) { contentType: "application/json", }, { - name: "NotGRPCError", - encoding: "", - contentType: "application/json", - err: errors.New("my error"), + name: "Permanent NotGRPCError", + encoding: "", + contentType: "application/json", + err: consumererror.NewPermanent(errors.New("my error")), + expectedStatus: &spb.Status{Code: int32(codes.InvalidArgument), Message: "Permanent error: my error"}, + expectedStatusCode: http.StatusInternalServerError, }, { - name: "GRPCError", - encoding: "", - contentType: "application/json", - err: status.New(codes.Unavailable, "").Err(), + name: "Retryable NotGRPCError", + encoding: "", + contentType: "application/json", + err: errors.New("my error"), + expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: "my error"}, + expectedStatusCode: http.StatusServiceUnavailable, + }, + { + name: "Permanent GRPCError", + encoding: "", + contentType: "application/json", + err: status.New(codes.InvalidArgument, "").Err(), + expectedStatus: &spb.Status{Code: int32(codes.InvalidArgument), Message: ""}, + expectedStatusCode: http.StatusInternalServerError, + }, + { + name: "Retryable GRPCError", + encoding: "", + contentType: "application/json", + err: status.New(codes.Unavailable, "").Err(), + expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: ""}, + expectedStatusCode: http.StatusServiceUnavailable, }, } addr := testutil.GetAvailableLocalAddress(t) @@ -108,7 +130,7 @@ func TestJsonHttp(t *testing.T) { for _, dr := range generateDataRequests(t) { url := "http://" + addr + dr.path - respBytes := doHTTPRequest(t, url, tt.encoding, tt.contentType, dr.jsonBytes, tt.err != nil) + respBytes := doHTTPRequest(t, url, tt.encoding, tt.contentType, dr.jsonBytes, tt.expectedStatusCode) if tt.err == nil { tr := ptraceotlp.NewExportResponse() assert.NoError(t, tr.UnmarshalJSON(respBytes), "Unable to unmarshal response to Response") @@ -120,7 +142,7 @@ func TestJsonHttp(t *testing.T) { assert.True(t, proto.Equal(errStatus, s.Proto())) } else { fmt.Println(errStatus) - assert.True(t, proto.Equal(errStatus, &spb.Status{Code: int32(codes.Unavailable), Message: "my error"})) + assert.True(t, proto.Equal(errStatus, tt.expectedStatus)) } sink.checkData(t, dr.data, 0) } @@ -302,9 +324,11 @@ func TestHandleInvalidRequests(t *testing.T) { func TestProtoHttp(t *testing.T) { tests := []struct { - name string - encoding string - err error + name string + encoding string + err error + expectedStatus *spb.Status + expectedStatusCode int }{ { name: "ProtoUncompressed", @@ -319,14 +343,32 @@ func TestProtoHttp(t *testing.T) { encoding: "zstd", }, { - name: "NotGRPCError", - encoding: "", - err: errors.New("my error"), + name: "Permanent NotGRPCError", + encoding: "", + err: consumererror.NewPermanent(errors.New("my error")), + expectedStatus: &spb.Status{Code: int32(codes.InvalidArgument), Message: "Permanent error: my error"}, + expectedStatusCode: http.StatusInternalServerError, }, { - name: "GRPCError", - encoding: "", - err: status.New(codes.Unavailable, "").Err(), + name: "Retryable NotGRPCError", + encoding: "", + err: errors.New("my error"), + expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: "my error"}, + expectedStatusCode: http.StatusServiceUnavailable, + }, + { + name: "Permanent GRPCError", + encoding: "", + err: status.New(codes.InvalidArgument, "").Err(), + expectedStatus: &spb.Status{Code: int32(codes.InvalidArgument), Message: ""}, + expectedStatusCode: http.StatusInternalServerError, + }, + { + name: "Retryable GRPCError", + encoding: "", + err: status.New(codes.Unavailable, "").Err(), + expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: ""}, + expectedStatusCode: http.StatusServiceUnavailable, }, } addr := testutil.GetAvailableLocalAddress(t) @@ -345,7 +387,7 @@ func TestProtoHttp(t *testing.T) { for _, dr := range generateDataRequests(t) { url := "http://" + addr + dr.path - respBytes := doHTTPRequest(t, url, tt.encoding, "application/x-protobuf", dr.protoBytes, tt.err != nil) + respBytes := doHTTPRequest(t, url, tt.encoding, "application/x-protobuf", dr.protoBytes, tt.expectedStatusCode) if tt.err == nil { tr := ptraceotlp.NewExportResponse() assert.NoError(t, tr.UnmarshalProto(respBytes)) @@ -356,7 +398,7 @@ func TestProtoHttp(t *testing.T) { if s, ok := status.FromError(tt.err); ok { assert.True(t, proto.Equal(errStatus, s.Proto())) } else { - assert.True(t, proto.Equal(errStatus, &spb.Status{Code: int32(codes.Unavailable), Message: "my error"})) + assert.True(t, proto.Equal(errStatus, tt.expectedStatus)) } sink.checkData(t, dr.data, 0) } @@ -560,20 +602,30 @@ func TestOTLPReceiverGRPCTracesIngestTest(t *testing.T) { // trace receiver. func TestOTLPReceiverHTTPTracesIngestTest(t *testing.T) { type ingestionStateTest struct { - okToIngest bool - expectedCode codes.Code + okToIngest bool + err error + expectedCode codes.Code + expectedStatusCode int } expectedReceivedBatches := 2 - expectedIngestionBlockedRPCs := 1 + expectedIngestionBlockedRPCs := 2 ingestionStates := []ingestionStateTest{ { okToIngest: true, expectedCode: codes.OK, }, { - okToIngest: false, - expectedCode: codes.Unavailable, + okToIngest: false, + err: consumererror.NewPermanent(errors.New("consumer error")), + expectedCode: codes.InvalidArgument, + expectedStatusCode: http.StatusInternalServerError, + }, + { + okToIngest: false, + err: errors.New("consumer error"), + expectedCode: codes.Unavailable, + expectedStatusCode: http.StatusServiceUnavailable, }, { okToIngest: true, @@ -599,7 +651,7 @@ func TestOTLPReceiverHTTPTracesIngestTest(t *testing.T) { if ingestionState.okToIngest { sink.SetConsumeError(nil) } else { - sink.SetConsumeError(errors.New("consumer error")) + sink.SetConsumeError(ingestionState.err) } pbMarshaler := ptrace.ProtoMarshaler{} @@ -620,7 +672,7 @@ func TestOTLPReceiverHTTPTracesIngestTest(t *testing.T) { } else { errStatus := &spb.Status{} assert.NoError(t, proto.Unmarshal(respBytes, errStatus)) - assert.Equal(t, http.StatusInternalServerError, resp.StatusCode) + assert.Equal(t, ingestionState.expectedStatusCode, resp.StatusCode) assert.Equal(t, ingestionState.expectedCode, codes.Code(errStatus.Code)) } } @@ -853,7 +905,7 @@ func doHTTPRequest( encoding string, contentType string, data []byte, - expectErr bool, + expectStatusCode int, ) []byte { req := createHTTPRequest(t, url, encoding, contentType, data) resp, err := http.DefaultClient.Do(req) @@ -866,10 +918,10 @@ func doHTTPRequest( // For cases like "application/json; charset=utf-8", the response will be only "application/json" require.True(t, strings.HasPrefix(strings.ToLower(contentType), resp.Header.Get("Content-Type"))) - if !expectErr { + if expectStatusCode == 0 { require.Equal(t, http.StatusOK, resp.StatusCode) } else { - require.Equal(t, http.StatusInternalServerError, resp.StatusCode) + require.Equal(t, expectStatusCode, resp.StatusCode) } return respBytes diff --git a/receiver/otlpreceiver/otlphttp.go b/receiver/otlpreceiver/otlphttp.go index dca42737052..09910682016 100644 --- a/receiver/otlpreceiver/otlphttp.go +++ b/receiver/otlpreceiver/otlphttp.go @@ -9,6 +9,7 @@ import ( "mime" "net/http" + "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/errors" spb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -42,7 +43,7 @@ func handleTraces(resp http.ResponseWriter, req *http.Request, tracesReceiver *t otlpResp, err := tracesReceiver.Export(req.Context(), otlpReq) if err != nil { - writeError(resp, enc, err, http.StatusInternalServerError) + writeError(resp, enc, err, errors.GetHTTPStatusCodeFromStatus(err)) return } @@ -73,7 +74,7 @@ func handleMetrics(resp http.ResponseWriter, req *http.Request, metricsReceiver otlpResp, err := metricsReceiver.Export(req.Context(), otlpReq) if err != nil { - writeError(resp, enc, err, http.StatusInternalServerError) + writeError(resp, enc, err, errors.GetHTTPStatusCodeFromStatus(err)) return } @@ -104,7 +105,7 @@ func handleLogs(resp http.ResponseWriter, req *http.Request, logsReceiver *logs. otlpResp, err := logsReceiver.Export(req.Context(), otlpReq) if err != nil { - writeError(resp, enc, err, http.StatusInternalServerError) + writeError(resp, enc, err, errors.GetHTTPStatusCodeFromStatus(err)) return }