Skip to content

Commit

Permalink
add support for additional decoders
Browse files Browse the repository at this point in the history
This provides the ability to add support for compression types which are
not supported in core, to be supported by individual components.

Signed-off-by: Alex Boten <aboten@lightstep.com>
  • Loading branch information
Alex Boten committed Jun 27, 2023
1 parent 8278824 commit 5139a96
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 36 deletions.
16 changes: 16 additions & 0 deletions .chloggen/codeboten_add-option-for-decompressor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: confighttp

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Add support for additional content decoders via `WithDecoder` server option"

# One or more tracking issues or pull requests related to the change
issues: [7977]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
85 changes: 51 additions & 34 deletions config/confighttp/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,25 +68,68 @@ func (r *compressRoundTripper) RoundTrip(req *http.Request) (*http.Response, err
type decompressor struct {
errHandler func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int)
base http.Handler
decoders map[string]func(body io.ReadCloser) (io.ReadCloser, error)
}

// httpContentDecompressor offloads the task of handling compressed HTTP requests
// by identifying the compression format in the "Content-Encoding" header and re-writing
// request body so that the handlers further in the chain can work on decompressed data.
// It supports gzip and deflate/zlib compression.
func httpContentDecompressor(h http.Handler, eh func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int)) http.Handler {
func httpContentDecompressor(h http.Handler, eh func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int), decoders map[string]func(body io.ReadCloser) (io.ReadCloser, error)) http.Handler {
errHandler := defaultErrorHandler
if eh != nil {
errHandler = eh
}
return &decompressor{

d := &decompressor{
errHandler: errHandler,
base: h,
decoders: map[string]func(body io.ReadCloser) (io.ReadCloser, error){
"": func(body io.ReadCloser) (io.ReadCloser, error) {
// Not a compressed payload. Nothing to do.
return nil, nil
},
"gzip": func(body io.ReadCloser) (io.ReadCloser, error) {
gr, err := gzip.NewReader(body)
if err != nil {
return nil, err
}
return gr, nil
},
"zstd": func(body io.ReadCloser) (io.ReadCloser, error) {
zr, err := zstd.NewReader(
body,
// Concurrency 1 disables async decoding. We don't need async decoding, it is pointless
// for our use-case (a server accepting decoding http requests).
// Disabling async improves performance (I benchmarked it previously when working
// on https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/23257).
zstd.WithDecoderConcurrency(1),
)
if err != nil {
return nil, err
}
return zr.IOReadCloser(), nil
},
"zlib": func(body io.ReadCloser) (io.ReadCloser, error) {
zr, err := zlib.NewReader(body)
if err != nil {
return nil, err
}
return zr, nil
},
},
}
d.decoders["deflate"] = d.decoders["zlib"]

for key, dec := range decoders {
d.decoders[key] = dec
}

return d
}

func (d *decompressor) ServeHTTP(w http.ResponseWriter, r *http.Request) {
newBody, err := newBodyReader(r)
newBody, err := d.newBodyReader(r)
if err != nil {
d.errHandler(w, r, err.Error(), http.StatusBadRequest)
return
Expand All @@ -104,39 +147,13 @@ func (d *decompressor) ServeHTTP(w http.ResponseWriter, r *http.Request) {
d.base.ServeHTTP(w, r)
}

func newBodyReader(r *http.Request) (io.ReadCloser, error) {
func (d *decompressor) newBodyReader(r *http.Request) (io.ReadCloser, error) {
encoding := r.Header.Get(headerContentEncoding)
switch encoding {
case string(configcompression.Gzip):
gr, err := gzip.NewReader(r.Body)
if err != nil {
return nil, err
}
return gr, nil
case string(configcompression.Deflate), string(configcompression.Zlib):
zr, err := zlib.NewReader(r.Body)
if err != nil {
return nil, err
}
return zr, nil
case "zstd":
zr, err := zstd.NewReader(
r.Body,
// Concurrency 1 disables async decoding. We don't need async decoding, it is pointless
// for our use-case (a server accepting decoding http requests).
// Disabling async improves performance (I benchmarked it previously when working
// on https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/23257).
zstd.WithDecoderConcurrency(1),
)
if err != nil {
return nil, err
}
return zr.IOReadCloser(), nil
case "":
// Not a compressed payload. Nothing to do.
return nil, nil
decoder, ok := d.decoders[encoding]
if !ok {
return nil, fmt.Errorf("unsupported %s: %s", headerContentEncoding, encoding)
}
return nil, fmt.Errorf("unsupported %s: %s", headerContentEncoding, encoding)
return decoder(r.Body)
}

// defaultErrorHandler writes the error message in plain text.
Expand Down
52 changes: 51 additions & 1 deletion config/confighttp/compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/golang/snappy"
Expand Down Expand Up @@ -114,8 +115,44 @@ func TestHTTPClientCompression(t *testing.T) {
}
}

func TestHTTPCustomDecompression(t *testing.T) {
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(err.Error()))
return
}

require.NoError(t, err, "failed to read request body: %v", err)
assert.EqualValues(t, "decompressed body", string(body))
w.WriteHeader(http.StatusOK)
})
decoders := map[string]func(body io.ReadCloser) (io.ReadCloser, error){
"custom-encoding": func(body io.ReadCloser) (io.ReadCloser, error) {
return io.NopCloser(strings.NewReader("decompressed body")), nil
},
}
srv := httptest.NewServer(httpContentDecompressor(handler, defaultErrorHandler, decoders))

t.Cleanup(srv.Close)

req, err := http.NewRequest(http.MethodGet, srv.URL, bytes.NewBuffer([]byte("123deompressed body")))
require.NoError(t, err, "failed to create request to test handler")
req.Header.Set("Content-Encoding", "custom-encoding")

client := http.Client{}
res, err := client.Do(req)
require.NoError(t, err)

assert.Equal(t, http.StatusOK, res.StatusCode, "test handler returned unexpected status code ")
_, err = io.ReadAll(res.Body)
require.NoError(t, res.Body.Close(), "failed to close request body: %v", err)
}

func TestHTTPContentDecompressionHandler(t *testing.T) {
testBody := []byte("uncompressed_text")
noDecoders := map[string]func(io.ReadCloser) (io.ReadCloser, error){}
tests := []struct {
name string
encoding string
Expand All @@ -129,6 +166,12 @@ func TestHTTPContentDecompressionHandler(t *testing.T) {
reqBody: bytes.NewBuffer(testBody),
respCode: http.StatusOK,
},
{
name: "ValidDeflate",
encoding: "deflate",
reqBody: compressZlib(t, testBody),
respCode: http.StatusOK,
},
{
name: "ValidGzip",
encoding: "gzip",
Expand All @@ -147,6 +190,13 @@ func TestHTTPContentDecompressionHandler(t *testing.T) {
reqBody: compressZstd(t, testBody),
respCode: http.StatusOK,
},
{
name: "InvalidDeflate",
encoding: "deflate",
reqBody: bytes.NewBuffer(testBody),
respCode: http.StatusBadRequest,
respBody: "zlib: invalid header\n",
},
{
name: "InvalidGzip",
encoding: "gzip",
Expand Down Expand Up @@ -189,7 +239,7 @@ func TestHTTPContentDecompressionHandler(t *testing.T) {
require.NoError(t, err, "failed to read request body: %v", err)
assert.EqualValues(t, testBody, string(body))
w.WriteHeader(http.StatusOK)
}), defaultErrorHandler))
}), defaultErrorHandler, noDecoders))
t.Cleanup(srv.Close)

req, err := http.NewRequest(http.MethodGet, srv.URL, tt.reqBody)
Expand Down
12 changes: 11 additions & 1 deletion config/confighttp/confighttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package confighttp // import "go.opentelemetry.io/collector/config/confighttp"
import (
"crypto/tls"
"errors"
"io"
"net"
"net/http"
"time"
Expand Down Expand Up @@ -248,6 +249,7 @@ func (hss *HTTPServerSettings) ToListener() (net.Listener, error) {
// returned by HTTPServerSettings.ToServer().
type toServerOptions struct {
errHandler func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int)
decoders map[string]func(body io.ReadCloser) (io.ReadCloser, error)
}

// ToServerOption is an option to change the behavior of the HTTP server
Expand All @@ -262,6 +264,14 @@ func WithErrorHandler(e func(w http.ResponseWriter, r *http.Request, errorMsg st
}
}

// WithDecoder provides support for additional decoders to be configured
// by the caller.
func WithDecoder(key string, dec func(body io.ReadCloser) (io.ReadCloser, error)) ToServerOption {
return func(opts *toServerOptions) {
opts.decoders[key] = dec
}
}

// ToServer creates an http.Server from settings object.
func (hss *HTTPServerSettings) ToServer(host component.Host, settings component.TelemetrySettings, handler http.Handler, opts ...ToServerOption) (*http.Server, error) {
internal.WarnOnUnspecifiedHost(settings.Logger, hss.Endpoint)
Expand All @@ -271,7 +281,7 @@ func (hss *HTTPServerSettings) ToServer(host component.Host, settings component.
o(serverOpts)
}

handler = httpContentDecompressor(handler, serverOpts.errHandler)
handler = httpContentDecompressor(handler, serverOpts.errHandler, serverOpts.decoders)

if hss.MaxRequestBodySize > 0 {
handler = maxRequestBodySizeInterceptor(handler, hss.MaxRequestBodySize)
Expand Down

0 comments on commit 5139a96

Please sign in to comment.