Skip to content

Commit

Permalink
Send messages in a batch in Kafka output
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson committed Aug 21, 2018
1 parent 430d710 commit 072370c
Showing 1 changed file with 27 additions and 12 deletions.
39 changes: 27 additions & 12 deletions plugins/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ func (k *Kafka) GetTopicName(metric telegraf.Metric) string {
default:
topicName = k.Topic
}

return topicName
}

Expand Down Expand Up @@ -290,22 +291,37 @@ func (k *Kafka) routingKey(metric telegraf.Metric) string {
}

func (k *Kafka) Write(metrics []telegraf.Metric) error {
msgs := make([]*sarama.ProducerMessage, 0, len(metrics))
batches := make(map[string]map[string][]telegraf.Metric)
for _, metric := range metrics {
buf, err := k.serializer.Serialize(metric)
if err != nil {
return err
topicName := k.GetTopicName(metric)
if _, ok := batches[topicName]; !ok {
batches[topicName] = make(map[string][]telegraf.Metric)
}

m := &sarama.ProducerMessage{
Topic: k.GetTopicName(metric),
Value: sarama.ByteEncoder(buf),
}
key := k.routingKey(metric)
if key != "" {
m.Key = sarama.StringEncoder(key)
if _, ok := batches[topicName][key]; !ok {
batches[topicName][key] = make([]telegraf.Metric, 0)
}
batches[topicName][key] = append(batches[topicName][key], metric)
}

msgs := make([]*sarama.ProducerMessage, 0, len(metrics))
for topicName, v := range batches {
for routingKey, metrics := range v {

buf, err := k.serializer.SerializeBatch(metrics)
if err != nil {
return err
}

m := &sarama.ProducerMessage{
Topic: topicName,
Value: sarama.ByteEncoder(buf),
Key: sarama.StringEncoder(routingKey),
}

msgs = append(msgs, m)
}
msgs = append(msgs, m)
}

err := k.producer.SendMessages(msgs)
Expand All @@ -322,7 +338,6 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error {
}
return err
}

return nil
}

Expand Down

0 comments on commit 072370c

Please sign in to comment.