Skip to content

Commit

Permalink
Fix compression in the awskinesisexporter and thread safe (#5663)
Browse files Browse the repository at this point in the history
  • Loading branch information
phoebe-canva authored Aug 10, 2023
1 parent c13c645 commit e9acd9e
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 36 deletions.
20 changes: 20 additions & 0 deletions .chloggen/fix-compression-kinesis-exporter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: "bug_fix"

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: "awskinesisexporter"

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Fixes the compression by closing it so that it marks the end of the compression and makes it thread safe"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [5663]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
2 changes: 1 addition & 1 deletion exporter/awskinesisexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func createExporter(ctx context.Context, c component.Config, log *zap.Logger, op
return nil, err
}

compressor, err := compress.NewCompressor(conf.Encoding.Compression)
compressor, err := compress.NewCompressor(conf.Encoding.Compression, log)
if err != nil {
return nil, err
}
Expand Down
82 changes: 59 additions & 23 deletions exporter/awskinesisexporter/internal/compress/compresser.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@ import (
"compress/zlib"
"fmt"
"io"
"sync"

"go.uber.org/zap"
)

type bufferedResetWriter interface {
Write(p []byte) (int, error)
Flush() error
Close() error
Reset(newWriter io.Writer)
}

Expand All @@ -25,35 +28,63 @@ type Compressor interface {
var _ Compressor = (*compressor)(nil)

type compressor struct {
compression bufferedResetWriter
compressionPool sync.Pool
}

func NewCompressor(format string) (Compressor, error) {
c := &compressor{
compression: &noop{},
}
func NewCompressor(format string, log *zap.Logger) (Compressor, error) {
var c Compressor
switch format {
case "flate":
w, err := flate.NewWriter(nil, flate.BestSpeed)
if err != nil {
return nil, err
c = &compressor{
compressionPool: sync.Pool{
New: func() any {
w, err := flate.NewWriter(nil, flate.BestSpeed)
if err != nil {
errMsg := fmt.Sprintf("Unable to instantiate Flate compressor: %v", err)
log.Error(errMsg)
return nil
}
return w
},
},
}
c.compression = w
case "gzip":
w, err := gzip.NewWriterLevel(nil, gzip.BestSpeed)
if err != nil {
return nil, err
}
c.compression = w

c = &compressor{
compressionPool: sync.Pool{
New: func() any {
w, err := gzip.NewWriterLevel(nil, gzip.BestSpeed)
if err != nil {
errMsg := fmt.Sprintf("Unable to instantiate Gzip compressor: %v", err)
log.Error(errMsg)
return nil
}
return w
},
},
}
case "zlib":
w, err := zlib.NewWriterLevel(nil, zlib.BestSpeed)
if err != nil {
return nil, err
c = &compressor{
compressionPool: sync.Pool{
New: func() any {
w, err := zlib.NewWriterLevel(nil, zlib.BestSpeed)
if err != nil {
errMsg := fmt.Sprintf("Unable to instantiate Zlib compressor: %v", err)
log.Error(errMsg)
return nil
}
return w
},
},
}
c.compression = w
case "noop", "none":
// Already the default case
c = &compressor{
compressionPool: sync.Pool{
New: func() any {
return &noop{}
},
},
}
default:
return nil, fmt.Errorf("unknown compression format: %s", format)
}
Expand All @@ -63,14 +94,19 @@ func NewCompressor(format string) (Compressor, error) {

func (c *compressor) Do(in []byte) ([]byte, error) {
buf := new(bytes.Buffer)
comp := c.compressionPool.Get().(bufferedResetWriter)
if comp == nil {
return nil, fmt.Errorf("compressor is nil and did not get instantiated correctly")
}
defer c.compressionPool.Put(comp)

c.compression.Reset(buf)
comp.Reset(buf)

if _, err := c.compression.Write(in); err != nil {
if _, err := comp.Write(in); err != nil {
return nil, err
}

if err := c.compression.Flush(); err != nil {
if err := comp.Close(); err != nil {
return nil, err
}

Expand Down
82 changes: 73 additions & 9 deletions exporter/awskinesisexporter/internal/compress/compresser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,107 @@
package compress_test

import (
"bytes"
"compress/flate"
"compress/gzip"
"compress/zlib"
"errors"
"fmt"
"io"
"math/rand"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awskinesisexporter/internal/compress"
)

func GzipDecompress(data []byte) ([]byte, error) {
buf := bytes.NewBuffer(data)

zr, err := gzip.NewReader(buf)
if err != nil {
return nil, err
}

out := bytes.Buffer{}
if _, err = io.CopyN(&out, zr, 1024); err != nil && !errors.Is(err, io.EOF) {
zr.Close()
return nil, err
}
zr.Close()
return out.Bytes(), nil
}

func NoopDecompress(data []byte) ([]byte, error) {
return data, nil
}

func ZlibDecompress(data []byte) ([]byte, error) {
buf := bytes.NewBuffer(data)

zr, err := zlib.NewReader(buf)
if err != nil {
return nil, err
}

out := bytes.Buffer{}
if _, err = io.CopyN(&out, zr, 1024); err != nil && !errors.Is(err, io.EOF) {
zr.Close()
return nil, err
}
zr.Close()
return out.Bytes(), nil
}

func FlateDecompress(data []byte) ([]byte, error) {
var err error
buf := bytes.NewBuffer(data)
zr := flate.NewReader(buf)
out := bytes.Buffer{}
if _, err = io.CopyN(&out, zr, 1024); err != nil && !errors.Is(err, io.EOF) {
zr.Close()
return nil, err
}
zr.Close()
return out.Bytes(), nil
}

func TestCompressorFormats(t *testing.T) {
t.Parallel()

testCases := []struct {
format string
format string
decompress func(data []byte) ([]byte, error)
}{
{format: "none"},
{format: "noop"},
{format: "gzip"},
{format: "zlib"},
{format: "flate"},
{format: "none", decompress: NoopDecompress},
{format: "noop", decompress: NoopDecompress},
{format: "gzip", decompress: GzipDecompress},
{format: "zlib", decompress: ZlibDecompress},
{format: "flate", decompress: FlateDecompress},
}

const data = "You know nothing Jon Snow"

for _, tc := range testCases {
t.Run(fmt.Sprintf("format_%s", tc.format), func(t *testing.T) {
c, err := compress.NewCompressor(tc.format)
logger := zaptest.NewLogger(t)
c, err := compress.NewCompressor(tc.format, logger)
require.NoError(t, err, "Must have a valid compression format")
require.NotNil(t, c, "Must have a valid compressor")

out, err := c.Do([]byte(data))
assert.NoError(t, err, "Must not error when processing data")
assert.NotNil(t, out, "Must have a valid record")
outDecompress, err := tc.decompress(out)
assert.NoError(t, err, "Decompression must have no errors")
assert.Equal(t, []byte(data), outDecompress, "Data input should be the same after compression and decompression")
})
}
_, err := compress.NewCompressor("invalid-format")
_, err := compress.NewCompressor("invalid-format", zaptest.NewLogger(t))
assert.Error(t, err, "Must error when an invalid compression format is given")
}

Expand Down Expand Up @@ -82,7 +146,7 @@ func benchmarkCompressor(b *testing.B, format string, length int) {
source := rand.NewSource(time.Now().UnixMilli())
genRand := rand.New(source)

compressor, err := compress.NewCompressor(format)
compressor, err := compress.NewCompressor(format, zaptest.NewLogger(b))
require.NoError(b, err, "Must not error when given a valid format")
require.NotNil(b, compressor, "Must have a valid compressor")

Expand Down
13 changes: 10 additions & 3 deletions exporter/awskinesisexporter/internal/compress/noop_compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,22 @@

package compress // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awskinesisexporter/internal/compress"

import "io"
import (
"io"
"sync"
)

type noop struct {
data io.Writer
}

func NewNoopCompressor() Compressor {
return &compressor{
compression: &noop{},
compressionPool: sync.Pool{
New: func() any {
return &noop{}
},
},
}
}

Expand All @@ -23,6 +30,6 @@ func (n noop) Write(p []byte) (int, error) {
return n.data.Write(p)
}

func (n noop) Flush() error {
func (n noop) Close() error {
return nil
}

0 comments on commit e9acd9e

Please sign in to comment.