Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[amqp output] Allow external auth (cert-based tls auth) #863

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func GetTLSConfig(
RootCAs: caCertPool,
InsecureSkipVerify: InsecureSkipVerify,
}
t.BuildNameToCertificate()
} else {
if InsecureSkipVerify {
t.InsecureSkipVerify = true
Expand Down
32 changes: 28 additions & 4 deletions plugins/outputs/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"log"
"strings"
"sync"
"time"

Expand All @@ -20,6 +21,8 @@ type AMQP struct {
URL string
// AMQP exchange
Exchange string
// AMQP Auth method
AuthMethod string
// Routing Key Tag
RoutingTag string `toml:"routing_tag"`
// InfluxDB database
Expand All @@ -45,7 +48,17 @@ type AMQP struct {
serializer serializers.Serializer
}

type externalAuth struct{}

func (a *externalAuth) Mechanism() string {
return "EXTERNAL"
}
func (a *externalAuth) Response() string {
return fmt.Sprintf("\000")
}

const (
DefaultAuthMethod = "PLAIN"
DefaultRetentionPolicy = "default"
DefaultDatabase = "telegraf"
DefaultPrecision = "s"
Expand All @@ -56,6 +69,8 @@ var sampleConfig = `
url = "amqp://localhost:5672/influxdb"
## AMQP exchange
exchange = "telegraf"
## Auth method. PLAIN and EXTERNAL are supported
# auth_method = "PLAIN"
## Telegraf tag to use as a routing key
## ie, if this tag exists, it's value will be used as the routing key
routing_tag = "host"
Expand Down Expand Up @@ -103,11 +118,19 @@ func (q *AMQP) Connect() error {
return err
}

if tls != nil {
connection, err = amqp.DialTLS(q.URL, tls)
} else {
connection, err = amqp.Dial(q.URL)
// parse auth method
var sasl []amqp.Authentication // nil by default

if strings.ToUpper(q.AuthMethod) == "EXTERNAL" {
sasl = []amqp.Authentication{&externalAuth{}}
}

amqpConf := amqp.Config{
TLSClientConfig: tls,
SASL: sasl, // if nil, it will be PLAIN
}

connection, err = amqp.DialConfig(q.URL, amqpConf)
if err != nil {
return err
}
Expand Down Expand Up @@ -200,6 +223,7 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error {
func init() {
outputs.Add("amqp", func() telegraf.Output {
return &AMQP{
AuthMethod: DefaultAuthMethod,
Database: DefaultDatabase,
Precision: DefaultPrecision,
RetentionPolicy: DefaultRetentionPolicy,
Expand Down