Send messages containing metric batches in Kafka output #4517
Conversation
I ran some benchmarks against #4154.

Results: This PR is faster than confluent 💙

Problems: I always get errors when we have a larger number of messages:
@otherpirate Thanks for the help testing, this error is somewhat anticipated, as predicted by @JimHagan. As a temporary workaround you can control the maximum number of metrics per batch by setting the agent's `metric_batch_size`. I have already merged a change into master that changes the plugin to send multiple messages per request (#4491). One thing I am not sure about is whether combining these two methods will yield further improvements; in my testing it is actually worse to batch. What would be really helpful is if you could compare the performance of this PR against the code currently in master using one of the nightly builds.
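For reference, the workaround above is an agent-level setting in the main Telegraf config; a minimal sketch with placeholder values, not recommendations:

```
[agent]
  ## Maximum number of metrics to hand to an output per write; lowering this
  ## keeps each Kafka produce request below the broker's message size limit.
  metric_batch_size = 1000

  ## How often buffered metrics are flushed to the outputs.
  flush_interval = "10s"
```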
I opened a PR (#4537) against this branch that just adds MessageMaxBytes as a configuration option. Currently I'm handling 64 MB messages without problems.
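A hypothetical sketch of how that could look in the output config; the option name `max_message_bytes` and the 64 MB value are assumptions based on the comment above and may not match the final change:

```
[[outputs.kafka]]
  brokers = ["localhost:9092"]
  topic = "telegraf"

  ## Hypothetical option for the MessageMaxBytes limit discussed above:
  ## largest permitted size of a single produce message, here 64 MB.
  max_message_bytes = 67108864
```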
I ran the benchmark against kafka-perf-batch and master; here are the results:

kafka-perf-batch (8993c75)
master (2a4267e)
There is quite a bit of noise, and I see at least one flush where master does more metrics in less time, but overall it looks like the batching is helping beyond just multiple messages per request.
@danielnelson We want to test this feature ASAP. I'm having a bit of confusion around what features are available in the links above. One core reason for our original request was so that downstream consumers like MirrorMaker (for mirroring data between two different Kafka clusters) could take advantage of fewer, more compressed messages, so it wasn't just to save time on the initial write to Kafka. One thing that's unclear: when you use the SendMessages option, is any of the underlying Sarama library code doing some behind-the-scenes batching? @edbernier for visibility
The master branch and the nightly builds contain the "SendMessages" fix, without batching. With SendMessages, my understanding is that the sarama Go client sends multiple messages per RecordBatch, instead of the previous behavior of having a single message per RecordBatch. I believe this means that all messages in the RecordBatch are compressed as a single unit. I also added the MessageMaxBytes option from #4537 to this branch.

We would like to test the nightly builds against these builds of this PR:
One potential issue you may run into when testing is that, since each topic is batched together, the batch could contain up to the agent's `metric_batch_size` metrics.

Another change that will need to be made before we can merge this PR is to handle batches that are too large. Because we can't determine the size of a metric batch in bytes, we will probably need to log and drop the metrics to avoid being unable to make progress sending metrics.
@danielnelson Is there any possibility that we are leaving some points out of the batch that gets sent to Kafka? I didn't see any errors when I ran the code in DEBUG mode. We have a very consistent end-to-end metrics test we run, so we track exactly how many test metrics we send and know when the count doesn't match expectations. I'm going to try a few tweaks like changing acks and codec, as well as the max message bytes.

NOTE: My test metrics are not given a timestamp before I send them to Telegraf, so would the batching affect the way timestamps are assigned? In other words, could it cause some de-duplication at some stage of the pipeline?

UPDATE: I drastically reduced my metric_batch_size to 100 and I seem to be getting my data through now. What's strange is I wasn't getting Kafka errors previously, just loss of data. I can confirm that the bytes out is drastically reduced (most likely due to compression benefits).

UPDATE 2: For the record, we have two layers of Telegraf, one producer and one consumer. On the consumer side I may have had my max_message_len param set too low at 65k. I will be experimenting with increasing this, and I may be able to bring my max batch size up again.
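For context, the consumer-side limit mentioned in the last update is an option on the kafka_consumer input; a minimal sketch, with broker and topic names used purely for illustration:

```
[[inputs.kafka_consumer]]
  brokers = ["localhost:9092"]
  topics = ["telegraf"]
  data_format = "influx"

  ## Maximum length of a message to consume, in bytes; longer messages are
  ## dropped, so this needs to cover the largest batched message produced.
  max_message_len = 1048576
```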
@JimHagan That's pretty strange, I'll try to replicate. What do you have `metric_batch_size` set to?
I have discovered a few issues. This error message can come from either the producer (kafka output) or the broker, despite appearing to be from the server:
Another issue I noticed is that we are not displaying a log message on every write error, only on the flush_interval write. When a batch fails for being too large it will never be sendable, so we will need to log and drop the full batch. As these issues are independent of batching, I created a separate pull request to address them, along with a few other tweaks to the input and output: #4565.
@JimHagan Now that you have it mostly working with the smaller batch size, can you compare the performance against master when using the same configuration?
@danielnelson Out of curiosity, how does the Telegraf producer spread writes across the Kafka brokers? Is it using round robin? It seems like the writes are all going to the first broker listed in my config.
The sarama client sends all requests to the leader, as described in the producer documentation. We should check whether the consumer side is consuming from all brokers though; that side should read from all brokers so long as the data is written to multiple partitions using the `routing_tag` option.
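For reference, tag-based partitioning is configured on the output; a minimal sketch using the host tag purely as an example key:

```
[[outputs.kafka]]
  brokers = ["localhost:9092"]
  topic = "telegraf"

  ## The value of this tag is used as the routing (partition) key, so metrics
  ## sharing a tag value stay on one partition and keep per-series ordering.
  routing_tag = "host"
```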
@danielnelson We have the routing_tag commented out. We assumed that a random key would be generated if we omitted it. I don't necessarily want to add the host tag to all metrics coming through.
I opened a new feature request for adding support for a random routing key (#4573). This should be simple to add; I'll try to work on it later today. There is a slight downside to this method, as you will not maintain series ordering by timestamp. I'm not an expert on this aspect, but my understanding is that it is slightly more expensive for InfluxDB to process points coming in out of order.
Thanks @danielnelson, I appreciate the idea of supporting a random routing key. I think it is critical in our use case. Do you still want me to test on master? Do I need to build it or is there an RPM?
@JimHagan, I have added the routing key change to master, you will want to enable it with:

[[outputs.kafka]]
  routing_key = "random"

For testing you can use the nightly builds for non-batching, and for batching here are the latest builds of this branch
@danielnelson I'm testing
I'll make sure the batch parameter is added, but have you been able to measure the difference between this branch and master?
Sorry @danielnelson, I've been testing this one (1.8.0~072370c9). Is there a build for master or would I need to build it? I have observed benefits in the message size due to better compression when batching. Overall I'm pleased. Are you referring to the nightly builds as the master builds?

Here are some of the write times I'm seeing with this build:
Yes, the nightly builds are built from the master branch.
For us there are two performance considerations: one is the write time on the producer side, and the other is the compactness of the messages that are actually delivered. Since we mirror the Kafka data around, compactness is a factor for our needs. I'm assuming that in the nightly build version Sarama will batch these and compress them as a batch? Is that your thought? It might be great to add the number of bytes written in addition to the number of metrics, if it's not too expensive to calculate.

Here are some write times with 1.8.x...
Here are some with nightly...
It would really help for us to have a consensus about what Sarama is doing in terms of batching/compression behind the SendMessages call. So at the Kafka level it seems quite equivalent. The dips in the graphs represent restart events...

[Graph: Kafka topic inbound data rate]
[Graph: Kafka topic outbound data rate]
It looks like the timings are quite similar between the two builds. The answer to the question about what SendMessages does under the covers is complicated. The messages are batched, but they are not always placed into a single batch. Some producer hints are used to build one or more batches; it does this so that it can begin sending as soon as possible. We don't expose these options, but they are the sarama producer Flush settings.
All messages within a batch are compressed together. The exact wire format also depends on the version set, so I suggest setting the new `version` option. We may be able to hook into the go-metrics that the sarama library uses to gain some additional insights, but if I'm reading the graphs correctly it looks like it is approximately the same with the two builds?
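For anyone testing, both knobs are set on the output; a sketch assuming a 1.x broker, with option names that may only exist in newer builds and are worth checking against the build you're testing:

```
[[outputs.kafka]]
  brokers = ["localhost:9092"]
  topic = "telegraf"

  ## Kafka protocol version to assume; newer versions enable the RecordBatch
  ## wire format, where the whole batch is compressed as one unit.
  version = "1.0.0"

  ## Compression codec: 0 = none, 1 = gzip, 2 = snappy.
  compression_codec = 1
```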
@danielnelson My interpretation is that the two builds seem to be doing something quite similar as well. Please explain the version parameter a bit more? Would exposing the batching options be worth considering?

This may be an unrelated thing, but we were trying to get a socket_listener working to read collectd format from GitHub (as a second socket_listener, since we still wanted to read native line protocol on a different port). We started getting a lot of these errors... This was causing the native line protocol input and the collectd input to fail. I rolled back to 1.5.2 (the version we were on) and it seems to be functioning well. Is this related to the batching refactor? These were the configs...

```
[[inputs.socket_listener]]

[[inputs.socket_listener]]
```
The `version` option tells the client which Kafka protocol version to assume, which in turn determines the wire format used.

I believe this error is unrelated to the Kafka output; can you open a new issue and include how you are configuring GitHub to send collectd data?
@danielnelson So what direction are you going to go for the 1.8 release? When can we expect that to go out?
For 1.8 we will go with what is in master. Before the batching change is merged I would like to have more comprehensive performance testing, as it introduces more complexity for configuration and potential issues with batch size. Expect a 1.8.0-rc1 later this week and the final release ~2 weeks afterwards, subject to testing. Still curious about that collectd "no serializable fields" error; is this something I can configure GitHub to send to one of my test servers?
RE: the GitHub thing, one of our teams is setting up a GitHub Enterprise appliance and is somehow configuring it to send internal metrics over UDP as collectd. I don't know exactly how they rigged it up, but I'll check.
This is a potential change we may want to make in the Kafka output to improve performance, made available for testing. If it were to be added we would need to provide controls to limit the maximum batch size.