MQTT consumer plugin disconnects frequently and can not reconnect successfully #16293

Open
tom-ch1 opened this issue Dec 10, 2024 · 4 comments

tom-ch1 commented Dec 10, 2024

Relevant telegraf.conf

[[inputs.mqtt_consumer]]
  topics = [
    "myTenant/#",
  ]
  data_format = "value"
  data_type = "string"

  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "+/devices/+/+"
    measurement = "measurement/_/_/_"
    tags = "tenant/_/device/field"

[[processors.pivot]]
  order = 1
  tag_key = "field"
  value_key = "value"

[[processors.converter]]
  order = 2
  tagpass = { topic = ["myTenant*"] }
  [processors.converter.fields]
    float = ["temperature", "humidity"]
    integer = ["pressure", "rssi"]
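
For readers unfamiliar with `topic_parsing`, here is a rough Python sketch of how I understand the mapping in the config above: the segment labelled `measurement` becomes the measurement name, and every segment in `tags` that is not `_` becomes a tag. This is only an approximation for illustration, not Telegraf's actual implementation; the function name `parse_topic` and the device name `dev1` are made up, and only the `+` wildcard is handled.

```python
def parse_topic(topic, pattern, measurement_spec, tags_spec):
    """Approximate Telegraf's topic_parsing for a single topic.

    Returns (measurement, tags), or None if the topic does not match
    the pattern. Only the single-level '+' wildcard is supported here.
    """
    parts = topic.split("/")
    pattern_parts = pattern.split("/")
    if len(parts) != len(pattern_parts):
        return None
    for segment, expected in zip(parts, pattern_parts):
        if expected != "+" and expected != segment:
            return None

    # The segment labelled "measurement" supplies the measurement name.
    measurement = None
    for segment, name in zip(parts, measurement_spec.split("/")):
        if name == "measurement":
            measurement = segment

    # Every segment whose label is not "_" becomes a tag.
    tags = {
        name: segment
        for segment, name in zip(parts, tags_spec.split("/"))
        if name != "_"
    }
    return measurement, tags


result = parse_topic(
    "myTenant/devices/dev1/temperature",  # hypothetical topic
    "+/devices/+/+",
    "measurement/_/_/_",
    "tenant/_/device/field",
)
print(result)
```

With the hypothetical topic above, the `field` tag ends up as `temperature`, which is what `processors.pivot` then uses as the field name.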

Logs from Telegraf

2024-12-08T22:49:30Z D! [inputs.mqtt_consumer] [pinger]  ping check60.000303436
2024-12-08T22:49:30Z D! [inputs.mqtt_consumer] [pinger]  keepalive sending ping
2024-12-08T22:49:34Z D! [outputs.influxdb_v2] Buffer fullness: 0 / 10000 metrics
2024-12-08T22:49:35Z D! [inputs.mqtt_consumer] [pinger]  ping check4.9995263
2024-12-08T22:49:40Z D! [inputs.mqtt_consumer] [pinger]  ping check10.00021076
2024-12-08T22:49:40Z D! [inputs.mqtt_consumer] [pinger]  pingresp not received, disconnecting
2024-12-08T22:49:40Z D! [inputs.mqtt_consumer] [client]  internalConnLost called
2024-12-08T22:49:40Z D! [inputs.mqtt_consumer] [client]  stopCommsWorkers called
2024-12-08T22:49:40Z D! [inputs.mqtt_consumer] [client]  internalConnLost waiting on workers
2024-12-08T22:49:40Z D! [inputs.mqtt_consumer] [client]  stopCommsWorkers waiting for workers
2024-12-08T22:49:44Z D! [outputs.influxdb_v2] Buffer fullness: 0 / 10000 metrics
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] Connecting [tcp://127.0.0.1:1883]
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [client]  Connect()
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [store]   memorystore initialized
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [client]  about to write new connect msg
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [client]  socket connected to broker
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [client]  Using MQTT 3.1.1 protocol
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net]     connect started
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net]     received connack
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [client]  startCommsWorkers called
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [client]  client is connected/reconnected
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net]     incoming started
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net]     startIncomingComms started
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net]     outgoing started
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net]     startComms started
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [client]  startCommsWorkers done
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [store]   memorystore wiped
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [client]  exit startClient
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [pinger]  keepalive starting
2024-12-08T22:49:50Z I! [inputs.mqtt_consumer] Connected [tcp://127.0.0.1:1883]
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net]     outgoing waiting for an outbound message
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net]     logic waiting for msg on ibound
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net]     startIncomingComms: inboundFromStore complete
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net]     logic waiting for msg on ibound
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [client]  enter SubscribeMultiple
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [client]  sending subscribe message, topics:[topic1/# topic2/#]
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [client]  exit SubscribeMultiple
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net]     obound priority msg to write, type*packets.SubscribePacket
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net]     outgoing waiting for an outbound message
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net]     startIncoming Received Message
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net]     startIncomingComms: got msg on ibound
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [store]   memorystore del: message1not found
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net]     startIncomingComms: received suback, id:1
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net]     startIncomingComms: granted qoss[0 0]
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net]     logic waiting for msg on ibound
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net]     startIncoming Received Message
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net]     startIncoming Received Message
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net]     startIncomingComms: got msg on ibound
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net]     startIncomingComms: received publish, msgId:0
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net]     logic waiting for msg on ibound
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net]     startIncomingComms: got msg on ibound
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net]     startIncomingComms: received publish, msgId:0
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net]     startIncoming Received Message
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net]     logic waiting for msg on ibound
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net]     startIncomingComms: got msg on ibound
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net]     startIncomingComms: received publish, msgId:0
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net]     logic waiting for msg on ibound
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net]     startIncoming Received Message
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net]     startIncoming Received Message
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net]     startIncomingComms: got msg on ibound
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net]     startIncomingComms: received publish, msgId:0
2024-12-08T22:49:54Z D! [outputs.influxdb_v2] Buffer fullness: 0 / 10000 metrics
2024-12-08T22:49:55Z D! [inputs.mqtt_consumer] [pinger]  ping check5.000816128
2024-12-08T22:50:00Z D! [inputs.mqtt_consumer] [pinger]  ping check10.000605322
2024-12-08T22:50:04Z D! [outputs.influxdb_v2] Buffer fullness: 0 / 10000 metrics

etc. (the log repeats like this indefinitely)
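
To find these disconnects in a longer log, a small script along the following lines may help. It is a sketch that assumes the debug log format shown above (RFC 3339 timestamp in UTC, level, plugin name, message); the function name `ping_timeouts` is my own. It pairs each `keepalive sending ping` with the following `pingresp not received, disconnecting` and reports the time in between.

```python
import re
from datetime import datetime

# Matches lines such as:
# 2024-12-08T22:49:30Z D! [inputs.mqtt_consumer] [pinger]  keepalive sending ping
LINE = re.compile(r"^(\S+Z) \S+ \[inputs\.mqtt_consumer[^\]]*\] (.*)$")


def ping_timeouts(lines):
    """Return (ping_sent, disconnected, seconds) for each unanswered ping."""
    results = []
    last_ping = None
    for line in lines:
        match = LINE.match(line.strip())
        if not match:
            continue
        timestamp = datetime.strptime(match.group(1), "%Y-%m-%dT%H:%M:%SZ")
        message = match.group(2)
        if "keepalive sending ping" in message:
            last_ping = timestamp
        elif "pingresp not received, disconnecting" in message and last_ping:
            seconds = (timestamp - last_ping).total_seconds()
            results.append((last_ping, timestamp, seconds))
            last_ping = None
    return results


sample = [
    "2024-12-08T22:49:30Z D! [inputs.mqtt_consumer] [pinger]  keepalive sending ping",
    "2024-12-08T22:49:34Z D! [outputs.influxdb_v2] Buffer fullness: 0 / 10000 metrics",
    "2024-12-08T22:49:40Z D! [inputs.mqtt_consumer] [pinger]  pingresp not received, disconnecting",
]
for sent, lost, seconds in ping_timeouts(sample):
    print(f"ping at {sent:%H:%M:%S}, disconnected at {lost:%H:%M:%S} ({seconds:.0f}s later)")
```

For the excerpt above this reports a single unanswered ping, with the disconnect 10 seconds after the ping was sent.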

System info

Telegraf 1.32.3 (git: HEAD@2fd5bf4f) on Debian 12 (bookworm)

  • Bug introduced with Telegraf v1.32.0
  • Bug still present in Telegraf v1.33.0

Docker

not using docker

Steps to reproduce

  1. start mosquitto, influxdb and telegraf
  2. publish some data

Expected behavior

  1. Data is read and transformed by Telegraf and stored in InfluxDB
  2. If the connection to mosquitto is lost, Telegraf reconnects and continues to process data

Actual behavior

  1. Data is read and transformed by Telegraf and stored in InfluxDB
  2. If the connection to mosquitto is lost, Telegraf reconnects successfully but then fails to receive any data (including the ping response)
@tom-ch1 tom-ch1 added the bug unexpected problem or unintended behavior label Dec 10, 2024

cortlieb commented Dec 12, 2024

I can confirm the described behavior (at least as far as I understand it). Here is a post I published in the Influx community forum; maybe it adds some insight.

I’m using the inputs.mqtt_consumer plugin to retrieve data from a LoRaWAN network server (TTN). I recently noticed that no more data were being written into my InfluxDB.
I restarted Telegraf and data were written again.

I looked into Telegraf’s log files and found error entries for all affected mqtt_consumer instances, e.g.

E! [inputs.mqtt_consumer::ttn_consumer_ow] Error in plugin: connection lost: read tcp 172.19.0.3:42600->52.212.223.226:1883: read: connection reset by peer
E! [inputs.mqtt_consumer::ttn_consumer_ow] Error in plugin: network Error : read tcp 172.19.0.3:33402->63.34.215.128:1883: i/o timeout

I guess it can happen that the connection between the server running Telegraf and the MQTT server is temporarily interrupted. But why does that cause the Telegraf input plugins to stop working?

Here are some more observations:

  • Telegraf is running in a Docker container
  • other input and output plugins of the same Telegraf instance continued working without issue
  • As you can see in the example above, the IP address of the configured MQTT server seems to have changed.
    Maybe that’s just normal (e.g. something like load balancing?).
    Maybe that causes the Telegraf issue?
    On the other hand, there are also log entries where both error messages (read: connection reset by peer / i/o timeout) show the same IP address.
  • when I restarted the Telegraf container, data immediately started to flow again from the MQTT server to InfluxDB
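
To check whether the broker address really changes between errors, the remote endpoint can be pulled out of the error lines. The snippet below is a sketch that assumes the `read tcp local:port->remote:port: reason` format seen in the two errors above; the function name `remote_endpoints` is hypothetical.

```python
import re

# Matches e.g. "read tcp 172.19.0.3:42600->52.212.223.226:1883: read: connection reset by peer"
ERR = re.compile(r"read tcp (\S+):(\d+)->(\S+):(\d+): (.*)$")


def remote_endpoints(lines):
    """Return a list of (remote_ip, reason) for each matching error line."""
    found = []
    for line in lines:
        match = ERR.search(line)
        if match:
            found.append((match.group(3), match.group(5).strip()))
    return found


errors = [
    "E! [inputs.mqtt_consumer::ttn_consumer_ow] Error in plugin: connection lost: "
    "read tcp 172.19.0.3:42600->52.212.223.226:1883: read: connection reset by peer",
    "E! [inputs.mqtt_consumer::ttn_consumer_ow] Error in plugin: network Error : "
    "read tcp 172.19.0.3:33402->63.34.215.128:1883: i/o timeout",
]
for ip, reason in remote_endpoints(errors):
    print(ip, "->", reason)
```

Counting the distinct IPs over the whole log would show whether the errors correlate with an address change or also occur when the address stays the same.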


tom-ch1 commented Dec 13, 2024

This is not the same issue; please file a separate request.

E! [inputs.mqtt_consumer::ttn_consumer_ow] Error in plugin: connection lost: read tcp 172.19.0.3:42600->52.212.223.226:1883: read: connection reset by peer
E! [inputs.mqtt_consumer::ttn_consumer_ow] Error in plugin: network Error : read tcp 172.19.0.3:33402->63.34.215.128:1883: i/o timeout

I do not have a timeout issue. As you can see in my log, the reconnection is successful:

2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [client]  client is connected/reconnected
...
2024-12-08T22:49:50Z I! [inputs.mqtt_consumer] Connected [tcp://127.0.0.1:1883]

The bug here is that, after the successful reconnection:

  • messages are not received / processed
  • after sending a ping, the pingresp is not received / processed

I think the bug was triggered after an upgrade from Telegraf 1.32.0-1 to 1.32.3-1 (or from 1.31.2-1, 1.32.0-1). I'll try to downgrade to 1.32.0-1 and check whether I still hit this bug.


tom-ch1 commented Dec 13, 2024

Is it possibly a bug in github.com/eclipse/paho.mqtt.golang v1.5.0?
I see in the mosquitto logs that the connection sometimes lasts only 10 seconds, i.e. it ends long before the connection timeout.
I don't understand the implications of eclipse-paho/paho.mqtt.golang@6801721. Could this have anything to do with it?


tom-ch1 commented Dec 14, 2024

I can confirm that the bug is:

I suspect it's either the paho.mqtt.golang dependency update or #15486. #15528 is less likely, since that would probably not cause the ping response to be missed.

WORKAROUND: downgrade to v1.31.3 and prevent any upgrades until this is fixed.
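
If you manage several hosts, a quick check like the following can flag installations in the range reported in this thread. It is a sketch based only on the versions mentioned here (introduced with v1.32.0, still present in v1.33.0, v1.31.3 used as the workaround). No fixed version is known yet, so everything from v1.32.0 upwards is flagged; the names `parse_version` and `reported_affected` are my own.

```python
# First version reported as affected in this thread (assumption from the reports above).
FIRST_AFFECTED = (1, 32, 0)


def parse_version(text):
    """Turn 'v1.32.3' or '1.32.3-1' into a tuple like (1, 32, 3)."""
    core = text.strip().lstrip("v").split("-")[0]
    return tuple(int(part) for part in core.split("."))


def reported_affected(version):
    """True if the version is at or above the first reported affected release."""
    return parse_version(version) >= FIRST_AFFECTED


for candidate in ["v1.31.3", "1.32.0-1", "1.32.3-1", "v1.33.0"]:
    status = "reported affected" if reported_affected(candidate) else "not reported affected"
    print(candidate, status)
```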

@srebhan srebhan self-assigned this Dec 18, 2024