Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added TLS & credentials configuration for NATS_consumer input plugin #6195

Merged
merged 2 commits into from
Aug 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions plugins/inputs/nats_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,22 @@ instances of telegraf can read from a NATS cluster in parallel.
[[inputs.nats_consumer]]
## urls of NATS servers
servers = ["nats://localhost:4222"]
## Use Transport Layer Security
secure = false
## subject(s) to consume
subjects = ["telegraf"]
## name a queue group
queue_group = "telegraf_consumers"

## Optional credentials
# username = ""
# password = ""

## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false

## Sets the limits for pending msgs and bytes for each subscription
## These shouldn't need to be adjusted except in very high throughput scenarios
# pending_message_limit = 65536
Expand Down
40 changes: 37 additions & 3 deletions plugins/inputs/nats_consumer/nats_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
nats "github.com/nats-io/go-nats"
Expand Down Expand Up @@ -34,7 +35,11 @@ type natsConsumer struct {
QueueGroup string `toml:"queue_group"`
Subjects []string `toml:"subjects"`
Servers []string `toml:"servers"`
Secure bool `toml:"secure"`
Username string `toml:"username"`
Password string `toml:"password"`
tls.ClientConfig
// Legacy; Should be deprecated
Secure bool `toml:"secure"`

// Client pending limits:
PendingMessageLimit int `toml:"pending_message_limit"`
Expand All @@ -61,13 +66,24 @@ type natsConsumer struct {
var sampleConfig = `
## urls of NATS servers
servers = ["nats://localhost:4222"]
## Use Transport Layer Security
## Deprecated: Use Transport Layer Security
secure = false
## subject(s) to consume
subjects = ["telegraf"]
## name a queue group
queue_group = "telegraf_consumers"

## Optional credentials
# username = ""
# password = ""

## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false

## Sets the limits for pending msgs and bytes for each subscription
## These shouldn't need to be adjusted except in very high throughput scenarios
# pending_message_limit = 65536
Expand Down Expand Up @@ -125,7 +141,25 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error {
// override servers if any were specified
opts.Servers = n.Servers

opts.Secure = n.Secure
// override authentication, if any was specified
if n.Username != "" {
opts.User = n.Username
opts.Password = n.Password
}

// override TLS, if it was specified
tlsConfig, err := n.ClientConfig.TLSConfig()
if err != nil {
return err
}
if tlsConfig != nil {
// set NATS connection TLS options
opts.Secure = true
opts.TLSConfig = tlsConfig
} else {
// should be deprecated; use TLS
opts.Secure = n.Secure
}

if n.conn == nil || n.conn.IsClosed() {
n.conn, connectErr = opts.Connect()
Expand Down