feat: add max_processing_time config to Kafka Consumer input #9988

Merged
merged 6 commits into from
Oct 28, 2021

Conversation

cruscio
Contributor

@cruscio cruscio commented Oct 22, 2021

Required for all PRs:

Resolves #7622, where the kafka_consumer input plugin continually abandons and resubscribes to a topic partition when Telegraf output takes longer than the 100ms sarama default.

Added the ability to set sarama Consumer.MaxProcessingTime in the Kafka Consumer input plugin configuration by defining a new max_processing_time configuration parameter. Documentation is based on the sarama source.

Created tests for default and custom configuration values. Did not create a test for an invalid configuration because any valid Duration (already has test coverage) should be a valid configuration.

This change locally hard codes the sarama default of 100ms. The alternative, not setting a default here and falling back on the sarama default (the existing behavior), could result in the local documentation being incorrect if the sarama default is changed in the future.

I see the issue from #7622 on my system, verified it can be reproduced with this build at the default max_processing_time of 100ms, and verified it was resolved with a larger max_processing_time of 500ms.

Full disclosure: First-time contributor & golang beginner. I tried to emulate existing code patterns. If I did anything wrong here, please let me know.

@telegraf-tiger telegraf-tiger bot added the fix pr to fix corresponding bug label Oct 22, 2021
@cruscio
Contributor Author

cruscio commented Oct 22, 2021

Not sure what I'm doing wrong.

  • ci/circleci: deps test failed on initial pull request.
  • Made changes. make fmt outputs nothing for me locally now.
  • pushed changes
  • ci/circleci: deps test still fails.

@powersj
Contributor

powersj commented Oct 22, 2021

❯ git clone https://github.com/cruscio/telegraf -b kafka_max_processing_time
Cloning into 'telegraf'...
remote: Enumerating objects: 56943, done.
remote: Counting objects: 100% (214/214), done.
remote: Compressing objects: 100% (163/163), done.
remote: Total 56943 (delta 95), reused 89 (delta 45), pack-reused 56729
Receiving objects: 100% (56943/56943), 37.06 MiB | 13.78 MiB/s, done.
Resolving deltas: 100% (33584/33584), done.
/tmp took 3s 
❯ cd telegraf/
telegraf on  kafka_max_processing_time via 🐹 v1.17.1 
❯ make fmt
telegraf on  kafka_max_processing_time [!] via 🐹 v1.17.1 
❯ git diff
diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go
index 5c4f19f9..94603d50 100644
--- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go
+++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go
@@ -169,8 +169,8 @@ func TestInit(t *testing.T) {
                {
                        name: "custom max_processing_time",
                        plugin: &KafkaConsumer{
-                               MaxProcessingTime: config.Duration(1000*time.Millisecond),
-                               Log:    testutil.Logger{},
+                               MaxProcessingTime: config.Duration(1000 * time.Millisecond),
+                               Log:               testutil.Logger{},
                        },
                        check: func(t *testing.T, plugin *KafkaConsumer) {
                                require.Equal(t, plugin.config.Consumer.MaxProcessingTime, time.Duration(1000*time.Millisecond))

@cruscio
Contributor Author

cruscio commented Oct 22, 2021

Weird, I didn't get that output on a fresh clone mounted to a clean golang docker container instance. I assumed if it wanted to remove the spaces from time.Duration, I should remove them from config.Duration too, and it didn't complain. I'll add them back in for config.Duration. Thanks

@cruscio
Contributor Author

cruscio commented Oct 22, 2021

What should be done about these linting issues?

$ make lint-branch | grep kafka_consumer
WARN [runner] Can't process result by diff processor: can't prepare diff by revgrep: could not read git repo: error executing git diff "master" "": exit status 128
[...]
plugins/inputs/kafka_consumer/kafka_consumer_test.go:45:62                           revive         import-shadowing: The name 'config' shadows an import name
plugins/inputs/kafka_consumer/kafka_consumer.go:161:41                               revive         import-shadowing: The name 'config' shadows an import name
plugins/inputs/kafka_consumer/kafka_consumer.go:166:62                               revive         import-shadowing: The name 'config' shadows an import name
plugins/inputs/kafka_consumer/kafka_consumer.go:193:2                                revive         import-shadowing: The name 'config' shadows an import name
[...]
make: *** [Makefile:162: lint-branch] Error 1

I added imports of github.com/influxdata/telegraf/config to kafka_consumer.go and kafka_consumer_test.go.

In kafka_consumer_test.go that import conflicts with:

45: func (c *FakeCreator) Create(brokers []string, group string, config *sarama.Config) (ConsumerGroup, error) {

Is refactoring the tests (renaming that config variable) within an acceptable scope for this pull request? I'm happy to do it if that's the right path; I was just trying to keep my changes to a minimum. Is there a better alternative?

@powersj
Contributor

powersj commented Oct 22, 2021

As we try to clean up lint issues with each PR, you should feel free to make changes to make things lint clean in your PR. I would add a comment to your initial PR message why you made those changes though.

@cruscio
Contributor Author

cruscio commented Oct 22, 2021

After a closer look, I'm not knowledgeable enough to be comfortable fixing the import-shadowing: The name 'config' shadows an import name linting issues I've caused in kafka_consumer_test.go. Tests passed without error when I ran them, though. Where does that leave this pull request?

@powersj
Contributor

powersj commented Oct 22, 2021

Hey,

Thanks for taking the time to put this together with tests and all.

I do not know Kafka very well, but based on the bug reports it looks like the underlying issue is that Telegraf does not process events as fast as Kafka wants, and as a result the connection is closed?

How would a user know to increase this setting and to what value?

@cruscio
Contributor Author

cruscio commented Oct 25, 2021

If I understand correctly (and I very well may not), the consumer needs to acknowledge completion of processing a message, not just ack receiving it. By default, Sarama allows 100ms to complete processing (due to implementation, that can be as much as 200ms).

How would a user know to increase this setting

Either the Telegraf debug log or Kafka broker log showing frequent reconnects.

consumer/broker/# abandoned subscription to {{topic}}/# because consuming was taking too long
consumer/broker/# added subscription to {{topic}}/#

and to what value?

A safe general rule should be to set max_processing_time to the longest timeout across all output plugins (e.g. 5s for the influxdb_v2 output's default). You may also need to consider the additional timing of processor plugins; however, if the output timeout is well above the time actually taken to output, this should be inconsequential in most cases.

And I don't know how aggregator plugins would come into play here.

Oct 25 12:17:29 {{host}} ffa289f41cde[2040]: 2021-10-25T12:17:29Z D! [sarama] consumer/broker/2 added subscription to {{topic}}/3
Oct 25 12:17:29 {{host}} ffa289f41cde[2040]: 2021-10-25T12:17:29Z D! [sarama] consumer/broker/4 abandoned subscription to {{topic}}/5 because consuming was taking too long
Oct 25 12:17:29 {{host}} ffa289f41cde[2040]: 2021-10-25T12:17:29Z D! [sarama] consumer/broker/4 added subscription to {{topic}}/5
Oct 25 12:17:29 {{host}} ffa289f41cde[2040]: 2021-10-25T12:17:29Z D! [sarama] consumer/broker/3 abandoned subscription to {{topic}}/4 because consuming was taking too long
Oct 25 12:17:29 {{host}} ffa289f41cde[2040]: 2021-10-25T12:17:29Z D! [sarama] consumer/broker/3 added subscription to {{topic}}/4
Oct 25 12:17:31 {{host}} ffa289f41cde[2040]: 2021-10-25T12:17:31Z D! [outputs.influxdb_v2] Wrote batch of 4000 metrics in 376.862145ms
Oct 25 12:17:31 {{host}} ffa289f41cde[2040]: 2021-10-25T12:17:31Z D! [outputs.influxdb_v2] Buffer fullness: 0 / 12000 metrics
Oct 25 12:17:33 {{host}} ffa289f41cde[2040]: 2021-10-25T12:17:33Z D! [outputs.influxdb_v2] Wrote batch of 4000 metrics in 359.410678ms
Oct 25 12:17:33 {{host}} ffa289f41cde[2040]: 2021-10-25T12:17:33Z D! [outputs.influxdb_v2] Buffer fullness: 0 / 12000 metrics
Oct 25 12:17:39 {{host}} ffa289f41cde[2040]: 2021-10-25T12:17:39Z D! [outputs.influxdb_v2] Wrote batch of 4000 metrics in 348.190917ms
Oct 25 12:17:39 {{host}} ffa289f41cde[2040]: 2021-10-25T12:17:39Z D! [outputs.influxdb_v2] Buffer fullness: 0 / 12000 metrics
Oct 25 12:17:49 {{host}} ffa289f41cde[2040]: 2021-10-25T12:17:49Z D! [outputs.influxdb_v2] Wrote batch of 3458 metrics in 206.371544ms
Oct 25 12:17:49 {{host}} ffa289f41cde[2040]: 2021-10-25T12:17:49Z D! [outputs.influxdb_v2] Buffer fullness: 0 / 12000 metrics
Oct 25 12:17:59 {{host}} ffa289f41cde[2040]: 2021-10-25T12:17:59Z D! [outputs.influxdb_v2] Wrote batch of 2409 metrics in 75.890174ms
Oct 25 12:17:59 {{host}} ffa289f41cde[2040]: 2021-10-25T12:17:59Z D! [outputs.influxdb_v2] Buffer fullness: 0 / 12000 metrics
Oct 25 12:18:06 {{host}} ffa289f41cde[2040]: 2021-10-25T12:18:06Z D! [outputs.influxdb_v2] Wrote batch of 4000 metrics in 237.626495ms
Oct 25 12:18:06 {{host}} ffa289f41cde[2040]: 2021-10-25T12:18:06Z D! [outputs.influxdb_v2] Buffer fullness: 0 / 12000 metrics
Oct 25 12:18:10 {{host}} ffa289f41cde[2040]: 2021-10-25T12:18:10Z D! [outputs.influxdb_v2] Wrote batch of 4000 metrics in 689.144013ms
Oct 25 12:18:10 {{host}} ffa289f41cde[2040]: 2021-10-25T12:18:10Z D! [outputs.influxdb_v2] Buffer fullness: 0 / 12000 metrics
Oct 25 12:18:12 {{host}} ffa289f41cde[2040]: 2021-10-25T12:18:12Z D! [outputs.influxdb_v2] Wrote batch of 4000 metrics in 596.268425ms
Oct 25 12:18:12 {{host}} ffa289f41cde[2040]: 2021-10-25T12:18:12Z D! [outputs.influxdb_v2] Buffer fullness: 0 / 12000 metrics
Oct 25 12:18:14 {{host}} ffa289f41cde[2040]: 2021-10-25T12:18:14Z D! [sarama] consumer/broker/2 abandoned subscription to {{topic}}/3 because consuming was taking too long
Oct 25 12:18:14 {{host}} ffa289f41cde[2040]: 2021-10-25T12:18:14Z D! [sarama] consumer/broker/7 abandoned subscription to {{topic}}/1 because consuming was taking too long
Oct 25 12:18:14 {{host}} ffa289f41cde[2040]: 2021-10-25T12:18:14Z D! [sarama] consumer/broker/3 abandoned subscription to {{topic}}/4 because consuming was taking too long
Oct 25 12:18:14 {{host}} ffa289f41cde[2040]: 2021-10-25T12:18:14Z D! [sarama] consumer/broker/1 abandoned subscription to {{topic}}/2 because consuming was taking too long
Oct 25 12:18:14 {{host}} ffa289f41cde[2040]: 2021-10-25T12:18:14Z D! [sarama] consumer/broker/5 abandoned subscription to {{topic}}/6 because consuming was taking too long
Oct 25 12:18:14 {{host}} ffa289f41cde[2040]: 2021-10-25T12:18:14Z D! [sarama] consumer/broker/6 abandoned subscription to {{topic}}/0 because consuming was taking too long
Oct 25 12:18:14 {{host}} ffa289f41cde[2040]: 2021-10-25T12:18:14Z D! [sarama] consumer/broker/4 abandoned subscription to {{topic}}/5 because consuming was taking too long
Oct 25 12:18:14 {{host}} ffa289f41cde[2040]: 2021-10-25T12:18:14Z D! [outputs.influxdb_v2] Wrote batch of 4000 metrics in 423.581525ms
Oct 25 12:18:14 {{host}} ffa289f41cde[2040]: 2021-10-25T12:18:14Z D! [sarama] consumer/broker/7 added subscription to {{topic}}/1
Oct 25 12:18:14 {{host}} ffa289f41cde[2040]: 2021-10-25T12:18:14Z D! [outputs.influxdb_v2] Buffer fullness: 53 / 12000 metrics
Oct 25 12:18:14 {{host}} ffa289f41cde[2040]: 2021-10-25T12:18:14Z D! [sarama] consumer/broker/2 added subscription to {{topic}}/3
Oct 25 12:18:14 {{host}} ffa289f41cde[2040]: 2021-10-25T12:18:14Z D! [sarama] consumer/broker/3 added subscription to {{topic}}/4
Oct 25 12:18:14 {{host}} ffa289f41cde[2040]: 2021-10-25T12:18:14Z D! [sarama] consumer/broker/6 added subscription to {{topic}}/0
Oct 25 12:18:14 {{host}} ffa289f41cde[2040]: 2021-10-25T12:18:14Z D! [sarama] consumer/broker/5 added subscription to {{topic}}/6
Oct 25 12:18:14 {{host}} ffa289f41cde[2040]: 2021-10-25T12:18:14Z D! [sarama] consumer/broker/4 added subscription to {{topic}}/5
Oct 25 12:18:14 {{host}} ffa289f41cde[2040]: 2021-10-25T12:18:14Z D! [sarama] consumer/broker/1 added subscription to {{topic}}/2
Oct 25 12:18:14 {{host}} ffa289f41cde[2040]: 2021-10-25T12:18:14Z D! [sarama] consumer/broker/2 abandoned subscription to {{topic}}/3 because consuming was taking too long
Oct 25 12:18:14 {{host}} ffa289f41cde[2040]: 2021-10-25T12:18:14Z D! [sarama] consumer/broker/2 added subscription to {{topic}}/3
Oct 25 12:18:14 {{host}} ffa289f41cde[2040]: 2021-10-25T12:18:14Z D! [sarama] consumer/broker/7 abandoned subscription to {{topic}}/1 because consuming was taking too long
Oct 25 12:18:14 {{host}} ffa289f41cde[2040]: 2021-10-25T12:18:14Z D! [sarama] consumer/broker/7 added subscription to {{topic}}/1
Oct 25 12:18:14 {{host}} ffa289f41cde[2040]: 2021-10-25T12:18:14Z D! [sarama] consumer/broker/4 abandoned subscription to {{topic}}/5 because consuming was taking too long
Oct 25 12:18:14 {{host}} ffa289f41cde[2040]: 2021-10-25T12:18:14Z D! [sarama] consumer/broker/4 added subscription to {{topic}}/5
Oct 25 12:18:14 {{host}} ffa289f41cde[2040]: 2021-10-25T12:18:14Z D! [sarama] consumer/broker/2 abandoned subscription to {{topic}}/3 because consuming was taking too long
Oct 25 12:18:14 {{host}} ffa289f41cde[2040]: 2021-10-25T12:18:14Z D! [sarama] consumer/broker/2 added subscription to {{topic}}/3
Oct 25 12:18:16 {{host}} ffa289f41cde[2040]: 2021-10-25T12:18:16Z D! [outputs.influxdb_v2] Wrote batch of 4000 metrics in 343.790041ms
Oct 25 12:18:16 {{host}} ffa289f41cde[2040]: 2021-10-25T12:18:16Z D! [outputs.influxdb_v2] Buffer fullness: 0 / 12000 metrics
Oct 25 12:18:19 {{host}} ffa289f41cde[2040]: 2021-10-25T12:18:19Z D! [sarama] consumer/broker/5 abandoned subscription to {{topic}}/6 because consuming was taking too long

Contributor

@powersj powersj left a comment

Thanks for the details, that helps a lot. I think if we update the README to guide the user just a little bit more, I am +1 on this.

Thanks again!

@powersj powersj added the ready for final review This pull request has been reviewed and/or tested by multiple users and is ready for a final review. label Oct 25, 2021
@telegraf-tiger
Contributor

🥳 This pull request decreases the Telegraf binary size by -0.01 % for linux amd64 (new size: 132.1 MB, nightly size 132.1 MB)

📦 Looks like new artifacts were built from this PR.

@powersj powersj merged commit 343e846 into influxdata:master Oct 28, 2021
Labels
fix pr to fix corresponding bug ready for final review This pull request has been reviewed and/or tested by multiple users and is ready for a final review.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Telegraf 1.13.4-1 sarama lib "because consuming was taking too long" messages
2 participants