From 434fa8db7ef3b483676a89511f72ce8c2de3d388 Mon Sep 17 00:00:00 2001 From: Mauro Murari Date: Mon, 13 Aug 2018 20:40:18 -0300 Subject: [PATCH] Add message 'max_bytes' configuration (#4537) --- plugins/outputs/kafka/kafka.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index edb4aff75abf9..8f9ae12e713e2 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -37,6 +37,8 @@ type ( RequiredAcks int // MaxRetry Tag MaxRetry int + // Max Message Bytes + MaxMessageBytes int Version string `toml:"version"` @@ -140,6 +142,9 @@ var sampleConfig = ` ## until the next flush. # max_retry = 3 + ## Max message bytes, should be lower than server message.max.bytes config + # MaxMessageBytes = 0 + ## Optional TLS Config # tls_ca = "/etc/telegraf/ca.pem" # tls_cert = "/etc/telegraf/cert.pem" @@ -219,6 +224,10 @@ func (k *Kafka) Connect() error { config.Producer.Retry.Max = k.MaxRetry config.Producer.Return.Successes = true + if k.MaxMessageBytes > 0 { + config.Producer.MaxMessageBytes = k.MaxMessageBytes + } + // Legacy support ssl config if k.Certificate != "" { k.TLSCert = k.Certificate