
Standardize variable name and comment for max_undelivered_messages
danielnelson committed Nov 2, 2018
1 parent ef34637 commit b545796
Showing 12 changed files with 230 additions and 131 deletions.
15 changes: 12 additions & 3 deletions plugins/inputs/amqp_consumer/README.md
@@ -13,7 +13,6 @@ For an introduction to AMQP see:
The following defaults are known to work with RabbitMQ:

```toml
-# AMQP consumer plugin
[[inputs.amqp_consumer]]
## Broker to consume from.
## deprecated in 1.7; use the brokers option
@@ -46,16 +45,26 @@ The following defaults are known to work with RabbitMQ:

## AMQP queue name
queue = "telegraf"

## AMQP queue durability can be "transient" or "durable".
queue_durability = "durable"

## Binding Key
binding_key = "#"

## Maximum number of messages server should give to the worker.
# prefetch_count = 50

+## Maximum messages to read from the broker that have not been written by an
+## output. For best throughput set based on the number of metrics within
+## each message and the size of the output's metric_batch_size.
+##
+## For example, if each message from the queue contains 10 metrics and the
+## output metric_batch_size is 1000, setting this to 100 will ensure that a
+## full batch is collected and the write is triggered immediately without
+## waiting until the next flush_interval.
+# max_undelivered_messages = 1000

## Auth method. PLAIN and EXTERNAL are supported
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
## described here: https://www.rabbitmq.com/plugins.html
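The sizing rule in the new comment is plain division: max_undelivered_messages should be the output's metric_batch_size divided by the number of metrics carried per message, so a full batch accumulates and is written without waiting for flush_interval. A minimal sketch of that arithmetic (hypothetical helper, not part of the commit):

```go
package main

import "fmt"

// maxUndelivered applies the sizing rule from the README comment:
// one full output batch divided by the metrics carried per message.
// Hypothetical helper, for illustration only.
func maxUndelivered(metricBatchSize, metricsPerMessage int) int {
	return metricBatchSize / metricsPerMessage
}

func main() {
	// The commit's example: 1000-metric batches, 10 metrics per message.
	fmt.Println(maxUndelivered(1000, 10)) // 100
}
```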
50 changes: 33 additions & 17 deletions plugins/inputs/amqp_consumer/amqp_consumer.go
@@ -17,20 +17,25 @@ import (
"github.com/streadway/amqp"
)

+const (
+	defaultMaxUndeliveredMessages = 1000
+)

type empty struct{}
type semaphore chan empty

// AMQPConsumer is the top level struct for this plugin
type AMQPConsumer struct {
-	URL                string            `toml:"url"` // deprecated in 1.7; use brokers
-	Brokers            []string          `toml:"brokers"`
-	Username           string            `toml:"username"`
-	Password           string            `toml:"password"`
-	Exchange           string            `toml:"exchange"`
-	ExchangeType       string            `toml:"exchange_type"`
-	ExchangeDurability string            `toml:"exchange_durability"`
-	ExchangePassive    bool              `toml:"exchange_passive"`
-	ExchangeArguments  map[string]string `toml:"exchange_arguments"`
+	URL                    string            `toml:"url"` // deprecated in 1.7; use brokers
+	Brokers                []string          `toml:"brokers"`
+	Username               string            `toml:"username"`
+	Password               string            `toml:"password"`
+	Exchange               string            `toml:"exchange"`
+	ExchangeType           string            `toml:"exchange_type"`
+	ExchangeDurability     string            `toml:"exchange_durability"`
+	ExchangePassive        bool              `toml:"exchange_passive"`
+	ExchangeArguments      map[string]string `toml:"exchange_arguments"`
+	MaxUndeliveredMessages int               `toml:"max_undelivered_messages"`

// Queue Name
Queue string `toml:"queue"`
@@ -120,6 +125,16 @@ func (a *AMQPConsumer) SampleConfig() string {
## Maximum number of messages server should give to the worker.
# prefetch_count = 50
+## Maximum messages to read from the broker that have not been written by an
+## output. For best throughput set based on the number of metrics within
+## each message and the size of the output's metric_batch_size.
+##
+## For example, if each message from the queue contains 10 metrics and the
+## output metric_batch_size is 1000, setting this to 100 will ensure that a
+## full batch is collected and the write is triggered immediately without
+## waiting until the next flush_interval.
+# max_undelivered_messages = 1000
## Auth method. PLAIN and EXTERNAL are supported
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
## described here: https://www.rabbitmq.com/plugins.html
@@ -378,8 +393,8 @@ func declareExchange(
func (a *AMQPConsumer) process(ctx context.Context, msgs <-chan amqp.Delivery, ac telegraf.Accumulator) {
a.deliveries = make(map[telegraf.TrackingID]amqp.Delivery)

-	acc := ac.WithTracking(a.PrefetchCount)
-	sem := make(semaphore, a.PrefetchCount)
+	acc := ac.WithTracking(a.MaxUndeliveredMessages)
+	sem := make(semaphore, a.MaxUndeliveredMessages)

for {
select {
@@ -463,12 +478,13 @@ func (a *AMQPConsumer) Stop() {
func init() {
inputs.Add("amqp_consumer", func() telegraf.Input {
return &AMQPConsumer{
-			URL:                DefaultBroker,
-			AuthMethod:         DefaultAuthMethod,
-			ExchangeType:       DefaultExchangeType,
-			ExchangeDurability: DefaultExchangeDurability,
-			QueueDurability:    DefaultQueueDurability,
-			PrefetchCount:      DefaultPrefetchCount,
+			URL:                    DefaultBroker,
+			AuthMethod:             DefaultAuthMethod,
+			ExchangeType:           DefaultExchangeType,
+			ExchangeDurability:     DefaultExchangeDurability,
+			QueueDurability:        DefaultQueueDurability,
+			PrefetchCount:          DefaultPrefetchCount,
+			MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
}
})
}
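Both the AMQP and Kafka consumers bound their in-flight work the same way: a buffered channel used as a counting semaphore, sized by the renamed MaxUndeliveredMessages option, paired with a tracking accumulator. A standalone sketch of that back-pressure pattern (simplified stand-ins for telegraf's accumulator and delivery types; not the plugin's actual code):

```go
package main

import (
	"fmt"
	"sync"
)

type empty struct{}
type semaphore chan empty

// consume reads messages but blocks once max of them have been read and
// not yet delivered to an output, mirroring the semaphore sized by
// MaxUndeliveredMessages in the diff above.
func consume(msgs <-chan int, max int, wg *sync.WaitGroup) {
	sem := make(semaphore, max)
	delivered := make(chan int)

	// Stand-in for the output side reporting that a message's metrics
	// were written; each report frees one semaphore slot.
	go func() {
		for range delivered {
			<-sem
		}
	}()

	for m := range msgs {
		sem <- empty{} // acquire: blocks at max undelivered messages
		go func(m int) {
			delivered <- m // stand-in for parse + hand off to an output
			wg.Done()
		}(m)
	}
}

func main() {
	var wg sync.WaitGroup
	msgs := make(chan int)
	go consume(msgs, 3, &wg)
	for i := 0; i < 10; i++ {
		wg.Add(1)
		msgs <- i
	}
	close(msgs)
	wg.Wait()
	fmt.Println("all messages delivered")
}
```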
37 changes: 21 additions & 16 deletions plugins/inputs/kafka_consumer/README.md
@@ -1,18 +1,14 @@
# Kafka Consumer Input Plugin

-The [Kafka](http://kafka.apache.org/) consumer plugin polls a specified Kafka
-topic and adds messages to InfluxDB. The plugin assumes messages follow the
-line protocol. [Consumer Group](http://godoc.org/github.com/wvanbergen/kafka/consumergroup)
-is used to talk to the Kafka cluster so multiple instances of telegraf can read
-from the same topic in parallel.
+The [Kafka][kafka] consumer plugin reads from Kafka
+and creates metrics using one of the supported [input data formats][].

-For old kafka version (< 0.8), please use the kafka_consumer_legacy input plugin
+For old kafka version (< 0.8), please use the [kafka_consumer_legacy][] input plugin
and use the old zookeeper connection method.

-## Configuration
+### Configuration

```toml
-# Read metrics from Kafka topic(s)
[[inputs.kafka_consumer]]
## kafka servers
brokers = ["localhost:9092"]
@@ -44,18 +40,27 @@ and use the old zookeeper connection method.
## Offset (must be either "oldest" or "newest")
offset = "oldest"

+## Maximum length of a message to consume, in bytes (default 0/unlimited);
+## larger messages are dropped
+max_message_len = 1000000

+## Maximum messages to read from the broker that have not been written by an
+## output. For best throughput set based on the number of metrics within
+## each message and the size of the output's metric_batch_size.
+##
+## For example, if each message from the queue contains 10 metrics and the
+## output metric_batch_size is 1000, setting this to 100 will ensure that a
+## full batch is collected and the write is triggered immediately without
+## waiting until the next flush_interval.
+# max_undelivered_messages = 1000

## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"

-## Maximum length of a message to consume, in bytes (default 0/unlimited);
-## larger messages are dropped
-max_message_len = 1000000
```

-## Testing
-
-Running integration tests requires running Zookeeper & Kafka. See Makefile
-for kafka container command.
+[kafka]: https://kafka.apache.org
+[kafka_consumer_legacy]: /plugins/inputs/kafka_consumer_legacy/README.md
+[input data formats]: /docs/DATA_FORMATS_INPUT.md
45 changes: 27 additions & 18 deletions plugins/inputs/kafka_consumer/kafka_consumer.go
@@ -16,7 +16,7 @@ import (
)

const (
-	defaultMaxUnmarkedMessages = 1000
+	defaultMaxUndeliveredMessages = 1000
)

type empty struct{}
@@ -30,16 +30,16 @@ type Consumer interface {
}

type Kafka struct {
-	ConsumerGroup       string   `toml:"consumer_group"`
-	ClientID            string   `toml:"client_id"`
-	Topics              []string `toml:"topics"`
-	Brokers             []string `toml:"brokers"`
-	MaxMessageLen       int      `toml:"max_message_len"`
-	Version             string   `toml:"version"`
-	MaxUnmarkedMessages int      `toml:"max_unmarked_messages"`
-	Offset              string   `toml:"offset"`
-	SASLUsername        string   `toml:"sasl_username"`
-	SASLPassword        string   `toml:"sasl_password"`
+	ConsumerGroup          string   `toml:"consumer_group"`
+	ClientID               string   `toml:"client_id"`
+	Topics                 []string `toml:"topics"`
+	Brokers                []string `toml:"brokers"`
+	MaxMessageLen          int      `toml:"max_message_len"`
+	Version                string   `toml:"version"`
+	MaxUndeliveredMessages int      `toml:"max_undelivered_messages"`
+	Offset                 string   `toml:"offset"`
+	SASLUsername           string   `toml:"sasl_username"`
+	SASLPassword           string   `toml:"sasl_password"`
tls.ClientConfig

cluster Consumer
@@ -85,16 +85,25 @@ var sampleConfig = `
consumer_group = "telegraf_metrics_consumers"
## Offset (must be either "oldest" or "newest")
offset = "oldest"
+## Maximum length of a message to consume, in bytes (default 0/unlimited);
+## larger messages are dropped
+max_message_len = 1000000
+
+## Maximum messages to read from the broker that have not been written by an
+## output. For best throughput set based on the number of metrics within
+## each message and the size of the output's metric_batch_size.
+##
+## For example, if each message from the queue contains 10 metrics and the
+## output metric_batch_size is 1000, setting this to 100 will ensure that a
+## full batch is collected and the write is triggered immediately without
+## waiting until the next flush_interval.
+# max_undelivered_messages = 1000
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
-## Maximum length of a message to consume, in bytes (default 0/unlimited);
-## larger messages are dropped
-max_message_len = 1000000
`

func (k *Kafka) SampleConfig() string {
Expand Down Expand Up @@ -195,8 +204,8 @@ func (k *Kafka) Start(acc telegraf.Accumulator) error {
func (k *Kafka) receiver(ctx context.Context, ac telegraf.Accumulator) {
k.messages = make(map[telegraf.TrackingID]*sarama.ConsumerMessage)

-	acc := ac.WithTracking(k.MaxUnmarkedMessages)
-	sem := make(semaphore, k.MaxUnmarkedMessages)
+	acc := ac.WithTracking(k.MaxUndeliveredMessages)
+	sem := make(semaphore, k.MaxUndeliveredMessages)

for {
select {
@@ -282,7 +291,7 @@ func (k *Kafka) Gather(acc telegraf.Accumulator) error {
func init() {
inputs.Add("kafka_consumer", func() telegraf.Input {
return &Kafka{
-			MaxUnmarkedMessages: defaultMaxUnmarkedMessages,
+			MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
}
})
}
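Because the rename changes the struct tag, the user-facing config key changes too: the old `max_unmarked_messages` key no longer matches any field, and `max_undelivered_messages` is read instead. A quick sketch of that mapping (using the BurntSushi/toml package purely for illustration — an assumption; telegraf uses its own config loader):

```go
package main

import (
	"fmt"

	"github.com/BurntSushi/toml"
)

// Mirrors the renamed field and tag in the Kafka struct above.
type kafkaConfig struct {
	MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
}

func main() {
	var cfg kafkaConfig
	// The old key, max_unmarked_messages, would no longer match any tag.
	if _, err := toml.Decode(`max_undelivered_messages = 1000`, &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.MaxUndeliveredMessages) // 1000
}
```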
19 changes: 9 additions & 10 deletions plugins/inputs/kafka_consumer/kafka_consumer_test.go
@@ -5,11 +5,10 @@ import (
"strings"
"testing"

"github.com/Shopify/sarama"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"

"github.com/Shopify/sarama"
"github.com/stretchr/testify/assert"
)

@@ -50,14 +49,14 @@ func newTestKafka() (*Kafka, *TestConsumer) {
messages: make(chan *sarama.ConsumerMessage, 1000),
}
k := Kafka{
-		cluster:             consumer,
-		ConsumerGroup:       "test",
-		Topics:              []string{"telegraf"},
-		Brokers:             []string{"localhost:9092"},
-		Offset:              "oldest",
-		MaxUnmarkedMessages: defaultMaxUnmarkedMessages,
-		doNotCommitMsgs:     true,
-		messages:            make(map[telegraf.TrackingID]*sarama.ConsumerMessage),
+		cluster:                consumer,
+		ConsumerGroup:          "test",
+		Topics:                 []string{"telegraf"},
+		Brokers:                []string{"localhost:9092"},
+		Offset:                 "oldest",
+		MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
+		doNotCommitMsgs:        true,
+		messages:               make(map[telegraf.TrackingID]*sarama.ConsumerMessage),
}
return &k, consumer
}
22 changes: 14 additions & 8 deletions plugins/inputs/mqtt_consumer/README.md
@@ -1,14 +1,11 @@
# MQTT Consumer Input Plugin

-The [MQTT](http://mqtt.org/) consumer plugin reads from
-specified MQTT topics and adds messages to InfluxDB.
-The plugin expects messages in the
-[Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md).
+The [MQTT][mqtt] consumer plugin reads from the specified MQTT topics
+and creates metrics using one of the supported [input data formats][].

### Configuration:

```toml
-# Read metrics from MQTT topic(s)
[[inputs.mqtt_consumer]]
## MQTT broker URLs to be used. The format should be scheme://host:port,
## schema can be tcp, ssl, or ws.
@@ -26,9 +23,15 @@ The plugin expects messages in the
## Connection timeout for initial connection in seconds
connection_timeout = "30s"

-## Max messages to read from the broker that have not been written by an
-## output.
-# max_messages_in_flight = 1000
+## Maximum messages to read from the broker that have not been written by an
+## output. For best throughput set based on the number of metrics within
+## each message and the size of the output's metric_batch_size.
+##
+## For example, if each message from the queue contains 10 metrics and the
+## output metric_batch_size is 1000, setting this to 100 will ensure that a
+## full batch is collected and the write is triggered immediately without
+## waiting until the next flush_interval.
+# max_undelivered_messages = 1000

## Topics to subscribe to
topics = [
@@ -66,3 +69,6 @@ The plugin expects messages in the

- All measurements are tagged with the incoming topic, ie
`topic=telegraf/host01/cpu`

+[mqtt]: https://mqtt.org
+[input data formats]: /docs/DATA_FORMATS_INPUT.md