Support Kafka zstd compression #7123

Closed
KarstenSchnitter opened this issue Mar 6, 2020 · 10 comments · Fixed by #8435

@KarstenSchnitter
Contributor

Description

The Kafka plugins do not support zstd compression because of the sarama version in use. Upgrading the dependency to a version later than 1.26.0 fixes this. The current sarama version is 1.26.1.

Proposal:

Upgrade Shopify/sarama to 1.26.1

Current behavior:

telegraf cannot consume or produce Kafka topics with zstd compression

Desired behavior:

telegraf can consume or produce Kafka topics with zstd compression

@sjwang90
Contributor

sjwang90 commented Nov 2, 2020

@KarstenSchnitter We've upgraded to sarama 1.27.1 in #8289. If you're interested in submitting a PR for Kafka zstd compression that would be great. Otherwise we'll add this to our backlog.

@KarstenSchnitter
Contributor Author

This is great news. I have not found the time to try that out yet. Shopify/sarama v1.27.1 should support zstd compression out of the box.

@clever-trevor

I tested this today with the 1.17.2 agent, with ZSTD compression set in the output and pointing to Kafka running v2.5.0. The agent would not start and shows:
E! [telegraf] Error running agent: could not initialize output kafka: kafka: invalid configuration (zstd compression requires Version >= V2_1_0_0)

Kafka is definitely above this version, so I am not sure where the Telegraf plugin is getting the version from.

@KarstenSchnitter
Contributor Author

This error is generated by sarama when the Kafka consumer version is below 2.1.0. I think it is configured here to be 0.10.2.0:

config.Version = sarama.V0_10_2_0

The documentation says it should be configurable through the version property, but I cannot find any code that would honour a configured value. If it were honoured, using a version of 2.1.0 or above should work.
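
To illustrate why a 2.5.0 broker does not help here, the following is a minimal sketch of this kind of version gate. It does not use sarama; the `version` type and the `parseVersion`, `atLeast`, and `checkZstd` names are hypothetical stand-ins. The point is that the check looks only at the version configured on the client, never at the broker.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// version is a hypothetical stand-in for a client protocol version
// (major, minor, patch, build). It is not sarama's own type.
type version [4]uint

// parseVersion turns "2.1.0" or "0.10.2.0" into a version.
func parseVersion(s string) (version, error) {
	var v version
	parts := strings.Split(s, ".")
	if len(parts) < 3 || len(parts) > 4 {
		return v, fmt.Errorf("invalid version %q", s)
	}
	for i, p := range parts {
		n, err := strconv.ParseUint(p, 10, 32)
		if err != nil {
			return v, fmt.Errorf("invalid version %q: %w", s, err)
		}
		v[i] = uint(n)
	}
	return v, nil
}

// atLeast reports whether v >= other, comparing field by field.
func (v version) atLeast(other version) bool {
	for i := range v {
		if v[i] != other[i] {
			return v[i] > other[i]
		}
	}
	return true
}

// zstdMin is the minimum version named in the error message above.
var zstdMin = version{2, 1, 0, 0}

// checkZstd rejects zstd when the configured client version is too low.
// The broker's real version plays no part in this check.
func checkZstd(configured string) error {
	v, err := parseVersion(configured)
	if err != nil {
		return err
	}
	if !v.atLeast(zstdMin) {
		return fmt.Errorf("zstd compression requires version >= 2.1.0, configured %s", configured)
	}
	return nil
}

func main() {
	for _, c := range []string{"0.10.2.0", "2.1.0", "2.5.0"} {
		fmt.Printf("%s: %v\n", c, checkZstd(c))
	}
}
```

With the default of 0.10.2.0 the gate fails regardless of the broker, which matches the error reported above.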

@KarstenSchnitter
Contributor Author

I revisited the configuration of the sarama Kafka client. It parses the version plugin property correctly and hands it over to sarama. This property tells sarama which Kafka version to use for the client. Using zstd requires at least version 2.1.0. @schmorgs have you tried changing this property in your plugin configuration?

[[inputs.kafka_consumer]]
  brokers = ["localhost:9092"]
  topics = ["telegraf"]
  version = "2.1.0"
  # further config
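
The failing test reported earlier in this thread used the output plugin, so the equivalent there might look like the sketch below. It assumes that `compression_codec = 4` selects zstd in `outputs.kafka`, as listed in the plugin README. The broker address and topic are placeholders.

```toml
[[outputs.kafka]]
  brokers = ["localhost:9092"]  # placeholder broker address
  topic = "telegraf"            # placeholder topic name
  version = "2.1.0"             # must be at least 2.1.0 for zstd
  compression_codec = 4         # assumption: 4 = zstd per the plugin README
```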

@clever-trevor

Thanks, I assumed the Sarama code would detect the version, so I didn't spot this parameter.

I've tested it and it works fine. Thanks very much for the help.

@hackery
Contributor

hackery commented Apr 14, 2021

Is there any expectation of changing the default from 0.10.2.0 or do we just have to add this override everywhere?
(worth noting perhaps that Sarama officially supports only "Kafka 2.6 through 2.7, although older releases are still likely to work")

I was bitten by this today: I enabled zstd compression in a metrics producer and was then unable to consume the metrics in Telegraf. Worse, updating the version config as shown above triggered a bug in Sarama (Sarama issue #1831, which seems to occur only when it is trying to process a large backlog) in which its memory usage balloons out of control, causing cyclic Telegraf crashes (every 10-15s!).

We've reverted the change but I now have a topic full of un-consumable compressed messages and a big gap in metrics visualisations.

@KarstenSchnitter
Contributor Author

0.10.2.0 is the Sarama default. It is hard to decide on any other default, since the Kafka version in use is unknown.

I have experienced similar problems with Sarama consuming zstd topics outside of Telegraf. You could try adjusting the max_message_length parameter, which is set to 0 (unlimited) by default, though I am not sure it will help. You could also try throttling the topic in the Kafka configuration, so that Sarama cannot consume too much data at once.

@hackery
Contributor

hackery commented Apr 15, 2021

Kafka has (ok ... purports to have?) excellent bidirectional client/broker version compatibility negotiation. If Telegraf selected a later version in the Sarama config, would Sarama gracefully fall back to earlier API versions if needed, or does this setting force it to use the API version given?

Setting max_message_length is unlikely to help here; our messages are mostly short, it's the large total (count * size) of messages in the backlog that causes the issue. Kafka broker configuration provides no options that can throttle a consumer, as far as I know. What might be useful is a way in the Telegraf plugin to set arbitrary consumer properties.

@hackery
Contributor

hackery commented Mar 4, 2022

According to @dnwe in IBM/sarama#1831, the memory inflation issue was addressed in Sarama 1.28.0. The first version of Telegraf with this fix appears to be 1.20.3.
