Skip to content

Commit

Permalink
feat(inputs.kafka_consumer): Add resolve canonical bootstrap server o…
Browse files Browse the repository at this point in the history
…ption (#15368)
  • Loading branch information
powersj authored May 31, 2024
1 parent 31a1d34 commit 82902eb
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 16 deletions.
5 changes: 5 additions & 0 deletions plugins/inputs/kafka_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ to use them.
## limit.
# metadata_retry_max_duration = 0

## When set to true, this turns each bootstrap broker address into a set of
## IPs, then does a reverse lookup on each one to get its canonical hostname.
## This list of hostnames then replaces the original address list.
## resolve_canonical_bootstrap_servers_only = false

## Strategy for making connection to kafka brokers. Valid options: "startup",
## "defer". If set to "defer" the plugin is allowed to start before making a
## connection. This is useful if the broker may be down when telegraf is
Expand Down
34 changes: 18 additions & 16 deletions plugins/inputs/kafka_consumer/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,24 +34,24 @@ type empty struct{}
type semaphore chan empty

type KafkaConsumer struct {
Brokers []string `toml:"brokers"`
Version string `toml:"kafka_version"`
ConsumerGroup string `toml:"consumer_group"`
MaxMessageLen int `toml:"max_message_len"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
MaxProcessingTime config.Duration `toml:"max_processing_time"`
Offset string `toml:"offset"`
BalanceStrategy string `toml:"balance_strategy"`
Topics []string `toml:"topics"`
TopicRegexps []string `toml:"topic_regexps"`
TopicTag string `toml:"topic_tag"`
MsgHeadersAsTags []string `toml:"msg_headers_as_tags"`
MsgHeaderAsMetricName string `toml:"msg_header_as_metric_name"`
ConsumerFetchDefault config.Size `toml:"consumer_fetch_default"`
ConnectionStrategy string `toml:"connection_strategy"`
Brokers []string `toml:"brokers"`
Version string `toml:"kafka_version"`
ConsumerGroup string `toml:"consumer_group"`
MaxMessageLen int `toml:"max_message_len"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
MaxProcessingTime config.Duration `toml:"max_processing_time"`
Offset string `toml:"offset"`
BalanceStrategy string `toml:"balance_strategy"`
Topics []string `toml:"topics"`
TopicRegexps []string `toml:"topic_regexps"`
TopicTag string `toml:"topic_tag"`
MsgHeadersAsTags []string `toml:"msg_headers_as_tags"`
MsgHeaderAsMetricName string `toml:"msg_header_as_metric_name"`
ConsumerFetchDefault config.Size `toml:"consumer_fetch_default"`
ConnectionStrategy string `toml:"connection_strategy"`
ResolveCanonicalBootstrapServersOnly bool `toml:"resolve_canonical_bootstrap_servers_only"`

kafka.ReadConfig

kafka.Logger

Log telegraf.Logger `toml:"-"`
Expand Down Expand Up @@ -150,6 +150,8 @@ func (k *KafkaConsumer) Init() error {
k.ConsumerCreator = &SaramaCreator{}
}

cfg.Net.ResolveCanonicalBootstrapServers = k.ResolveCanonicalBootstrapServersOnly

cfg.Consumer.MaxProcessingTime = time.Duration(k.MaxProcessingTime)

if k.ConsumerFetchDefault != 0 {
Expand Down
5 changes: 5 additions & 0 deletions plugins/inputs/kafka_consumer/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@
## limit.
# metadata_retry_max_duration = 0

## When set to true, this turns each bootstrap broker address into a set of
## IPs, then does a reverse lookup on each one to get its canonical hostname.
## This list of hostnames then replaces the original address list.
## resolve_canonical_bootstrap_servers_only = false

## Strategy for making connection to kafka brokers. Valid options: "startup",
## "defer". If set to "defer" the plugin is allowed to start before making a
## connection. This is useful if the broker may be down when telegraf is
Expand Down

0 comments on commit 82902eb

Please sign in to comment.