From 10e89bd77ab9e09b75bd6f440c925d3293e13233 Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Wed, 5 Jun 2024 13:49:59 +0000 Subject: [PATCH] [configgrpc] Use own compressors for zstd (#10323) (#10324) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Backport of #10323 Signed-off-by: Juraci Paixão Kröhling Co-authored-by: Juraci Paixão Kröhling --- ...nfiggrpc-use-own-compressors-for-zstd.yaml | 13 +++ config/configgrpc/configgrpc.go | 4 +- .../configgrpc/configgrpc_benchmark_test.go | 4 +- config/configgrpc/go.mod | 2 +- config/configgrpc/internal/zstd.go | 83 +++++++++++++++++++ config/configgrpc/internal/zstd_test.go | 41 +++++++++ receiver/otlpreceiver/otlp_test.go | 3 +- 7 files changed, 144 insertions(+), 6 deletions(-) create mode 100644 .chloggen/jpkroehling-configgrpc-use-own-compressors-for-zstd.yaml create mode 100644 config/configgrpc/internal/zstd.go create mode 100644 config/configgrpc/internal/zstd_test.go diff --git a/.chloggen/jpkroehling-configgrpc-use-own-compressors-for-zstd.yaml b/.chloggen/jpkroehling-configgrpc-use-own-compressors-for-zstd.yaml new file mode 100644 index 00000000000..a04c4f89012 --- /dev/null +++ b/.chloggen/jpkroehling-configgrpc-use-own-compressors-for-zstd.yaml @@ -0,0 +1,13 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'bug_fix' + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: Use own compressors for zstd + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Before this change, the zstd compressor we used didn't respect the max message size. + +# One or more tracking issues or pull requests related to the change +issues: [10323] diff --git a/config/configgrpc/configgrpc.go b/config/configgrpc/configgrpc.go index 98d428857ce..b57a199461c 100644 --- a/config/configgrpc/configgrpc.go +++ b/config/configgrpc/configgrpc.go @@ -12,7 +12,6 @@ import ( "time" "github.com/mostynb/go-grpc-compression/nonclobbering/snappy" - "github.com/mostynb/go-grpc-compression/nonclobbering/zstd" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.opentelemetry.io/otel" "google.golang.org/grpc" @@ -28,6 +27,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configauth" "go.opentelemetry.io/collector/config/configcompression" + grpcInternal "go.opentelemetry.io/collector/config/configgrpc/internal" "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/config/configopaque" "go.opentelemetry.io/collector/config/configtelemetry" @@ -426,7 +426,7 @@ func getGRPCCompressionName(compressionType configcompression.Type) (string, err case configcompression.TypeSnappy: return snappy.Name, nil case configcompression.TypeZstd: - return zstd.Name, nil + return grpcInternal.ZstdName, nil default: return "", fmt.Errorf("unsupported compression type %q", compressionType) } diff --git a/config/configgrpc/configgrpc_benchmark_test.go b/config/configgrpc/configgrpc_benchmark_test.go index 1ad755f2b4f..3254655e9ec 100644 --- a/config/configgrpc/configgrpc_benchmark_test.go +++ b/config/configgrpc/configgrpc_benchmark_test.go @@ -10,12 +10,12 @@ import ( "testing" "github.com/mostynb/go-grpc-compression/nonclobbering/snappy" - "github.com/mostynb/go-grpc-compression/nonclobbering/zstd" "google.golang.org/grpc/codes" "google.golang.org/grpc/encoding" "google.golang.org/grpc/encoding/gzip" "google.golang.org/grpc/status" + "go.opentelemetry.io/collector/config/configgrpc/internal" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" @@ -27,7 +27,7 @@ func BenchmarkCompressors(b *testing.B) { compressors := make([]encoding.Compressor, 0) compressors = append(compressors, encoding.GetCompressor(gzip.Name)) - compressors = append(compressors, encoding.GetCompressor(zstd.Name)) + compressors = append(compressors, encoding.GetCompressor(internal.ZstdName)) compressors = append(compressors, encoding.GetCompressor(snappy.Name)) for _, payload := range payloads { diff --git a/config/configgrpc/go.mod b/config/configgrpc/go.mod index cca50c4ffc5..960a0d96376 100644 --- a/config/configgrpc/go.mod +++ b/config/configgrpc/go.mod @@ -3,6 +3,7 @@ module go.opentelemetry.io/collector/config/configgrpc go 1.21.0 require ( + github.com/klauspost/compress v1.17.2 github.com/mostynb/go-grpc-compression v1.2.2 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector v0.102.0 @@ -36,7 +37,6 @@ require ( github.com/golang/snappy v0.0.4 // indirect github.com/hashicorp/go-version v1.7.0 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/klauspost/compress v1.17.2 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/knadh/koanf/providers/confmap v0.1.0 // indirect github.com/knadh/koanf/v2 v2.1.1 // indirect diff --git a/config/configgrpc/internal/zstd.go b/config/configgrpc/internal/zstd.go new file mode 100644 index 00000000000..0718b73535f --- /dev/null +++ b/config/configgrpc/internal/zstd.go @@ -0,0 +1,83 @@ +// Copyright The OpenTelemetry Authors +// Copyright 2017 gRPC authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/collector/config/configgrpc/internal" + +import ( + "errors" + "io" + "sync" + + "github.com/klauspost/compress/zstd" + "google.golang.org/grpc/encoding" +) + +const ZstdName = "zstd" + +func init() { + encoding.RegisterCompressor(NewZstdCodec()) +} + +type writer struct { + *zstd.Encoder + pool *sync.Pool +} + +func NewZstdCodec() encoding.Compressor { + c := &compressor{} + c.poolCompressor.New = func() any { + zw, _ := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1), zstd.WithWindowSize(512*1024)) + return &writer{Encoder: zw, pool: &c.poolCompressor} + } + return c +} + +func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) { + z := c.poolCompressor.Get().(*writer) + z.Encoder.Reset(w) + return z, nil +} + +func (z *writer) Close() error { + defer z.pool.Put(z) + return z.Encoder.Close() +} + +type reader struct { + *zstd.Decoder + pool *sync.Pool +} + +func (c *compressor) Decompress(r io.Reader) (io.Reader, error) { + z, inPool := c.poolDecompressor.Get().(*reader) + if !inPool { + newZ, err := zstd.NewReader(r) + if err != nil { + return nil, err + } + return &reader{Decoder: newZ, pool: &c.poolDecompressor}, nil + } + if err := z.Reset(r); err != nil { + c.poolDecompressor.Put(z) + return nil, err + } + return z, nil +} + +func (z *reader) Read(p []byte) (n int, err error) { + n, err = z.Decoder.Read(p) + if errors.Is(err, io.EOF) { + z.pool.Put(z) + } + return n, err +} + +func (c *compressor) Name() string { + return ZstdName +} + +type compressor struct { + poolCompressor sync.Pool + poolDecompressor sync.Pool +} diff --git a/config/configgrpc/internal/zstd_test.go b/config/configgrpc/internal/zstd_test.go new file mode 100644 index 00000000000..e16336c8ccb --- /dev/null +++ b/config/configgrpc/internal/zstd_test.go @@ -0,0 +1,41 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal + +import ( + "bytes" + "io" + "testing" + + "github.com/stretchr/testify/require" +) + +func Test_zstdCodec_CompressDecompress(t *testing.T) { + // prepare + msg := []byte("Hello world.") + compressed := &bytes.Buffer{} + + // zstd header, for sanity checking + header := []byte{40, 181, 47, 253} + + c := NewZstdCodec() + cWriter, err := c.Compress(compressed) + require.NoError(t, err) + require.NotNil(t, cWriter) + + _, err = cWriter.Write(msg) + require.NoError(t, err) + cWriter.Close() + + cReader, err := c.Decompress(compressed) + require.NoError(t, err) + require.NotNil(t, cReader) + + uncompressed, err := io.ReadAll(cReader) + require.NoError(t, err) + require.Equal(t, msg, uncompressed) + + // test header + require.Equal(t, header, compressed.Bytes()[:4]) +} diff --git a/receiver/otlpreceiver/otlp_test.go b/receiver/otlpreceiver/otlp_test.go index 51f99bb4cb8..14eecbcdac1 100644 --- a/receiver/otlpreceiver/otlp_test.go +++ b/receiver/otlpreceiver/otlp_test.go @@ -726,7 +726,8 @@ func TestGRPCMaxRecvSize(t *testing.T) { require.NoError(t, err) td := testdata.GenerateTraces(50000) - require.Error(t, exportTraces(cc, td)) + err = exportTraces(cc, td) + require.Error(t, err) assert.NoError(t, cc.Close()) require.NoError(t, recv.Shutdown(context.Background()))