
Kafka output plugin should support batching at the producer level #4161

Closed
JimHagan opened this issue May 17, 2018 · 8 comments · Fixed by #4491
Labels: area/kafka, feature request, performance
JimHagan commented May 17, 2018

Support Batching at the Producer Level

This will make it possible to scale to a higher degree by making fewer calls, and may allow our compression of output objects to be more efficient.

Proposal:

Currently the Write method uses an individual call to SendMessage per metric. This could be refactored to accept a batch size parameter (injected down from the Kafka output plugin configuration) and to use SendMessages instead.

Suggestion...

func (k *Kafka) Write(metrics []telegraf.Metric) error {
    if len(metrics) == 0 {
        return nil
    }

    // k.BatchSize would be a new config option injected from the plugin.
    msgs := make([]*sarama.ProducerMessage, 0, k.BatchSize)

    for _, metric := range metrics {
        buf, err := k.serializer.Serialize(metric)
        if err != nil {
            return err
        }

        m := &sarama.ProducerMessage{
            Topic: k.GetTopicName(metric),
            Value: sarama.ByteEncoder(buf),
        }
        if h, ok := metric.Tags()[k.RoutingTag]; ok {
            m.Key = sarama.StringEncoder(h)
        }
        msgs = append(msgs, m)

        // Flush a full batch in a single SendMessages call.
        if len(msgs) >= k.BatchSize {
            if err := k.producer.SendMessages(msgs); err != nil {
                return fmt.Errorf("FAILED to send kafka messages: %s", err)
            }
            msgs = msgs[:0]
        }
    }

    // Send any remaining messages as a final, partial batch.
    if len(msgs) > 0 {
        if err := k.producer.SendMessages(msgs); err != nil {
            return fmt.Errorf("FAILED to send kafka messages: %s", err)
        }
    }

    return nil
}

Use case: [Why is this important (helps with prioritizing requests)]

We have a very large multi-data-center Kafka pipeline, and we need batching to lower the number of calls as well as to potentially make Kafka compression more efficient.

@danielnelson added the performance, area/kafka, and feature request labels on May 17, 2018
@russorat
see also #2825

@JimHagan
@russorat Is there code for this in 1.8 that we could give a look at? I was just talking with @edbernier

@danielnelson
@JimHagan I have two potential patches that I could use your help performance testing, if you are up for it.

I have them in separate branches: kafka-perf-send-messages and kafka-perf-batch.

The send-messages branch works by simply using the SendMessages method on sarama.SyncProducer: it sends all messages in a Write batch and waits for them all to complete, instead of the previous pattern of send/recv, send/recv, etc.

The other method combines SendMessages with message batching, which changes the output format to contain multiple metrics per message. It is a bit more complicated, though, and in simple testing it doesn't show any performance improvement for me. I would be interested in what differences show up in a more substantial test environment.

There is a potential issue you may run into when using the master branch where Telegraf could deadlock on shutdown; I will of course take care of this before the next release.

@JimHagan
I will get back to you, @danielnelson. I believe we want the second option, because we are using Kafka MirrorMaker to move large volumes of metrics between data centers, and that approach should provide optimizations for what we are doing. I want to check in with one of our Kafka experts.

The only concern about the batching option is that it looks like it attempts to send the whole buffer's worth of data as a single batch. Am I reading that correctly? The idea I had was that we could constrain the number of messages bundled per batch. I can certainly test the version as is for now, but we could run into the Kafka max message size on our infrastructure (as it's configured).

@JimHagan
@danielnelson and @edbernier After consulting with Tom, he indicated that the first option (kafka-perf-send-messages) with SendMessages may make sense IF we rely on the Kafka producer to handle batching. Tom thinks that Kafka producers should know how to take a bunch of messages and do their own batching, provided the configuration is exposed.

@danielnelson
The producer should be able to send the data efficiently using SendMessages; in my testing it is faster than sending with batching, but I don't have a high-traffic test environment. You can control the size of the batches using the metric_batch_size agent option, although be aware that this value is shared by all outputs.
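For reference, that option lives in the agent section of the Telegraf config; a sketch (the value 1000 is just an example, not a recommendation):

```toml
[agent]
  ## Maximum number of metrics sent per output Write call.
  ## Note: this setting is shared by all configured outputs, not just kafka.
  metric_batch_size = 1000
```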

I think we shouldn't merge the batching support pending benchmarks showing that it is needed; if you are available to help check this, I could provide a development build.

JimHagan commented Aug 3, 2018

@danielnelson Should I be getting an RC to test this with?

@danielnelson
I added links on #4517, let me know if you need a different package.
