Skip to content

Commit

Permalink
Add support for gzip compression to amqp plugins (influxdata#5830)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored and idohalevi committed Sep 23, 2020
1 parent d95f2ce commit 24c6930
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 12 deletions.
122 changes: 122 additions & 0 deletions internal/content_coding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package internal

import (
"bytes"
"compress/gzip"
"errors"
"io"
)

// NewContentEncoder returns a ContentEncoder for the encoding type.
func NewContentEncoder(encoding string) (ContentEncoder, error) {
switch encoding {
case "gzip":
return NewGzipEncoder()

case "identity", "":
return NewIdentityEncoder(), nil
default:
return nil, errors.New("invalid value for content_encoding")
}
}

// NewContentDecoder returns a ContentDecoder for the encoding type.
func NewContentDecoder(encoding string) (ContentDecoder, error) {
switch encoding {
case "gzip":
return NewGzipDecoder()
case "identity", "":
return NewIdentityDecoder(), nil
default:
return nil, errors.New("invalid value for content_encoding")
}
}

// ContentEncoder applies a wrapper encoding to byte buffers.
type ContentEncoder interface {
Encode([]byte) ([]byte, error)
}

// GzipEncoder compresses the buffer using gzip at the default level.
type GzipEncoder struct {
writer *gzip.Writer
buf *bytes.Buffer
}

func NewGzipEncoder() (*GzipEncoder, error) {
var buf bytes.Buffer
return &GzipEncoder{
writer: gzip.NewWriter(&buf),
buf: &buf,
}, nil
}

func (e *GzipEncoder) Encode(data []byte) ([]byte, error) {
e.buf.Reset()
e.writer.Reset(e.buf)

_, err := e.writer.Write(data)
if err != nil {
return nil, err
}
err = e.writer.Close()
if err != nil {
return nil, err
}
return e.buf.Bytes(), nil
}

// IdentityEncoder is a null encoder that applies no transformation.
type IdentityEncoder struct{}

func NewIdentityEncoder() *IdentityEncoder {
return &IdentityEncoder{}
}

func (*IdentityEncoder) Encode(data []byte) ([]byte, error) {
return data, nil
}

// ContentDecoder removes a wrapper encoding from byte buffers.
type ContentDecoder interface {
Decode([]byte) ([]byte, error)
}

// GzipDecoder decompresses buffers with gzip compression.
type GzipDecoder struct {
reader *gzip.Reader
buf *bytes.Buffer
}

func NewGzipDecoder() (*GzipDecoder, error) {
return &GzipDecoder{
reader: new(gzip.Reader),
buf: new(bytes.Buffer),
}, nil
}

func (d *GzipDecoder) Decode(data []byte) ([]byte, error) {
d.reader.Reset(bytes.NewBuffer(data))
d.buf.Reset()

_, err := d.buf.ReadFrom(d.reader)
if err != nil && err != io.EOF {
return nil, err
}
err = d.reader.Close()
if err != nil {
return nil, err
}
return d.buf.Bytes(), nil
}

// IdentityDecoder is a null decoder that returns the input.
type IdentityDecoder struct{}

func NewIdentityDecoder() *IdentityDecoder {
return &IdentityDecoder{}
}

func (*IdentityDecoder) Decode(data []byte) ([]byte, error) {
return data, nil
}
58 changes: 58 additions & 0 deletions internal/content_coding_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package internal

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestGzipEncodeDecode(t *testing.T) {
enc, err := NewGzipEncoder()
require.NoError(t, err)
dec, err := NewGzipDecoder()
require.NoError(t, err)

payload, err := enc.Encode([]byte("howdy"))
require.NoError(t, err)

actual, err := dec.Decode(payload)
require.NoError(t, err)

require.Equal(t, "howdy", string(actual))
}

func TestGzipReuse(t *testing.T) {
enc, err := NewGzipEncoder()
require.NoError(t, err)
dec, err := NewGzipDecoder()
require.NoError(t, err)

payload, err := enc.Encode([]byte("howdy"))
require.NoError(t, err)

actual, err := dec.Decode(payload)
require.NoError(t, err)

require.Equal(t, "howdy", string(actual))

payload, err = enc.Encode([]byte("doody"))
require.NoError(t, err)

actual, err = dec.Decode(payload)
require.NoError(t, err)

require.Equal(t, "doody", string(actual))
}

func TestIdentityEncodeDecode(t *testing.T) {
enc := NewIdentityEncoder()
dec := NewIdentityDecoder()

payload, err := enc.Encode([]byte("howdy"))
require.NoError(t, err)

actual, err := dec.Decode(payload)
require.NoError(t, err)

require.Equal(t, "howdy", string(actual))
}
4 changes: 4 additions & 0 deletions plugins/inputs/amqp_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ The following defaults are known to work with RabbitMQ:
## Use TLS but skip chain & host verification
# insecure_skip_verify = false

## Content encoding for message payloads, can be set to "gzip" to or
## "identity" to apply no encoding.
# content_encoding = "identity"

## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
Expand Down
35 changes: 29 additions & 6 deletions plugins/inputs/amqp_consumer/amqp_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
Expand Down Expand Up @@ -52,12 +53,15 @@ type AMQPConsumer struct {
AuthMethod string
tls.ClientConfig

ContentEncoding string `toml:"content_encoding"`

deliveries map[telegraf.TrackingID]amqp.Delivery

parser parsers.Parser
conn *amqp.Connection
wg *sync.WaitGroup
cancel context.CancelFunc
parser parsers.Parser
conn *amqp.Connection
wg *sync.WaitGroup
cancel context.CancelFunc
decoder internal.ContentDecoder
}

type externalAuth struct{}
Expand Down Expand Up @@ -147,6 +151,10 @@ func (a *AMQPConsumer) SampleConfig() string {
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Content encoding for message payloads, can be set to "gzip" to or
## "identity" to apply no encoding.
# content_encoding = "identity"
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
Expand Down Expand Up @@ -201,6 +209,11 @@ func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error {
return err
}

a.decoder, err = internal.NewContentDecoder(a.ContentEncoding)
if err != nil {
return err
}

msgs, err := a.connect(amqpConf)
if err != nil {
return err
Expand Down Expand Up @@ -428,8 +441,7 @@ func (a *AMQPConsumer) process(ctx context.Context, msgs <-chan amqp.Delivery, a
}

func (a *AMQPConsumer) onMessage(acc telegraf.TrackingAccumulator, d amqp.Delivery) error {
metrics, err := a.parser.Parse(d.Body)
if err != nil {
onError := func() {
// Discard the message from the queue; will never be able to process
// this message.
rejErr := d.Ack(false)
Expand All @@ -438,6 +450,17 @@ func (a *AMQPConsumer) onMessage(acc telegraf.TrackingAccumulator, d amqp.Delive
d.DeliveryTag, rejErr)
a.conn.Close()
}
}

body, err := a.decoder.Decode(d.Body)
if err != nil {
onError()
return err
}

metrics, err := a.parser.Parse(body)
if err != nil {
onError()
return err
}

Expand Down
8 changes: 8 additions & 0 deletions plugins/outputs/amqp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ For an introduction to AMQP see:
## Recommended to set to true.
# use_batch_format = false

## Content encoding for message payloads, can be set to "gzip" to or
## "identity" to apply no encoding.
##
## Please note that when use_batch_format = false each amqp message contains only
## a single metric, it is recommended to use compression with batch format
## for best results.
# content_encoding = "identity"

## Data format to output.
## Each data format has its own unique set of configuration options, read
## more about them here:
Expand Down
25 changes: 23 additions & 2 deletions plugins/outputs/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,15 @@ type AMQP struct {
Headers map[string]string `toml:"headers"`
Timeout internal.Duration `toml:"timeout"`
UseBatchFormat bool `toml:"use_batch_format"`
ContentEncoding string `toml:"content_encoding"`
tls.ClientConfig

serializer serializers.Serializer
connect func(*ClientConfig) (Client, error)
client Client
config *ClientConfig
sentMessages int
encoder internal.ContentEncoder
}

type Client interface {
Expand Down Expand Up @@ -149,6 +151,14 @@ var sampleConfig = `
## Recommended to set to true.
# use_batch_format = false
## Content encoding for message payloads, can be set to "gzip" to or
## "identity" to apply no encoding.
##
## Please note that when use_batch_format = false each amqp message contains only
## a single metric, it is recommended to use compression with batch format
## for best results.
# content_encoding = "identity"
## Data format to output.
## Each data format has its own unique set of configuration options, read
## more about them here:
Expand Down Expand Up @@ -177,11 +187,16 @@ func (q *AMQP) Connect() error {
q.config = config
}

client, err := q.connect(q.config)
var err error
q.encoder, err = internal.NewContentEncoder(q.ContentEncoding)
if err != nil {
return err
}

q.client, err = q.connect(q.config)
if err != nil {
return err
}
q.client = client

return nil
}
Expand Down Expand Up @@ -227,6 +242,11 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error {
return err
}

body, err = q.encoder.Encode(body)
if err != nil {
return err
}

err = q.publish(key, body)
if err != nil {
// If this is the first attempt to publish and the connection is
Expand Down Expand Up @@ -298,6 +318,7 @@ func (q *AMQP) makeClientConfig() (*ClientConfig, error) {
exchange: q.Exchange,
exchangeType: q.ExchangeType,
exchangePassive: q.ExchangePassive,
encoding: q.ContentEncoding,
timeout: q.Timeout.Duration,
}

Expand Down
10 changes: 6 additions & 4 deletions plugins/outputs/amqp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type ClientConfig struct {
exchangePassive bool
exchangeDurable bool
exchangeArguments amqp.Table
encoding string
headers amqp.Table
deliveryMode uint8
tlsConfig *tls.Config
Expand Down Expand Up @@ -114,10 +115,11 @@ func (c *client) Publish(key string, body []byte) error {
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: c.config.headers,
ContentType: "text/plain",
Body: body,
DeliveryMode: c.config.deliveryMode,
Headers: c.config.headers,
ContentType: "text/plain",
ContentEncoding: c.config.encoding,
Body: body,
DeliveryMode: c.config.deliveryMode,
})
}

Expand Down

0 comments on commit 24c6930

Please sign in to comment.