Skip to content

Commit

Permalink
Send messages in a batch in Kafka output
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson committed Aug 15, 2018
1 parent 3461458 commit 0446598
Showing 1 changed file with 27 additions and 11 deletions.
38 changes: 27 additions & 11 deletions plugins/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func (k *Kafka) GetTopicName(metric telegraf.Metric) string {
default:
topicName = k.Topic
}

return topicName
}

Expand Down Expand Up @@ -272,21 +273,37 @@ func (k *Kafka) Description() string {
}

func (k *Kafka) Write(metrics []telegraf.Metric) error {
msgs := make([]*sarama.ProducerMessage, 0, len(metrics))
batches := make(map[string]map[string][]telegraf.Metric)
for _, metric := range metrics {
buf, err := k.serializer.Serialize(metric)
if err != nil {
return err
topicName := k.GetTopicName(metric)
if _, ok := batches[topicName]; !ok {
batches[topicName] = make(map[string][]telegraf.Metric)
}

m := &sarama.ProducerMessage{
Topic: k.GetTopicName(metric),
Value: sarama.ByteEncoder(buf),
key, _ := metric.GetTag(k.RoutingTag)
if _, ok := batches[topicName][key]; !ok {
batches[topicName][key] = make([]telegraf.Metric, 0)
}
if h, ok := metric.GetTag(k.RoutingTag); ok {
m.Key = sarama.StringEncoder(h)
batches[topicName][key] = append(batches[topicName][key], metric)
}

msgs := make([]*sarama.ProducerMessage, 0, len(metrics))
for topicName, v := range batches {
for routingKey, metrics := range v {

buf, err := k.serializer.SerializeBatch(metrics)
if err != nil {
return err
}

m := &sarama.ProducerMessage{
Topic: topicName,
Value: sarama.ByteEncoder(buf),
Key: sarama.StringEncoder(routingKey),
}

msgs = append(msgs, m)
}
msgs = append(msgs, m)
}

err := k.producer.SendMessages(msgs)
Expand All @@ -299,7 +316,6 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error {
}
return err
}

return nil
}

Expand Down

0 comments on commit 0446598

Please sign in to comment.