Skip to content

Commit

Permalink
[receiver/otlp] Return proper http response code based on retryable e…
Browse files Browse the repository at this point in the history
…rrors (#9357)

**Description:**
Updates the receiver's http response to return a proper http status
based on whether or not the pipeline returned a retryable error. Builds
upon the work done in
#8080 and
#9307

**Link to tracking Issue:**

Closes
#9337
Closes
#8132
Closes
#9636
Closes
#6725

**Testing:**

Updated lots of unit tests
  • Loading branch information
TylerHelmuth authored Mar 27, 2024
1 parent f237238 commit 2b0decf
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 38 deletions.
25 changes: 25 additions & 0 deletions .chloggen/otlpreciever-http-response-code.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: receiver/otlp

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix bug where the otlp receiver did not properly respond with a retryable error code when possible for http

# One or more tracking issues or pull requests related to the change
issues: [9357]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
24 changes: 24 additions & 0 deletions receiver/otlpreceiver/internal/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package errors // import "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/errors"

import (
"net/http"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand All @@ -24,3 +26,25 @@ func GetStatusFromError(err error) error {
}
return s.Err()
}

func GetHTTPStatusCodeFromStatus(err error) int {
s, ok := status.FromError(err)
if !ok {
return http.StatusInternalServerError
}
// See https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md#failures
// to see if a code is retryable.
// See https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md#failures-1
// to see a list of retryable http status codes.
switch s.Code() {
// Retryable
case codes.Canceled, codes.DeadlineExceeded, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss:
return http.StatusServiceUnavailable
// Retryable
case codes.ResourceExhausted:
return http.StatusTooManyRequests
// Not Retryable
default:
return http.StatusInternalServerError
}
}
36 changes: 36 additions & 0 deletions receiver/otlpreceiver/internal/errors/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package errors // import "go.opentelemetry.io/collector/receiver/otlpreceiver/in

import (
"fmt"
"net/http"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -43,3 +44,38 @@ func Test_GetStatusFromError(t *testing.T) {
})
}
}

func Test_GetHTTPStatusCodeFromStatus(t *testing.T) {
tests := []struct {
name string
input error
expected int
}{
{
name: "Not a Status",
input: fmt.Errorf("not a status error"),
expected: http.StatusInternalServerError,
},
{
name: "Retryable Status",
input: status.New(codes.Unavailable, "test").Err(),
expected: http.StatusServiceUnavailable,
},
{
name: "Non-retryable Status",
input: status.New(codes.InvalidArgument, "test").Err(),
expected: http.StatusInternalServerError,
},
{
name: "Specifically 429",
input: status.New(codes.ResourceExhausted, "test").Err(),
expected: http.StatusTooManyRequests,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := GetHTTPStatusCodeFromStatus(tt.input)
assert.Equal(t, tt.expected, result)
})
}
}
122 changes: 87 additions & 35 deletions receiver/otlpreceiver/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,12 @@ var otlpReceiverID = component.MustNewIDWithName("otlp", otlpReceiverName)

func TestJsonHttp(t *testing.T) {
tests := []struct {
name string
encoding string
contentType string
err error
name string
encoding string
contentType string
err error
expectedStatus *spb.Status
expectedStatusCode int
}{
{
name: "JSONUncompressed",
Expand Down Expand Up @@ -83,16 +85,36 @@ func TestJsonHttp(t *testing.T) {
contentType: "application/json",
},
{
name: "NotGRPCError",
encoding: "",
contentType: "application/json",
err: errors.New("my error"),
name: "Permanent NotGRPCError",
encoding: "",
contentType: "application/json",
err: consumererror.NewPermanent(errors.New("my error")),
expectedStatus: &spb.Status{Code: int32(codes.Internal), Message: "Permanent error: my error"},
expectedStatusCode: http.StatusInternalServerError,
},
{
name: "GRPCError",
encoding: "",
contentType: "application/json",
err: status.New(codes.Unavailable, "").Err(),
name: "Retryable NotGRPCError",
encoding: "",
contentType: "application/json",
err: errors.New("my error"),
expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: "my error"},
expectedStatusCode: http.StatusServiceUnavailable,
},
{
name: "Permanent GRPCError",
encoding: "",
contentType: "application/json",
err: status.New(codes.InvalidArgument, "").Err(),
expectedStatus: &spb.Status{Code: int32(codes.InvalidArgument), Message: ""},
expectedStatusCode: http.StatusInternalServerError,
},
{
name: "Retryable GRPCError",
encoding: "",
contentType: "application/json",
err: status.New(codes.Unavailable, "").Err(),
expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: ""},
expectedStatusCode: http.StatusServiceUnavailable,
},
}
addr := testutil.GetAvailableLocalAddress(t)
Expand All @@ -108,7 +130,7 @@ func TestJsonHttp(t *testing.T) {

for _, dr := range generateDataRequests(t) {
url := "http://" + addr + dr.path
respBytes := doHTTPRequest(t, url, tt.encoding, tt.contentType, dr.jsonBytes, tt.err != nil)
respBytes := doHTTPRequest(t, url, tt.encoding, tt.contentType, dr.jsonBytes, tt.expectedStatusCode)
if tt.err == nil {
tr := ptraceotlp.NewExportResponse()
assert.NoError(t, tr.UnmarshalJSON(respBytes), "Unable to unmarshal response to Response")
Expand All @@ -120,7 +142,7 @@ func TestJsonHttp(t *testing.T) {
assert.True(t, proto.Equal(errStatus, s.Proto()))
} else {
fmt.Println(errStatus)
assert.True(t, proto.Equal(errStatus, &spb.Status{Code: int32(codes.Unavailable), Message: "my error"}))
assert.True(t, proto.Equal(errStatus, tt.expectedStatus))
}
sink.checkData(t, dr.data, 0)
}
Expand Down Expand Up @@ -302,9 +324,11 @@ func TestHandleInvalidRequests(t *testing.T) {

func TestProtoHttp(t *testing.T) {
tests := []struct {
name string
encoding string
err error
name string
encoding string
err error
expectedStatus *spb.Status
expectedStatusCode int
}{
{
name: "ProtoUncompressed",
Expand All @@ -319,14 +343,32 @@ func TestProtoHttp(t *testing.T) {
encoding: "zstd",
},
{
name: "NotGRPCError",
encoding: "",
err: errors.New("my error"),
name: "Permanent NotGRPCError",
encoding: "",
err: consumererror.NewPermanent(errors.New("my error")),
expectedStatus: &spb.Status{Code: int32(codes.Internal), Message: "Permanent error: my error"},
expectedStatusCode: http.StatusInternalServerError,
},
{
name: "GRPCError",
encoding: "",
err: status.New(codes.Unavailable, "").Err(),
name: "Retryable NotGRPCError",
encoding: "",
err: errors.New("my error"),
expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: "my error"},
expectedStatusCode: http.StatusServiceUnavailable,
},
{
name: "Permanent GRPCError",
encoding: "",
err: status.New(codes.InvalidArgument, "").Err(),
expectedStatus: &spb.Status{Code: int32(codes.InvalidArgument), Message: ""},
expectedStatusCode: http.StatusInternalServerError,
},
{
name: "Retryable GRPCError",
encoding: "",
err: status.New(codes.Unavailable, "").Err(),
expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: ""},
expectedStatusCode: http.StatusServiceUnavailable,
},
}
addr := testutil.GetAvailableLocalAddress(t)
Expand All @@ -345,7 +387,7 @@ func TestProtoHttp(t *testing.T) {

for _, dr := range generateDataRequests(t) {
url := "http://" + addr + dr.path
respBytes := doHTTPRequest(t, url, tt.encoding, "application/x-protobuf", dr.protoBytes, tt.err != nil)
respBytes := doHTTPRequest(t, url, tt.encoding, "application/x-protobuf", dr.protoBytes, tt.expectedStatusCode)
if tt.err == nil {
tr := ptraceotlp.NewExportResponse()
assert.NoError(t, tr.UnmarshalProto(respBytes))
Expand All @@ -356,7 +398,7 @@ func TestProtoHttp(t *testing.T) {
if s, ok := status.FromError(tt.err); ok {
assert.True(t, proto.Equal(errStatus, s.Proto()))
} else {
assert.True(t, proto.Equal(errStatus, &spb.Status{Code: int32(codes.Unavailable), Message: "my error"}))
assert.True(t, proto.Equal(errStatus, tt.expectedStatus))
}
sink.checkData(t, dr.data, 0)
}
Expand Down Expand Up @@ -560,20 +602,30 @@ func TestOTLPReceiverGRPCTracesIngestTest(t *testing.T) {
// trace receiver.
func TestOTLPReceiverHTTPTracesIngestTest(t *testing.T) {
type ingestionStateTest struct {
okToIngest bool
expectedCode codes.Code
okToIngest bool
err error
expectedCode codes.Code
expectedStatusCode int
}

expectedReceivedBatches := 2
expectedIngestionBlockedRPCs := 1
expectedIngestionBlockedRPCs := 2
ingestionStates := []ingestionStateTest{
{
okToIngest: true,
expectedCode: codes.OK,
},
{
okToIngest: false,
expectedCode: codes.Unavailable,
okToIngest: false,
err: consumererror.NewPermanent(errors.New("consumer error")),
expectedCode: codes.Internal,
expectedStatusCode: http.StatusInternalServerError,
},
{
okToIngest: false,
err: errors.New("consumer error"),
expectedCode: codes.Unavailable,
expectedStatusCode: http.StatusServiceUnavailable,
},
{
okToIngest: true,
Expand All @@ -599,7 +651,7 @@ func TestOTLPReceiverHTTPTracesIngestTest(t *testing.T) {
if ingestionState.okToIngest {
sink.SetConsumeError(nil)
} else {
sink.SetConsumeError(errors.New("consumer error"))
sink.SetConsumeError(ingestionState.err)
}

pbMarshaler := ptrace.ProtoMarshaler{}
Expand All @@ -620,7 +672,7 @@ func TestOTLPReceiverHTTPTracesIngestTest(t *testing.T) {
} else {
errStatus := &spb.Status{}
assert.NoError(t, proto.Unmarshal(respBytes, errStatus))
assert.Equal(t, http.StatusInternalServerError, resp.StatusCode)
assert.Equal(t, ingestionState.expectedStatusCode, resp.StatusCode)
assert.Equal(t, ingestionState.expectedCode, codes.Code(errStatus.Code))
}
}
Expand Down Expand Up @@ -853,7 +905,7 @@ func doHTTPRequest(
encoding string,
contentType string,
data []byte,
expectErr bool,
expectStatusCode int,
) []byte {
req := createHTTPRequest(t, url, encoding, contentType, data)
resp, err := http.DefaultClient.Do(req)
Expand All @@ -866,10 +918,10 @@ func doHTTPRequest(
// For cases like "application/json; charset=utf-8", the response will be only "application/json"
require.True(t, strings.HasPrefix(strings.ToLower(contentType), resp.Header.Get("Content-Type")))

if !expectErr {
if expectStatusCode == 0 {
require.Equal(t, http.StatusOK, resp.StatusCode)
} else {
require.Equal(t, http.StatusInternalServerError, resp.StatusCode)
require.Equal(t, expectStatusCode, resp.StatusCode)
}

return respBytes
Expand Down
7 changes: 4 additions & 3 deletions receiver/otlpreceiver/otlphttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"go.opentelemetry.io/collector/receiver/otlpreceiver/internal/errors"
"go.opentelemetry.io/collector/receiver/otlpreceiver/internal/logs"
"go.opentelemetry.io/collector/receiver/otlpreceiver/internal/metrics"
"go.opentelemetry.io/collector/receiver/otlpreceiver/internal/trace"
Expand Down Expand Up @@ -42,7 +43,7 @@ func handleTraces(resp http.ResponseWriter, req *http.Request, tracesReceiver *t

otlpResp, err := tracesReceiver.Export(req.Context(), otlpReq)
if err != nil {
writeError(resp, enc, err, http.StatusInternalServerError)
writeError(resp, enc, err, errors.GetHTTPStatusCodeFromStatus(err))
return
}

Expand Down Expand Up @@ -73,7 +74,7 @@ func handleMetrics(resp http.ResponseWriter, req *http.Request, metricsReceiver

otlpResp, err := metricsReceiver.Export(req.Context(), otlpReq)
if err != nil {
writeError(resp, enc, err, http.StatusInternalServerError)
writeError(resp, enc, err, errors.GetHTTPStatusCodeFromStatus(err))
return
}

Expand Down Expand Up @@ -104,7 +105,7 @@ func handleLogs(resp http.ResponseWriter, req *http.Request, logsReceiver *logs.

otlpResp, err := logsReceiver.Export(req.Context(), otlpReq)
if err != nil {
writeError(resp, enc, err, http.StatusInternalServerError)
writeError(resp, enc, err, errors.GetHTTPStatusCodeFromStatus(err))
return
}

Expand Down

0 comments on commit 2b0decf

Please sign in to comment.