Skip to content

Commit

Permalink
Drop message batches in kafka output if too large (influxdata#4565)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored and Jean-Louis Dupond committed Apr 22, 2019
1 parent e276d05 commit 7bfea37
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 3 deletions.
1 change: 1 addition & 0 deletions internal/models/running_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func (ro *RunningOutput) AddMetric(m telegraf.Metric) {
err := ro.write(batch)
if err != nil {
ro.failMetrics.Add(batch...)
log.Printf("E! Error writing to output [%s]: %v", ro.Name, err)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/kafka_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ and use the old zookeeper connection method.

## Maximum length of a message to consume, in bytes (default 0/unlimited);
## larger messages are dropped
max_message_len = 65536
max_message_len = 1000000
```

## Testing
Expand Down
18 changes: 17 additions & 1 deletion plugins/inputs/kafka_consumer/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Kafka struct {
Topics []string
Brokers []string
MaxMessageLen int
Version string `toml:"version"`

Cluster *cluster.Consumer

Expand Down Expand Up @@ -64,6 +65,12 @@ var sampleConfig = `
## Optional Client id
# client_id = "Telegraf"
## Set the minimal supported Kafka version. Setting this enables the use of new
## Kafka features and APIs. Of particular interest, lz4 compression
## requires at least version 0.10.0.0.
## ex: version = "1.1.0"
# version = ""
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
Expand All @@ -88,7 +95,7 @@ var sampleConfig = `
## Maximum length of a message to consume, in bytes (default 0/unlimited);
## larger messages are dropped
max_message_len = 65536
max_message_len = 1000000
`

func (k *Kafka) SampleConfig() string {
Expand All @@ -111,6 +118,15 @@ func (k *Kafka) Start(acc telegraf.Accumulator) error {
k.acc = acc

config := cluster.NewConfig()

if k.Version != "" {
version, err := sarama.ParseKafkaVersion(k.Version)
if err != nil {
return err
}
config.Version = version
}

config.Consumer.Return.Errors = true

tlsConfig, err := k.ClientConfig.TLSConfig()
Expand Down
1 change: 1 addition & 0 deletions plugins/outputs/kafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
## 0 : No compression
## 1 : Gzip compression
## 2 : Snappy compression
## 3 : LZ4 compression
# compression_codec = 0

## RequiredAcks is used in Produce Requests to tell the broker how many
Expand Down
8 changes: 7 additions & 1 deletion plugins/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kafka
import (
"crypto/tls"
"fmt"
"log"
"strings"

"github.com/influxdata/telegraf"
Expand Down Expand Up @@ -79,7 +80,7 @@ var sampleConfig = `
# client_id = "Telegraf"
## Set the minimal supported Kafka version. Setting this enables the use of new
## Kafka features and APIs. Of particular interested, lz4 compression
## Kafka features and APIs. Of particular interest, lz4 compression
## requires at least version 0.10.0.0.
## ex: version = "1.1.0"
# version = ""
Expand Down Expand Up @@ -120,6 +121,7 @@ var sampleConfig = `
## 0 : No compression
## 1 : Gzip compression
## 2 : Snappy compression
## 3 : LZ4 compression
# compression_codec = 0
## RequiredAcks is used in Produce Requests to tell the broker how many
Expand Down Expand Up @@ -294,6 +296,10 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error {
// We could have many errors, return only the first encountered.
if errs, ok := err.(sarama.ProducerErrors); ok {
for _, prodErr := range errs {
if prodErr.Err == sarama.ErrMessageSizeTooLarge {
log.Printf("E! Error writing to output [kafka]: Message too large, consider increasing `max_message_bytes`; dropping batch")
return nil
}
return prodErr
}
}
Expand Down

0 comments on commit 7bfea37

Please sign in to comment.