
Amqp consumer connection fail #13746

Closed
matteoberts opened this issue Aug 10, 2023 · 5 comments · Fixed by #15145
Labels
bug: unexpected problem or unintended behavior; help wanted: Request for community participation, code, contribution; size/m: 2-4 day effort

Comments

matteoberts commented Aug 10, 2023

Relevant telegraf.conf

...
###############################################################################
#                            INPUT PLUGINS                                    #
###############################################################################

[[inputs.amqp_consumer]]
  ## Broker to consume from.
  ##   deprecated in 1.7; use the brokers option
  # url = "amqp://localhost:5672/influxdb"

  ## Brokers to consume from.  If multiple brokers are specified a random broker
  ## will be selected anytime a connection is established.  This can be
  ## helpful for load balancing when not using a dedicated load balancer.
  brokers = ["amqp://localhost:5672"]

  ## Authentication credentials for the PLAIN auth_method.
  # username = ""
  # password = ""

  ## Name of the exchange to declare.  If unset, no exchange will be declared.
  exchange = "telegraf"

  ## Exchange type; common types are "direct", "fanout", "topic", "headers", "x-consistent-hash".
  # exchange_type = "topic"

  ## If true, exchange will be passively declared.
  # exchange_passive = false

  ## Exchange durability can be either "transient" or "durable".
  # exchange_durability = "durable"

  ## Additional exchange arguments.
  # exchange_arguments = { }
  # exchange_arguments = {"hash_property" = "timestamp"}

  ## AMQP queue name.
  queue = "sensorsmq"

  ## AMQP queue durability can be "transient" or "durable".
  queue_durability = "durable"

  ## If true, queue will be passively declared.
  # queue_passive = false

  ## A binding between the exchange and queue using this binding key is
  ## created.  If unset, no binding is created.
  binding_key = "#"

  ## Maximum number of messages server should give to the worker.
  prefetch_count = 1000

  ## Maximum messages to read from the broker that have not been written by an
  ## output.  For best throughput set based on the number of metrics within
  ## each message and the size of the output's metric_batch_size.
  ##
  ## For example, if each message from the queue contains 10 metrics and the
  ## output metric_batch_size is 1000, setting this to 100 will ensure that a
  ## full batch is collected and the write is triggered immediately without
  ## waiting until the next flush_interval.
  # max_undelivered_messages = 1000

  ## Auth method. PLAIN and EXTERNAL are supported
  ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
  ## described here: https://www.rabbitmq.com/plugins.html
  # auth_method = "PLAIN"

  ## Optional TLS Config
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  ## Content encoding for message payloads; can be set to "gzip" or
  ## "identity" to apply no encoding.
  # content_encoding = "identity"

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "influx"
...

Logs from Telegraf

2023-08-09T20:53:22Z D! [agent] Starting service inputs
2023-08-09T20:53:22Z D! [inputs.amqp_consumer] Connecting to "amqp://localhost:5672"
2023-08-09T20:53:23Z D! [inputs.amqp_consumer] Error connecting to "amqp://localhost:5672"

System info

Telegraf 1.26.3, Windows Server 2019

Docker

No response

Steps to reproduce

  1. System reboot
  2. Telegraf starts before RabbitMQ
  3. Telegraf can't connect to RabbitMQ -> error logs:
    2023-08-09T20:53:22Z D! [agent] Starting service inputs
    2023-08-09T20:53:22Z D! [inputs.amqp_consumer] Connecting to "amqp://localhost:5672"
    2023-08-09T20:53:23Z D! [inputs.amqp_consumer] Error connecting to "amqp://localhost:5672"
  4. RabbitMQ started meanwhile
  5. Telegraf does not retry the connection, so messages are not processed
    ...

Expected behavior

If Telegraf fails to connect to the AMQP broker, it should retry the connection.

Actual behavior

After the connection error shown above, Telegraf does not retry the connection. Telegraf has to be restarted manually to re-establish it.

Additional info

I can't find a configuration option for connection retries/backoff that covers the error scenario described. If Telegraf can't establish a connection to RabbitMQ via the amqp_consumer input plugin, there should be a way to retry.

matteoberts added the bug label (unexpected problem or unintended behavior) on Aug 10, 2023
powersj (Contributor) commented Aug 10, 2023

Hi,

Happy to see a PR that adds a config option to the plugin to enable retries, such as:

  ## Specifies plugin behavior regarding disconnected servers
  ## Available choices:
  ##   - error: telegraf will return an error on startup if one of the servers is unreachable
  ##   - ignore: telegraf will ignore unreachable servers on both startup and gather
  # disconnected_servers_behavior = "error"

@powersj powersj added help wanted Request for community participation, code, contribution size/m 2-4 day effort labels Aug 10, 2023
matteoberts (Author) commented

Hi @powersj,

Thanks a lot for the feedback! I'm glad to see the addition of the new config option to the plugin, it's a great enhancement.

I had a question regarding the PR: will there be a connection retry mechanism included to handle the scenario we discussed? Or is there a different approach planned for handling it? I'm curious to know how this will be addressed.

Looking forward to hearing more about it. Thanks again for your work on this!

Regards,
Matteo

powersj (Contributor) commented Aug 10, 2023

Yes. Whoever adds this could add another option value, such as "retry", with which the plugin retries the connection at each interval.

srebhan (Member) commented Apr 12, 2024

@matteoberts could you please try the binary in PR #15145 with startup_error_behavior = "retry" and let me know if this is the behavior you want?

matteoberts (Author) commented

Hi @srebhan,

Sorry for the late reply. I've tested it and it worked pretty well. Thank you very much for the support!
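For reference, the resolution confirmed above can be sketched as a configuration fragment, assuming a Telegraf build that includes PR #15145. startup_error_behavior is set inside the [[inputs.amqp_consumer]] table next to the options from the original report. The comment describing the "retry" semantics is based on Telegraf's documentation for this plugin-level option and should be checked against the docs for the version you run.

```toml
[[inputs.amqp_consumer]]
  brokers = ["amqp://localhost:5672"]
  exchange = "telegraf"
  queue = "sensorsmq"
  queue_durability = "durable"
  binding_key = "#"
  prefetch_count = 1000
  data_format = "influx"

  ## Assumption (per Telegraf docs): with "retry", a plugin that fails to start
  ## (e.g. because the broker is not reachable yet) stays disabled and Telegraf
  ## tries to start it again on each gather cycle instead of exiting with an error.
  startup_error_behavior = "retry"
```

Other documented values for this setting include "error" and "ignore".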


3 participants