diff --git a/.chloggen/otlphttp-partial-success.yaml b/.chloggen/otlphttp-partial-success.yaml new file mode 100755 index 00000000000..78635f3b421 --- /dev/null +++ b/.chloggen/otlphttp-partial-success.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: otlphttpexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Treat partial success responses as errors + +# One or more tracking issues or pull requests related to the change +issues: [6686] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/exporter/otlphttpexporter/otlp.go b/exporter/otlphttpexporter/otlp.go index 4a5380e0c4d..655ab7fb3e0 100644 --- a/exporter/otlphttpexporter/otlp.go +++ b/exporter/otlphttpexporter/otlp.go @@ -101,7 +101,7 @@ func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error { return consumererror.NewPermanent(err) } - return e.export(ctx, e.tracesURL, request) + return e.export(ctx, e.tracesURL, request, tracesPartialSuccessHandler) } func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error { @@ -110,7 +110,7 @@ func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) erro if err != nil { return consumererror.NewPermanent(err) } - return e.export(ctx, e.metricsURL, request) + return e.export(ctx, e.metricsURL, request, metricsPartialSuccessHandler) } func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error { @@ -120,10 +120,10 @@ func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error { return consumererror.NewPermanent(err) } - return e.export(ctx, e.logsURL, request) + return e.export(ctx, e.logsURL, request, logsPartialSuccessHandler) } -func (e *baseExporter) export(ctx context.Context, url string, request []byte) error { +func (e *baseExporter) export(ctx context.Context, url string, request []byte, partialSuccessHandler partialSuccessHandler) error { e.logger.Debug("Preparing to make HTTP request", zap.String("url", url)) req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(request)) if err != nil { @@ -144,6 +144,10 @@ func (e *baseExporter) export(ctx context.Context, url string, request []byte) e }() if resp.StatusCode >= 200 && resp.StatusCode <= 299 { + if err := handlePartialSuccessResponse(resp, partialSuccessHandler); err != nil { + return err + } + // Request is successful. return nil } @@ -225,3 +229,81 @@ func readResponse(resp *http.Response) *status.Status { return respStatus } + +func handlePartialSuccessResponse(resp *http.Response, partialSuccessHandler partialSuccessHandler) error { + if resp.ContentLength == 0 { + return nil + } + + maxRead := resp.ContentLength + needsResize := false + if maxRead == -1 || maxRead > maxHTTPResponseReadBytes { + maxRead = maxHTTPResponseReadBytes + needsResize = true + } + protoBytes := make([]byte, maxRead) + n, err := io.ReadFull(resp.Body, protoBytes) + + // No bytes read and an EOF error indicates there is no body to read. + if n == 0 && (err == nil || errors.Is(err, io.EOF)) { + return nil + } + + // io.ReadFull will return io.ErrorUnexpectedEOF if the Content-Length header + // wasn't set, since we will try to read past the length of the body. If this + // is the case, the body will still have the full message in it, so we want to + // ignore the error and parse the message. + if err != nil && !errors.Is(err, io.ErrUnexpectedEOF) { + return err + } + + // The pdata unmarshaling methods check for the length of the slice + // when unmarshaling it, so we have to trim down the length to the + // actual size of the data. + if needsResize { + protoBytes = protoBytes[:n] + } + + return partialSuccessHandler(protoBytes) +} + +type partialSuccessHandler func(protoBytes []byte) error + +func tracesPartialSuccessHandler(protoBytes []byte) error { + exportResponse := ptraceotlp.NewExportResponse() + err := exportResponse.UnmarshalProto(protoBytes) + if err != nil { + return err + } + partialSuccess := exportResponse.PartialSuccess() + if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedSpans() == 0) { + return consumererror.NewPermanent(fmt.Errorf("OTLP partial success: %s (%d rejected)", partialSuccess.ErrorMessage(), partialSuccess.RejectedSpans())) + } + return nil +} + +func metricsPartialSuccessHandler(protoBytes []byte) error { + exportResponse := pmetricotlp.NewExportResponse() + err := exportResponse.UnmarshalProto(protoBytes) + if err != nil { + return err + } + partialSuccess := exportResponse.PartialSuccess() + if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedDataPoints() == 0) { + return consumererror.NewPermanent(fmt.Errorf("OTLP partial success: %s (%d rejected)", partialSuccess.ErrorMessage(), partialSuccess.RejectedDataPoints())) + } + return nil +} + +func logsPartialSuccessHandler(protoBytes []byte) error { + exportResponse := plogotlp.NewExportResponse() + err := exportResponse.UnmarshalProto(protoBytes) + if err != nil { + return err + } + partialSuccess := exportResponse.PartialSuccess() + if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedLogRecords() == 0) { + return consumererror.NewPermanent(fmt.Errorf("OTLP partial success: %s (%d rejected)", partialSuccess.ErrorMessage(), partialSuccess.RejectedLogRecords())) + } + return nil +} diff --git a/exporter/otlphttpexporter/otlp_test.go b/exporter/otlphttpexporter/otlp_test.go index df245b1304f..ad5fb7a18af 100644 --- a/exporter/otlphttpexporter/otlp_test.go +++ b/exporter/otlphttpexporter/otlp_test.go @@ -48,7 +48,9 @@ import ( "go.opentelemetry.io/collector/internal/testdata" "go.opentelemetry.io/collector/internal/testutil" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/plog/plogotlp" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" "go.opentelemetry.io/collector/receiver/otlpreceiver" @@ -486,8 +488,7 @@ func TestErrorResponses(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - mux := http.NewServeMux() - mux.HandleFunc("/v1/traces", func(writer http.ResponseWriter, request *http.Request) { + srv, err := createBackend(addr, "/v1/traces", func(writer http.ResponseWriter, request *http.Request) { for k, v := range test.headers { writer.Header().Add(k, v) } @@ -499,15 +500,8 @@ func TestErrorResponses(t *testing.T) { require.NoError(t, err) } }) - srv := http.Server{ - Addr: addr, - Handler: mux, - } - ln, err := net.Listen("tcp", addr) require.NoError(t, err) - go func() { - _ = srv.Serve(ln) - }() + defer srv.Close() cfg := &Config{ TracesEndpoint: fmt.Sprintf("http://%s/v1/traces", addr), @@ -534,8 +528,6 @@ func TestErrorResponses(t *testing.T) { } else { assert.EqualValues(t, test.err, err) } - - srv.Close() }) } } @@ -570,20 +562,12 @@ func TestUserAgent(t *testing.T) { t.Run("traces", func(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - mux := http.NewServeMux() - mux.HandleFunc("/v1/traces", func(writer http.ResponseWriter, request *http.Request) { + srv, err := createBackend(addr, "/v1/traces", func(writer http.ResponseWriter, request *http.Request) { assert.Contains(t, request.Header.Get("user-agent"), test.expectedUA) writer.WriteHeader(200) }) - srv := http.Server{ - Addr: addr, - Handler: mux, - } - ln, err := net.Listen("tcp", addr) require.NoError(t, err) - go func() { - _ = srv.Serve(ln) - }() + defer srv.Close() cfg := &Config{ TracesEndpoint: fmt.Sprintf("http://%s/v1/traces", addr), @@ -605,8 +589,6 @@ func TestUserAgent(t *testing.T) { traces := ptrace.NewTraces() err = exp.ConsumeTraces(context.Background(), traces) require.NoError(t, err) - - srv.Close() }) } }) @@ -614,20 +596,12 @@ func TestUserAgent(t *testing.T) { t.Run("metrics", func(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - mux := http.NewServeMux() - mux.HandleFunc("/v1/metrics", func(writer http.ResponseWriter, request *http.Request) { + srv, err := createBackend(addr, "/v1/metrics", func(writer http.ResponseWriter, request *http.Request) { assert.Contains(t, request.Header.Get("user-agent"), test.expectedUA) writer.WriteHeader(200) }) - srv := http.Server{ - Addr: addr, - Handler: mux, - } - ln, err := net.Listen("tcp", addr) require.NoError(t, err) - go func() { - _ = srv.Serve(ln) - }() + defer srv.Close() cfg := &Config{ MetricsEndpoint: fmt.Sprintf("http://%s/v1/metrics", addr), @@ -649,8 +623,6 @@ func TestUserAgent(t *testing.T) { metrics := pmetric.NewMetrics() err = exp.ConsumeMetrics(context.Background(), metrics) require.NoError(t, err) - - srv.Close() }) } }) @@ -658,20 +630,12 @@ func TestUserAgent(t *testing.T) { t.Run("logs", func(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - mux := http.NewServeMux() - mux.HandleFunc("/v1/logs", func(writer http.ResponseWriter, request *http.Request) { + srv, err := createBackend(addr, "/v1/logs", func(writer http.ResponseWriter, request *http.Request) { assert.Contains(t, request.Header.Get("user-agent"), test.expectedUA) writer.WriteHeader(200) }) - srv := http.Server{ - Addr: addr, - Handler: mux, - } - ln, err := net.Listen("tcp", addr) require.NoError(t, err) - go func() { - _ = srv.Serve(ln) - }() + defer srv.Close() cfg := &Config{ LogsEndpoint: fmt.Sprintf("http://%s/v1/logs", addr), @@ -699,3 +663,231 @@ func TestUserAgent(t *testing.T) { } }) } + +func TestPartialSuccess(t *testing.T) { + addr := testutil.GetAvailableLocalAddress(t) + set := exportertest.NewNopCreateSettings() + + t.Run("traces", func(t *testing.T) { + srv, err := createBackend(addr, "/v1/traces", func(writer http.ResponseWriter, request *http.Request) { + response := ptraceotlp.NewExportResponse() + partial := response.PartialSuccess() + partial.SetErrorMessage("hello") + partial.SetRejectedSpans(1) + bytes, err := response.MarshalProto() + require.NoError(t, err) + _, err = writer.Write(bytes) + require.NoError(t, err) + }) + require.NoError(t, err) + defer srv.Close() + + cfg := &Config{ + TracesEndpoint: fmt.Sprintf("http://%s/v1/traces", addr), + HTTPClientSettings: confighttp.HTTPClientSettings{}, + } + exp, err := createTracesExporter(context.Background(), set, cfg) + require.NoError(t, err) + + // start the exporter + err = exp.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, exp.Shutdown(context.Background())) + }) + + // generate data + traces := ptrace.NewTraces() + err = exp.ConsumeTraces(context.Background(), traces) + require.Error(t, err) + }) + + t.Run("metrics", func(t *testing.T) { + srv, err := createBackend(addr, "/v1/metrics", func(writer http.ResponseWriter, request *http.Request) { + response := pmetricotlp.NewExportResponse() + partial := response.PartialSuccess() + partial.SetErrorMessage("hello") + partial.SetRejectedDataPoints(1) + bytes, err := response.MarshalProto() + require.NoError(t, err) + _, err = writer.Write(bytes) + require.NoError(t, err) + }) + require.NoError(t, err) + defer srv.Close() + + cfg := &Config{ + MetricsEndpoint: fmt.Sprintf("http://%s/v1/metrics", addr), + HTTPClientSettings: confighttp.HTTPClientSettings{}, + } + exp, err := createMetricsExporter(context.Background(), set, cfg) + require.NoError(t, err) + + // start the exporter + err = exp.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, exp.Shutdown(context.Background())) + }) + + // generate data + metrics := pmetric.NewMetrics() + err = exp.ConsumeMetrics(context.Background(), metrics) + require.Error(t, err) + }) + + t.Run("logs", func(t *testing.T) { + srv, err := createBackend(addr, "/v1/logs", func(writer http.ResponseWriter, request *http.Request) { + response := plogotlp.NewExportResponse() + partial := response.PartialSuccess() + partial.SetErrorMessage("hello") + partial.SetRejectedLogRecords(1) + bytes, err := response.MarshalProto() + require.NoError(t, err) + _, err = writer.Write(bytes) + require.NoError(t, err) + }) + require.NoError(t, err) + defer srv.Close() + + cfg := &Config{ + LogsEndpoint: fmt.Sprintf("http://%s/v1/logs", addr), + HTTPClientSettings: confighttp.HTTPClientSettings{}, + } + exp, err := createLogsExporter(context.Background(), set, cfg) + require.NoError(t, err) + + // start the exporter + err = exp.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, exp.Shutdown(context.Background())) + }) + + // generate data + logs := plog.NewLogs() + err = exp.ConsumeLogs(context.Background(), logs) + require.Error(t, err) + }) + + t.Run("Response is missing a Content-Length header but includes a partial success object", func(t *testing.T) { + response := ptraceotlp.NewExportResponse() + partial := response.PartialSuccess() + partial.SetErrorMessage("hello") + partial.SetRejectedSpans(1) + data, err := response.MarshalProto() + require.NoError(t, err) + resp := &http.Response{ + // `-1` indicates a missing Content-Length header in the Go http standard library + ContentLength: -1, + Body: io.NopCloser(bytes.NewReader(data)), + } + err = handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) + assert.True(t, consumererror.IsPermanent(err)) + }) + + t.Run("Response is missing a Content-Length header and a body", func(t *testing.T) { + resp := &http.Response{ + // `-1` indicates a missing Content-Length header in the Go http standard library + ContentLength: -1, + Body: io.NopCloser(bytes.NewReader([]byte{})), + } + err := handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) + assert.Nil(t, err) + }) + + t.Run("Reading the response body returns an error other than ErrUnexpectedEOF", func(t *testing.T) { + resp := &http.Response{ + // `-1` indicates a missing Content-Length header in the Go http standard library + ContentLength: -1, + Body: io.NopCloser(badReader{}), + } + err := handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) + assert.Error(t, err) + }) + + t.Run("Response has short Content-Length header", func(t *testing.T) { + response := ptraceotlp.NewExportResponse() + partial := response.PartialSuccess() + partial.SetErrorMessage("hello") + partial.SetRejectedSpans(1) + data, err := response.MarshalProto() + require.NoError(t, err) + resp := &http.Response{ + ContentLength: 3, + Body: io.NopCloser(bytes.NewReader(data)), + } + err = handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) + assert.Error(t, err) + }) + + t.Run("Response has long Content-Length header", func(t *testing.T) { + response := ptraceotlp.NewExportResponse() + partial := response.PartialSuccess() + partial.SetErrorMessage("hello") + partial.SetRejectedSpans(1) + data, err := response.MarshalProto() + require.NoError(t, err) + resp := &http.Response{ + ContentLength: 4096, + Body: io.NopCloser(bytes.NewReader(data)), + } + err = handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) + assert.Error(t, err) + }) + + invalidBodyCases := []struct { + telemetryType string + handler partialSuccessHandler + }{ + { + telemetryType: "traces", + handler: tracesPartialSuccessHandler, + }, + { + telemetryType: "metrics", + handler: metricsPartialSuccessHandler, + }, + { + telemetryType: "logs", + handler: logsPartialSuccessHandler, + }, + } + for _, tt := range invalidBodyCases { + t.Run("Invalid response body: "+tt.telemetryType, func(t *testing.T) { + str := "invalid proto" + body := bytes.NewBufferString(str) + resp := &http.Response{ + ContentLength: int64(len(str)), + Body: io.NopCloser(body), + } + err := handlePartialSuccessResponse(resp, tt.handler) + assert.Error(t, err) + }) + } + +} + +func createBackend(addr string, endpoint string, handler func(writer http.ResponseWriter, request *http.Request)) (*http.Server, error) { + mux := http.NewServeMux() + mux.HandleFunc(endpoint, handler) + srv := http.Server{ + Addr: addr, + Handler: mux, + } + ln, err := net.Listen("tcp", addr) + if err != nil { + return &http.Server{}, err + } + go func() { + _ = srv.Serve(ln) + }() + + return &srv, nil +} + +type badReader struct{} + +func (b badReader) Read([]byte) (int, error) { + return 0, errors.New("Bad read") +}