diff --git a/.chloggen/fix-compression-kinesis-exporter.yaml b/.chloggen/fix-compression-kinesis-exporter.yaml new file mode 100644 index 000000000000..635bbe115ade --- /dev/null +++ b/.chloggen/fix-compression-kinesis-exporter.yaml @@ -0,0 +1,20 @@ +# Use this changelog template to create an entry for release notes. +# If your change doesn't affect end users, such as a test fix or a tooling change, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: "bug_fix" + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: "awskinesisexporter" + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Fixes the compression by closing it so that it marks the end of the compression and makes it thread safe" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [5663] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/exporter/awskinesisexporter/exporter.go b/exporter/awskinesisexporter/exporter.go index 8b5bb5a101dd..f13d82d18836 100644 --- a/exporter/awskinesisexporter/exporter.go +++ b/exporter/awskinesisexporter/exporter.go @@ -89,7 +89,7 @@ func createExporter(ctx context.Context, c component.Config, log *zap.Logger, op return nil, err } - compressor, err := compress.NewCompressor(conf.Encoding.Compression) + compressor, err := compress.NewCompressor(conf.Encoding.Compression, log) if err != nil { return nil, err } diff --git a/exporter/awskinesisexporter/internal/compress/compresser.go b/exporter/awskinesisexporter/internal/compress/compresser.go index f3060253678f..bca6a4992688 100644 --- a/exporter/awskinesisexporter/internal/compress/compresser.go +++ b/exporter/awskinesisexporter/internal/compress/compresser.go @@ -10,11 +10,14 @@ import ( "compress/zlib" "fmt" "io" + "sync" + + "go.uber.org/zap" ) type bufferedResetWriter interface { Write(p []byte) (int, error) - Flush() error + Close() error Reset(newWriter io.Writer) } @@ -25,35 +28,63 @@ type Compressor interface { var _ Compressor = (*compressor)(nil) type compressor struct { - compression bufferedResetWriter + compressionPool sync.Pool } -func NewCompressor(format string) (Compressor, error) { - c := &compressor{ - compression: &noop{}, - } +func NewCompressor(format string, log *zap.Logger) (Compressor, error) { + var c Compressor switch format { case "flate": - w, err := flate.NewWriter(nil, flate.BestSpeed) - if err != nil { - return nil, err + c = &compressor{ + compressionPool: sync.Pool{ + New: func() any { + w, err := flate.NewWriter(nil, flate.BestSpeed) + if err != nil { + errMsg := fmt.Sprintf("Unable to instantiate Flate compressor: %v", err) + log.Error(errMsg) + return nil + } + return w + }, + }, } - c.compression = w case "gzip": - w, err := gzip.NewWriterLevel(nil, gzip.BestSpeed) - if err != nil { - return nil, err - } - c.compression = w + c = &compressor{ + compressionPool: sync.Pool{ + New: func() any { + w, err := gzip.NewWriterLevel(nil, gzip.BestSpeed) + if err != nil { + errMsg := fmt.Sprintf("Unable to instantiate Gzip compressor: %v", err) + log.Error(errMsg) + return nil + } + return w + }, + }, + } case "zlib": - w, err := zlib.NewWriterLevel(nil, zlib.BestSpeed) - if err != nil { - return nil, err + c = &compressor{ + compressionPool: sync.Pool{ + New: func() any { + w, err := zlib.NewWriterLevel(nil, zlib.BestSpeed) + if err != nil { + errMsg := fmt.Sprintf("Unable to instantiate Zlib compressor: %v", err) + log.Error(errMsg) + return nil + } + return w + }, + }, } - c.compression = w case "noop", "none": - // Already the default case + c = &compressor{ + compressionPool: sync.Pool{ + New: func() any { + return &noop{} + }, + }, + } default: return nil, fmt.Errorf("unknown compression format: %s", format) } @@ -63,14 +94,19 @@ func NewCompressor(format string) (Compressor, error) { func (c *compressor) Do(in []byte) ([]byte, error) { buf := new(bytes.Buffer) + comp := c.compressionPool.Get().(bufferedResetWriter) + if comp == nil { + return nil, fmt.Errorf("compressor is nil and did not get instantiated correctly") + } + defer c.compressionPool.Put(comp) - c.compression.Reset(buf) + comp.Reset(buf) - if _, err := c.compression.Write(in); err != nil { + if _, err := comp.Write(in); err != nil { return nil, err } - if err := c.compression.Flush(); err != nil { + if err := comp.Close(); err != nil { return nil, err } diff --git a/exporter/awskinesisexporter/internal/compress/compresser_test.go b/exporter/awskinesisexporter/internal/compress/compresser_test.go index 93075b845420..95d26cf7bc36 100644 --- a/exporter/awskinesisexporter/internal/compress/compresser_test.go +++ b/exporter/awskinesisexporter/internal/compress/compresser_test.go @@ -4,43 +4,107 @@ package compress_test import ( + "bytes" + "compress/flate" + "compress/gzip" + "compress/zlib" + "errors" "fmt" + "io" "math/rand" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awskinesisexporter/internal/compress" ) +func GzipDecompress(data []byte) ([]byte, error) { + buf := bytes.NewBuffer(data) + + zr, err := gzip.NewReader(buf) + if err != nil { + return nil, err + } + + out := bytes.Buffer{} + if _, err = io.CopyN(&out, zr, 1024); err != nil && !errors.Is(err, io.EOF) { + zr.Close() + return nil, err + } + zr.Close() + return out.Bytes(), nil +} + +func NoopDecompress(data []byte) ([]byte, error) { + return data, nil +} + +func ZlibDecompress(data []byte) ([]byte, error) { + buf := bytes.NewBuffer(data) + + zr, err := zlib.NewReader(buf) + if err != nil { + return nil, err + } + + out := bytes.Buffer{} + if _, err = io.CopyN(&out, zr, 1024); err != nil && !errors.Is(err, io.EOF) { + zr.Close() + return nil, err + } + zr.Close() + return out.Bytes(), nil +} + +func FlateDecompress(data []byte) ([]byte, error) { + var err error + buf := bytes.NewBuffer(data) + zr := flate.NewReader(buf) + out := bytes.Buffer{} + if _, err = io.CopyN(&out, zr, 1024); err != nil && !errors.Is(err, io.EOF) { + zr.Close() + return nil, err + } + zr.Close() + return out.Bytes(), nil +} + func TestCompressorFormats(t *testing.T) { t.Parallel() testCases := []struct { - format string + format string + decompress func(data []byte) ([]byte, error) }{ - {format: "none"}, - {format: "noop"}, - {format: "gzip"}, - {format: "zlib"}, - {format: "flate"}, + {format: "none", decompress: NoopDecompress}, + {format: "noop", decompress: NoopDecompress}, + {format: "gzip", decompress: GzipDecompress}, + {format: "zlib", decompress: ZlibDecompress}, + {format: "flate", decompress: FlateDecompress}, } const data = "You know nothing Jon Snow" + for _, tc := range testCases { t.Run(fmt.Sprintf("format_%s", tc.format), func(t *testing.T) { - c, err := compress.NewCompressor(tc.format) + logger := zaptest.NewLogger(t) + c, err := compress.NewCompressor(tc.format, logger) require.NoError(t, err, "Must have a valid compression format") require.NotNil(t, c, "Must have a valid compressor") out, err := c.Do([]byte(data)) assert.NoError(t, err, "Must not error when processing data") assert.NotNil(t, out, "Must have a valid record") + outDecompress, err := tc.decompress(out) + assert.NoError(t, err, "Decompression must have no errors") + assert.Equal(t, []byte(data), outDecompress, "Data input should be the same after compression and decompression") }) } - _, err := compress.NewCompressor("invalid-format") + _, err := compress.NewCompressor("invalid-format", zaptest.NewLogger(t)) assert.Error(t, err, "Must error when an invalid compression format is given") } @@ -82,7 +146,7 @@ func benchmarkCompressor(b *testing.B, format string, length int) { source := rand.NewSource(time.Now().UnixMilli()) genRand := rand.New(source) - compressor, err := compress.NewCompressor(format) + compressor, err := compress.NewCompressor(format, zaptest.NewLogger(b)) require.NoError(b, err, "Must not error when given a valid format") require.NotNil(b, compressor, "Must have a valid compressor") diff --git a/exporter/awskinesisexporter/internal/compress/noop_compression.go b/exporter/awskinesisexporter/internal/compress/noop_compression.go index 0c8abf17b329..8024239242fd 100644 --- a/exporter/awskinesisexporter/internal/compress/noop_compression.go +++ b/exporter/awskinesisexporter/internal/compress/noop_compression.go @@ -3,7 +3,10 @@ package compress // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awskinesisexporter/internal/compress" -import "io" +import ( + "io" + "sync" +) type noop struct { data io.Writer @@ -11,7 +14,11 @@ type noop struct { func NewNoopCompressor() Compressor { return &compressor{ - compression: &noop{}, + compressionPool: sync.Pool{ + New: func() any { + return &noop{} + }, + }, } } @@ -23,6 +30,6 @@ func (n noop) Write(p []byte) (int, error) { return n.data.Write(p) } -func (n noop) Flush() error { +func (n noop) Close() error { return nil }