Skip to content

Commit

Permalink
Add support for lz4 compression to kafka output (influxdata#4492)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored and rgitzel committed Oct 17, 2018
1 parent 10cdb28 commit b4e6643
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 2 deletions.
10 changes: 8 additions & 2 deletions plugins/outputs/kafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,23 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
## Kafka topic for producer messages
topic = "telegraf"

## Optional client id
## Optional Client id
# client_id = "Telegraf"

## Set the minimal supported Kafka version. Setting this enables the use of new
## Kafka features and APIs. Of particular interested, lz4 compression
## requires at least version 0.10.0.0.
## ex: version = "1.1.0"
# version = ""

## Optional topic suffix configuration.
## If the section is omitted, no suffix is used.
## Following topic suffix methods are supported:
## measurement - suffix equals to separator + measurement's name
## tags - suffix equals to separator + specified tags' values
## interleaved with separator

## Suffix equals to "_" + measurement's name
## Suffix equals to "_" + measurement name
# [outputs.kafka.topic_suffix]
# method = "measurement"
# separator = "_"
Expand Down
16 changes: 16 additions & 0 deletions plugins/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type (
// MaxRetry Tag
MaxRetry int

Version string `toml:"version"`

// Legacy TLS config options
// TLS client certificate
Certificate string
Expand Down Expand Up @@ -74,6 +76,12 @@ var sampleConfig = `
## Optional Client id
# client_id = "Telegraf"
## Set the minimal supported Kafka version. Setting this enables the use of new
## Kafka features and APIs. Of particular interested, lz4 compression
## requires at least version 0.10.0.0.
## ex: version = "1.1.0"
# version = ""
## Optional topic suffix configuration.
## If the section is omitted, no suffix is used.
## Following topic suffix methods are supported:
Expand Down Expand Up @@ -191,6 +199,14 @@ func (k *Kafka) Connect() error {
}
config := sarama.NewConfig()

if k.Version != "" {
version, err := sarama.ParseKafkaVersion(k.Version)
if err != nil {
return err
}
config.Version = version
}

if k.ClientID != "" {
config.ClientID = k.ClientID
} else {
Expand Down

0 comments on commit b4e6643

Please sign in to comment.