Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add zstd decompression support to HTTPServerSettings #7927

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add zstd decompression support to HTTPServerSettings
This adds ability to decompress zstd-compressed HTTP requests to
all receivers that use HTTPServerSettings.

Also added missing error handling for the case when an unsupported
compression type was used in the request. Now it correctly returns
400 Bad Request. Also added a unit test to verify this case.

Once this is merged I will submit a PR in contrib repo to add end-to-end
tests that use zstd compression in the testbed.
  • Loading branch information
tigrannajaryan committed Jun 20, 2023
commit f901fda92263825767bf64e4fb9eaadc7071d875
17 changes: 17 additions & 0 deletions .chloggen/add-zstd-decompression.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: HTTPServerSettings

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add zstd support to HTTPServerSettings

# One or more tracking issues or pull requests related to the change
issues: [7927]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: This adds ability to decompress zstd-compressed HTTP requests to|
all receivers that use HTTPServerSettings.
16 changes: 16 additions & 0 deletions .chloggen/fix-unsupported-content-encoding-bug.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: HTTPServerSettings

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Ensure requests with unsupported Content-Encoding return HTTP 400 Bad Request

# One or more tracking issues or pull requests related to the change
issues: [7927]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
26 changes: 23 additions & 3 deletions config/confighttp/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ import (
"bytes"
"compress/gzip"
"compress/zlib"
"fmt"
"io"
"net/http"

"github.com/klauspost/compress/zstd"

"go.opentelemetry.io/collector/config/configcompression"
)

Expand Down Expand Up @@ -102,7 +105,7 @@ func (d *decompressor) wrap(h http.Handler) http.Handler {
defer newBody.Close()
// "Content-Encoding" header is removed to avoid decompressing twice
// in case the next handler(s) have implemented a similar mechanism.
r.Header.Del("Content-Encoding")
r.Header.Del(headerContentEncoding)
// "Content-Length" is set to -1 as the size of the decompressed body is unknown.
r.Header.Del("Content-Length")
r.ContentLength = -1
Expand All @@ -113,7 +116,8 @@ func (d *decompressor) wrap(h http.Handler) http.Handler {
}

func newBodyReader(r *http.Request) (io.ReadCloser, error) {
switch r.Header.Get("Content-Encoding") {
encoding := r.Header.Get(headerContentEncoding)
switch encoding {
case "gzip":
gr, err := gzip.NewReader(r.Body)
if err != nil {
Expand All @@ -126,8 +130,24 @@ func newBodyReader(r *http.Request) (io.ReadCloser, error) {
return nil, err
}
return zr, nil
case "zstd":
zr, err := zstd.NewReader(
r.Body,
// Concurrency 1 disables async decoding. We don't need async decoding, it is pointless
// for our use-case (a server accepting decoding http requests).
// Disabling async improves performance (I benchmarked it previously when working
// on https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/23257).
zstd.WithDecoderConcurrency(1),
)
if err != nil {
return nil, err
}
return io.NopCloser(zr), nil
case "":
// Not a compressed payload. Nothing to do.
return nil, nil
}
return nil, nil
return nil, fmt.Errorf("unsupported %s: %s", headerContentEncoding, encoding)
}

// defaultErrorHandler writes the error message in plain text.
Expand Down
28 changes: 27 additions & 1 deletion config/confighttp/compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ func TestHTTPContentDecompressionHandler(t *testing.T) {
reqBody: compressZlib(t, testBody),
respCode: 200,
},
{
name: "ValidZstd",
encoding: "zstd",
reqBody: compressZstd(t, testBody),
respCode: 200,
},
{
name: "InvalidGzip",
encoding: "gzip",
Expand All @@ -155,14 +161,34 @@ func TestHTTPContentDecompressionHandler(t *testing.T) {
respCode: 400,
respBody: "zlib: invalid header\n",
},
{
name: "InvalidZstd",
encoding: "zstd",
reqBody: bytes.NewBuffer(testBody),
respCode: 400,
respBody: "invalid input: magic number mismatch",
},
{
name: "UnsupportedCompression",
encoding: "nosuchcompression",
reqBody: bytes.NewBuffer(testBody),
respCode: 400,
respBody: "unsupported Content-Encoding: nosuchcompression\n",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
srv := httptest.NewServer(httpContentDecompressor(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(err.Error()))
return
}

require.NoError(t, err, "failed to read request body: %v", err)
assert.EqualValues(t, testBody, string(body))
w.WriteHeader(200)
w.WriteHeader(http.StatusOK)
})))
t.Cleanup(srv.Close)

Expand Down
54 changes: 52 additions & 2 deletions receiver/otlpreceiver/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"testing"
"time"

"github.com/klauspost/compress/zstd"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
spb "google.golang.org/genproto/googleapis/rpc/status"
Expand Down Expand Up @@ -153,6 +154,11 @@ func TestJsonHttp(t *testing.T) {
encoding: "gzip",
contentType: "application/json",
},
{
name: "JSONZstdCompressed",
encoding: "zstd",
contentType: "application/json",
},
{
name: "NotGRPCError",
encoding: "",
Expand Down Expand Up @@ -374,8 +380,13 @@ func testHTTPJSONRequest(t *testing.T, url string, sink *errOrSinkConsumer, enco
case "gzip":
buf, err = compressGzip(traceJSON)
require.NoError(t, err, "Error while gzip compressing trace: %v", err)
default:
case "zstd":
buf, err = compressZstd(traceJSON)
require.NoError(t, err, "Error while zstd compressing trace: %v", err)
case "":
buf = bytes.NewBuffer(traceJSON)
default:
t.Fatalf("Unsupported compression type %v", encoding)
}
sink.SetConsumeError(expectedErr)
req, err := http.NewRequest(http.MethodPost, url, buf)
Expand Down Expand Up @@ -427,6 +438,10 @@ func TestProtoHttp(t *testing.T) {
name: "ProtoGzipCompressed",
encoding: "gzip",
},
{
name: "ProtoZstdCompressed",
encoding: "zstd",
},
{
name: "NotGRPCError",
encoding: "",
Expand Down Expand Up @@ -477,8 +492,13 @@ func createHTTPProtobufRequest(
case "gzip":
buf, err = compressGzip(traceBytes)
require.NoError(t, err, "Error while gzip compressing trace: %v", err)
default:
case "zstd":
buf, err = compressZstd(traceBytes)
require.NoError(t, err, "Error while zstd compressing trace: %v", err)
case "":
buf = bytes.NewBuffer(traceBytes)
default:
t.Fatalf("Unsupported compression type %v", encoding)
}
req, err := http.NewRequest(http.MethodPost, url, buf)
require.NoError(t, err, "Error creating trace POST request: %v", err)
Expand Down Expand Up @@ -567,6 +587,18 @@ func TestOTLPReceiverInvalidContentEncoding(t *testing.T) {
},
status: 400,
},
{
name: "ProtoZstdUncompressed",
content: "application/x-protobuf",
encoding: "zstd",
reqBodyFunc: func() (*bytes.Buffer, error) {
return bytes.NewBuffer([]byte(`{"key": "value"}`)), nil
},
resBodyFunc: func() ([]byte, error) {
return proto.Marshal(status.New(codes.InvalidArgument, "invalid input: magic number mismatch").Proto())
},
status: 400,
},
}
addr := testutil.GetAvailableLocalAddress(t)

Expand Down Expand Up @@ -969,6 +1001,24 @@ func compressGzip(body []byte) (*bytes.Buffer, error) {
return &buf, nil
}

func compressZstd(body []byte) (*bytes.Buffer, error) {
var buf bytes.Buffer

zw, err := zstd.NewWriter(&buf)
if err != nil {
return nil, err
}

defer zw.Close()

_, err = zw.Write(body)
if err != nil {
return nil, err
}

return &buf, nil
}

type senderFunc func(td ptrace.Traces)

func TestShutdown(t *testing.T) {
Expand Down