diff --git a/plugins/outputs/amqp/README.md b/plugins/outputs/amqp/README.md index 320df2d79e9ae..df235543b38b7 100644 --- a/plugins/outputs/amqp/README.md +++ b/plugins/outputs/amqp/README.md @@ -33,10 +33,6 @@ to use them. ```toml @sample.conf # Publishes metrics to an AMQP broker [[outputs.amqp]] - ## Broker to publish to. - ## deprecated in 1.7; use the brokers option - # url = "amqp://localhost:5672/influxdb" - ## Brokers to publish to. If multiple brokers are specified a random broker ## will be selected anytime a connection is established. This can be ## helpful for load balancing when not using a dedicated load balancer. @@ -85,14 +81,6 @@ to use them. ## One of "transient" or "persistent". # delivery_mode = "transient" - ## InfluxDB database added as a message header. - ## deprecated in 1.7; use the headers option - # database = "telegraf" - - ## InfluxDB retention policy added as a message header - ## deprecated in 1.7; use the headers option - # retention_policy = "default" - ## Static headers added to each published message. # headers = { } # headers = {"database" = "telegraf", "retention_policy" = "default"} diff --git a/plugins/outputs/amqp/amqp.go b/plugins/outputs/amqp/amqp.go index ac26853cee3c5..b41fe32a216a9 100644 --- a/plugins/outputs/amqp/amqp.go +++ b/plugins/outputs/amqp/amqp.go @@ -88,22 +88,14 @@ func (q *AMQP) SetSerializer(serializer serializers.Serializer) { q.serializer = serializer } -func (q *AMQP) Connect() error { - if q.config == nil { - clientConfig, err := q.makeClientConfig() - if err != nil { - return err - } - q.config = clientConfig - } - +func (q *AMQP) Init() error { var err error - q.encoder, err = internal.NewContentEncoder(q.ContentEncoding) + q.config, err = q.makeClientConfig() if err != nil { return err } - q.client, err = q.connect(q.config) + q.encoder, err = internal.NewContentEncoder(q.ContentEncoding) if err != nil { return err } @@ -111,6 +103,12 @@ func (q *AMQP) Connect() error { return nil } +func (q *AMQP) Connect() error { + var err error + q.client, err = q.connect(q.config) + return err +} + func (q *AMQP) Close() error { if q.client != nil { return q.client.Close() @@ -323,13 +321,15 @@ func connect(clientConfig *ClientConfig) (Client, error) { func init() { outputs.Add("amqp", func() telegraf.Output { return &AMQP{ - URL: DefaultURL, - ExchangeType: DefaultExchangeType, - AuthMethod: DefaultAuthMethod, - Database: DefaultDatabase, - RetentionPolicy: DefaultRetentionPolicy, - Timeout: config.Duration(time.Second * 5), - connect: connect, + Brokers: []string{DefaultURL}, + ExchangeType: DefaultExchangeType, + AuthMethod: DefaultAuthMethod, + Headers: map[string]string{ + "database": DefaultDatabase, + "retention_policy": DefaultRetentionPolicy, + }, + Timeout: config.Duration(time.Second * 5), + connect: connect, } }) } diff --git a/plugins/outputs/amqp/amqp_test.go b/plugins/outputs/amqp/amqp_test.go index d1772e5efe5c9..0e0cc471bb2ab 100644 --- a/plugins/outputs/amqp/amqp_test.go +++ b/plugins/outputs/amqp/amqp_test.go @@ -52,9 +52,11 @@ func TestConnect(t *testing.T) { ExchangeType: DefaultExchangeType, ExchangeDurability: "durable", AuthMethod: DefaultAuthMethod, - Database: DefaultDatabase, - RetentionPolicy: DefaultRetentionPolicy, - Timeout: config.Duration(time.Second * 5), + Headers: map[string]string{ + "database": DefaultDatabase, + "retention_policy": DefaultRetentionPolicy, + }, + Timeout: config.Duration(time.Second * 5), connect: func(_ *ClientConfig) (Client, error) { return NewMockClient(), nil }, @@ -150,6 +152,7 @@ func TestConnect(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + require.NoError(t, tt.output.Init()) err := tt.output.Connect() tt.errFunc(t, tt.output, err) }) diff --git a/plugins/outputs/amqp/sample.conf b/plugins/outputs/amqp/sample.conf index 9649eb7540e63..c08b997eb1c50 100644 --- a/plugins/outputs/amqp/sample.conf +++ b/plugins/outputs/amqp/sample.conf @@ -1,9 +1,5 @@ # Publishes metrics to an AMQP broker [[outputs.amqp]] - ## Broker to publish to. - ## deprecated in 1.7; use the brokers option - # url = "amqp://localhost:5672/influxdb" - ## Brokers to publish to. If multiple brokers are specified a random broker ## will be selected anytime a connection is established. This can be ## helpful for load balancing when not using a dedicated load balancer. @@ -52,14 +48,6 @@ ## One of "transient" or "persistent". # delivery_mode = "transient" - ## InfluxDB database added as a message header. - ## deprecated in 1.7; use the headers option - # database = "telegraf" - - ## InfluxDB retention policy added as a message header - ## deprecated in 1.7; use the headers option - # retention_policy = "default" - ## Static headers added to each published message. # headers = { } # headers = {"database" = "telegraf", "retention_policy" = "default"}