From 2b0decfcebebc88caa42123839d3e04bfa6dd802 Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Wed, 27 Mar 2024 10:55:04 -0600 Subject: [PATCH] [receiver/otlp] Return proper http response code based on retryable errors (#9357) **Description:** Updates the receiver's http response to return a proper http status based on whether or not the pipeline returned a retryable error. Builds upon the work done in https://github.com/open-telemetry/opentelemetry-collector/pull/8080 and https://github.com/open-telemetry/opentelemetry-collector/pull/9307 **Link to tracking Issue:** Closes https://github.com/open-telemetry/opentelemetry-collector/issues/9337 Closes https://github.com/open-telemetry/opentelemetry-collector/issues/8132 Closes https://github.com/open-telemetry/opentelemetry-collector/issues/9636 Closes https://github.com/open-telemetry/opentelemetry-collector/issues/6725 **Testing:** Updated lots of unit tests --- .../otlpreciever-http-response-code.yaml | 25 ++++ .../otlpreceiver/internal/errors/errors.go | 24 ++++ .../internal/errors/errors_test.go | 36 ++++++ receiver/otlpreceiver/otlp_test.go | 122 +++++++++++++----- receiver/otlpreceiver/otlphttp.go | 7 +- 5 files changed, 176 insertions(+), 38 deletions(-) create mode 100755 .chloggen/otlpreciever-http-response-code.yaml diff --git a/.chloggen/otlpreciever-http-response-code.yaml b/.chloggen/otlpreciever-http-response-code.yaml new file mode 100755 index 00000000000..f53f83ac0e9 --- /dev/null +++ b/.chloggen/otlpreciever-http-response-code.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: receiver/otlp + +# A brief description of the change. 
Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix bug where the otlp receiver's http endpoint did not return a retryable status code (429 or 503) when the pipeline returned a retryable error + +# One or more tracking issues or pull requests related to the change +issues: [9357] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] \ No newline at end of file diff --git a/receiver/otlpreceiver/internal/errors/errors.go b/receiver/otlpreceiver/internal/errors/errors.go index ee9d79e1c7c..ae2127bf9ba 100644 --- a/receiver/otlpreceiver/internal/errors/errors.go +++ b/receiver/otlpreceiver/internal/errors/errors.go @@ -4,6 +4,8 @@ package errors // import "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/errors" import ( + "net/http" + "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -24,3 +26,25 @@ func GetStatusFromError(err error) error { } return s.Err() } + +func GetHTTPStatusCodeFromStatus(err error) int { + s, ok := status.FromError(err) + if !ok { + return http.StatusInternalServerError + } + // See https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md#failures + // to see if a code is retryable. + // See https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md#failures-1 + // to see a list of retryable http status codes. 
+ switch s.Code() { + // Retryable + case codes.Canceled, codes.DeadlineExceeded, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss: + return http.StatusServiceUnavailable + // Retryable + case codes.ResourceExhausted: + return http.StatusTooManyRequests + // Not Retryable + default: + return http.StatusInternalServerError + } +} diff --git a/receiver/otlpreceiver/internal/errors/errors_test.go b/receiver/otlpreceiver/internal/errors/errors_test.go index c75b5bf041f..72db243353f 100644 --- a/receiver/otlpreceiver/internal/errors/errors_test.go +++ b/receiver/otlpreceiver/internal/errors/errors_test.go @@ -5,6 +5,7 @@ package errors // import "go.opentelemetry.io/collector/receiver/otlpreceiver/in import ( "fmt" + "net/http" "testing" "github.com/stretchr/testify/assert" @@ -43,3 +44,38 @@ func Test_GetStatusFromError(t *testing.T) { }) } } + +func Test_GetHTTPStatusCodeFromStatus(t *testing.T) { + tests := []struct { + name string + input error + expected int + }{ + { + name: "Not a Status", + input: fmt.Errorf("not a status error"), + expected: http.StatusInternalServerError, + }, + { + name: "Retryable Status", + input: status.New(codes.Unavailable, "test").Err(), + expected: http.StatusServiceUnavailable, + }, + { + name: "Non-retryable Status", + input: status.New(codes.InvalidArgument, "test").Err(), + expected: http.StatusInternalServerError, + }, + { + name: "Specifically 429", + input: status.New(codes.ResourceExhausted, "test").Err(), + expected: http.StatusTooManyRequests, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := GetHTTPStatusCodeFromStatus(tt.input) + assert.Equal(t, tt.expected, result) + }) + } +} diff --git a/receiver/otlpreceiver/otlp_test.go b/receiver/otlpreceiver/otlp_test.go index fe170cb413f..2720b3cc4c0 100644 --- a/receiver/otlpreceiver/otlp_test.go +++ b/receiver/otlpreceiver/otlp_test.go @@ -52,10 +52,12 @@ var otlpReceiverID = component.MustNewIDWithName("otlp", 
otlpReceiverName) func TestJsonHttp(t *testing.T) { tests := []struct { - name string - encoding string - contentType string - err error + name string + encoding string + contentType string + err error + expectedStatus *spb.Status + expectedStatusCode int }{ { name: "JSONUncompressed", @@ -83,16 +85,36 @@ func TestJsonHttp(t *testing.T) { contentType: "application/json", }, { - name: "NotGRPCError", - encoding: "", - contentType: "application/json", - err: errors.New("my error"), + name: "Permanent NotGRPCError", + encoding: "", + contentType: "application/json", + err: consumererror.NewPermanent(errors.New("my error")), + expectedStatus: &spb.Status{Code: int32(codes.Internal), Message: "Permanent error: my error"}, + expectedStatusCode: http.StatusInternalServerError, }, { - name: "GRPCError", - encoding: "", - contentType: "application/json", - err: status.New(codes.Unavailable, "").Err(), + name: "Retryable NotGRPCError", + encoding: "", + contentType: "application/json", + err: errors.New("my error"), + expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: "my error"}, + expectedStatusCode: http.StatusServiceUnavailable, + }, + { + name: "Permanent GRPCError", + encoding: "", + contentType: "application/json", + err: status.New(codes.InvalidArgument, "").Err(), + expectedStatus: &spb.Status{Code: int32(codes.InvalidArgument), Message: ""}, + expectedStatusCode: http.StatusInternalServerError, + }, + { + name: "Retryable GRPCError", + encoding: "", + contentType: "application/json", + err: status.New(codes.Unavailable, "").Err(), + expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: ""}, + expectedStatusCode: http.StatusServiceUnavailable, }, } addr := testutil.GetAvailableLocalAddress(t) @@ -108,7 +130,7 @@ func TestJsonHttp(t *testing.T) { for _, dr := range generateDataRequests(t) { url := "http://" + addr + dr.path - respBytes := doHTTPRequest(t, url, tt.encoding, tt.contentType, dr.jsonBytes, tt.err != nil) + respBytes := 
doHTTPRequest(t, url, tt.encoding, tt.contentType, dr.jsonBytes, tt.expectedStatusCode) if tt.err == nil { tr := ptraceotlp.NewExportResponse() assert.NoError(t, tr.UnmarshalJSON(respBytes), "Unable to unmarshal response to Response") @@ -120,7 +142,7 @@ func TestJsonHttp(t *testing.T) { assert.True(t, proto.Equal(errStatus, s.Proto())) } else { fmt.Println(errStatus) - assert.True(t, proto.Equal(errStatus, &spb.Status{Code: int32(codes.Unavailable), Message: "my error"})) + assert.True(t, proto.Equal(errStatus, tt.expectedStatus)) } sink.checkData(t, dr.data, 0) } @@ -302,9 +324,11 @@ func TestHandleInvalidRequests(t *testing.T) { func TestProtoHttp(t *testing.T) { tests := []struct { - name string - encoding string - err error + name string + encoding string + err error + expectedStatus *spb.Status + expectedStatusCode int }{ { name: "ProtoUncompressed", @@ -319,14 +343,32 @@ func TestProtoHttp(t *testing.T) { encoding: "zstd", }, { - name: "NotGRPCError", - encoding: "", - err: errors.New("my error"), + name: "Permanent NotGRPCError", + encoding: "", + err: consumererror.NewPermanent(errors.New("my error")), + expectedStatus: &spb.Status{Code: int32(codes.Internal), Message: "Permanent error: my error"}, + expectedStatusCode: http.StatusInternalServerError, }, { - name: "GRPCError", - encoding: "", - err: status.New(codes.Unavailable, "").Err(), + name: "Retryable NotGRPCError", + encoding: "", + err: errors.New("my error"), + expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: "my error"}, + expectedStatusCode: http.StatusServiceUnavailable, + }, + { + name: "Permanent GRPCError", + encoding: "", + err: status.New(codes.InvalidArgument, "").Err(), + expectedStatus: &spb.Status{Code: int32(codes.InvalidArgument), Message: ""}, + expectedStatusCode: http.StatusInternalServerError, + }, + { + name: "Retryable GRPCError", + encoding: "", + err: status.New(codes.Unavailable, "").Err(), + expectedStatus: &spb.Status{Code: int32(codes.Unavailable), 
Message: ""}, + expectedStatusCode: http.StatusServiceUnavailable, }, } addr := testutil.GetAvailableLocalAddress(t) @@ -345,7 +387,7 @@ func TestProtoHttp(t *testing.T) { for _, dr := range generateDataRequests(t) { url := "http://" + addr + dr.path - respBytes := doHTTPRequest(t, url, tt.encoding, "application/x-protobuf", dr.protoBytes, tt.err != nil) + respBytes := doHTTPRequest(t, url, tt.encoding, "application/x-protobuf", dr.protoBytes, tt.expectedStatusCode) if tt.err == nil { tr := ptraceotlp.NewExportResponse() assert.NoError(t, tr.UnmarshalProto(respBytes)) @@ -356,7 +398,7 @@ func TestProtoHttp(t *testing.T) { if s, ok := status.FromError(tt.err); ok { assert.True(t, proto.Equal(errStatus, s.Proto())) } else { - assert.True(t, proto.Equal(errStatus, &spb.Status{Code: int32(codes.Unavailable), Message: "my error"})) + assert.True(t, proto.Equal(errStatus, tt.expectedStatus)) } sink.checkData(t, dr.data, 0) } @@ -560,20 +602,30 @@ func TestOTLPReceiverGRPCTracesIngestTest(t *testing.T) { // trace receiver. 
func TestOTLPReceiverHTTPTracesIngestTest(t *testing.T) { type ingestionStateTest struct { - okToIngest bool - expectedCode codes.Code + okToIngest bool + err error + expectedCode codes.Code + expectedStatusCode int } expectedReceivedBatches := 2 - expectedIngestionBlockedRPCs := 1 + expectedIngestionBlockedRPCs := 2 ingestionStates := []ingestionStateTest{ { okToIngest: true, expectedCode: codes.OK, }, { - okToIngest: false, - expectedCode: codes.Unavailable, + okToIngest: false, + err: consumererror.NewPermanent(errors.New("consumer error")), + expectedCode: codes.Internal, + expectedStatusCode: http.StatusInternalServerError, + }, + { + okToIngest: false, + err: errors.New("consumer error"), + expectedCode: codes.Unavailable, + expectedStatusCode: http.StatusServiceUnavailable, }, { okToIngest: true, @@ -599,7 +651,7 @@ func TestOTLPReceiverHTTPTracesIngestTest(t *testing.T) { if ingestionState.okToIngest { sink.SetConsumeError(nil) } else { - sink.SetConsumeError(errors.New("consumer error")) + sink.SetConsumeError(ingestionState.err) } pbMarshaler := ptrace.ProtoMarshaler{} @@ -620,7 +672,7 @@ func TestOTLPReceiverHTTPTracesIngestTest(t *testing.T) { } else { errStatus := &spb.Status{} assert.NoError(t, proto.Unmarshal(respBytes, errStatus)) - assert.Equal(t, http.StatusInternalServerError, resp.StatusCode) + assert.Equal(t, ingestionState.expectedStatusCode, resp.StatusCode) assert.Equal(t, ingestionState.expectedCode, codes.Code(errStatus.Code)) } } @@ -853,7 +905,7 @@ func doHTTPRequest( encoding string, contentType string, data []byte, - expectErr bool, + expectStatusCode int, ) []byte { req := createHTTPRequest(t, url, encoding, contentType, data) resp, err := http.DefaultClient.Do(req) @@ -866,10 +918,10 @@ func doHTTPRequest( // For cases like "application/json; charset=utf-8", the response will be only "application/json" require.True(t, strings.HasPrefix(strings.ToLower(contentType), resp.Header.Get("Content-Type"))) - if !expectErr { + if 
expectStatusCode == 0 { require.Equal(t, http.StatusOK, resp.StatusCode) } else { - require.Equal(t, http.StatusInternalServerError, resp.StatusCode) + require.Equal(t, expectStatusCode, resp.StatusCode) } return respBytes diff --git a/receiver/otlpreceiver/otlphttp.go b/receiver/otlpreceiver/otlphttp.go index dca42737052..ae60d8d37d2 100644 --- a/receiver/otlpreceiver/otlphttp.go +++ b/receiver/otlpreceiver/otlphttp.go @@ -13,6 +13,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/errors" "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/logs" "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/metrics" "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/trace" @@ -42,7 +43,7 @@ func handleTraces(resp http.ResponseWriter, req *http.Request, tracesReceiver *t otlpResp, err := tracesReceiver.Export(req.Context(), otlpReq) if err != nil { - writeError(resp, enc, err, http.StatusInternalServerError) + writeError(resp, enc, err, errors.GetHTTPStatusCodeFromStatus(err)) return } @@ -73,7 +74,7 @@ func handleMetrics(resp http.ResponseWriter, req *http.Request, metricsReceiver otlpResp, err := metricsReceiver.Export(req.Context(), otlpReq) if err != nil { - writeError(resp, enc, err, http.StatusInternalServerError) + writeError(resp, enc, err, errors.GetHTTPStatusCodeFromStatus(err)) return } @@ -104,7 +105,7 @@ func handleLogs(resp http.ResponseWriter, req *http.Request, logsReceiver *logs. otlpResp, err := logsReceiver.Export(req.Context(), otlpReq) if err != nil { - writeError(resp, enc, err, http.StatusInternalServerError) + writeError(resp, enc, err, errors.GetHTTPStatusCodeFromStatus(err)) return }