Rework mqtt_consumer connect/reconnect #4846

Merged (3 commits) on Oct 15, 2018

danielnelson
Contributor

Reworks the connection lifetime for the mqtt_consumer plugin. For now I have this marked for 1.9.0 but if we can get some help testing we can consider it for 1.8.x.

closes #4594
closes #4580
closes #4731

Based on #4814

Required for all PRs:

  • Signed CLA.
  • Associated README.md updated.
  • Has appropriate unit tests.

danielnelson added this to the 1.9.0 milestone Oct 10, 2018
@trankennykhang

@danielnelson sorry for the slow response. This is my first time contributing to the community, and I only started learning Go a few weeks ago, so I need a bit of time to check everything carefully, one item at a time.

Your changes are pretty good. Here are my comments:

  • Tracking three connection states completely prevents the connection from being established a second time. My change only tried to set the state as soon as possible.
  • Getting rid of the "receive" thread function reduces the complexity.
  • "opts.SetAutoReconnect(false)" did not look right to me at first glance, but it became clear after checking further: the handler set with "SetConnectionLostHandler" resets the state to the original one, and the next call of the "gather" function then tries to establish the connection again.

I will test it on our IoT project to see how it behaves. Thanks.

Dees7 commented Oct 11, 2018

Please make an armhf build for the Raspberry Pi 3.

Dees7 commented Oct 11, 2018

There is no output at all.
telegraftest.conf:

[global_tags]
[agent]
  interval = "5s"
  round_interval = true
  metric_batch_size = 500
  metric_buffer_limit = 1000
  collection_jitter = "0s"
  flush_interval = "10s"
  flush_jitter = "0s"
  precision = ""
  debug = true
  quiet = false
  logfile = ""
  hostname = ""
  omit_hostname = true
[[processors.converter]]
  [processors.converter.tags]
    float = ["POWER*"]
[[processors.printer]]
  order = 2
[[processors.regex]]
  order = 1
  [[processors.regex.tags]]
    key = "topic"
    pattern = ".*/(.*)/.*"
    replacement = "${1}"
    result_key = "topicname"
[[inputs.mqtt_consumer]]
  servers = ["tcp://localhost:1883"]
  qos = 0
  connection_timeout = "4s"
  topics = ["test/#"]
  persistent_session = true
  client_id = "telegraftest"
  username = "user"
  password = "pass"
  name_override = "mqttc"
  tag_keys = ["hostname","POWER","POWER1"]
  data_format = "json"
[[outputs.file]]
  files = ["stdout"]
  data_format = "graphite"
[[outputs.influxdb]]
  urls = ["http://127.0.0.1:8086"]
  database = "test"
  skip_database_creation = false
  retention_policy = ""
  write_consistency = "any"
  timeout = "5s"
  username = "user"
  password = "pss"

/home/pi/telegraf --debug -config /etc/telegraf/telegraftest.conf
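
For readers unfamiliar with the regex processor settings in the configuration above, the following sketch shows what the pattern and replacement would do to one of the test topics. It assumes the processor applies Go's standard `regexp` replacement semantics; `topicName` is a hypothetical helper written for this example, not part of Telegraf.

```go
package main

import (
	"fmt"
	"regexp"
)

// topicPattern is the pattern from the [[processors.regex.tags]] section.
var topicPattern = regexp.MustCompile(`.*/(.*)/.*`)

// topicName is a hypothetical helper that applies the configured
// replacement "${1}", keeping only the captured middle topic level.
// Topics that do not match the pattern are returned unchanged.
func topicName(topic string) string {
	return topicPattern.ReplaceAllString(topic, "${1}")
}

func main() {
	fmt.Println(topicName("test/sonoff/RESULT")) // prints "sonoff"
}
```

With this configuration the extracted value would be stored under the `topicname` tag, since `result_key` is set.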

2018-10-11T06:51:26Z D! Attempting connection to output: influxdb
2018-10-11T06:51:26Z D! Successfully connected to output: influxdb
2018-10-11T06:51:26Z D! Attempting connection to output: file
2018-10-11T06:51:26Z D! Successfully connected to output: file
2018-10-11T06:51:26Z I! Starting Telegraf 
2018-10-11T06:51:26Z I! Loaded inputs: inputs.mqtt_consumer
2018-10-11T06:51:26Z I! Loaded aggregators: 
2018-10-11T06:51:26Z I! Loaded processors: converter regex printer
2018-10-11T06:51:26Z I! Loaded outputs: influxdb file
2018-10-11T06:51:26Z I! Tags enabled: 
2018-10-11T06:51:26Z I! Agent Config: Interval:5s, Quiet:false, Hostname:"", Flush Interval:10s 
2018-10-11T06:51:30Z I! [inputs.mqtt_consumer]: connected [tcp://localhost:1883]
2018-10-11T06:51:40Z D! Output [file] buffer fullness: 0 / 1000 metrics. 
2018-10-11T06:51:40Z D! Output [influxdb] buffer fullness: 0 / 1000 metrics. 
2018-10-11T06:51:50Z D! Output [influxdb] buffer fullness: 0 / 1000 metrics. 
2018-10-11T06:51:50Z D! Output [file] buffer fullness: 0 / 1000 metrics. 
^C2018-10-11T06:51:57Z I! Hang on, flushing any cached metrics before shutdown
2018-10-11T06:51:57Z D! Output [file] buffer fullness: 0 / 1000 metrics. 
2018-10-11T06:51:57Z D! Output [influxdb] buffer fullness: 0 / 1000 metrics. 
2018-10-11T06:51:57Z D! [inputs.mqtt_consumer]: disconnecting [tcp://localhost:1883]
2018-10-11T06:51:57Z D! [inputs.mqtt_consumer]: disconnected [tcp://localhost:1883]

publishing:

mosquitto_pub -u 'user' -P 'pass' -t 'test/sonoff/RESULT' -m '{"POWER":0,"TEMP":41.1,"hostname":"mine"}'
mosquitto_pub -u 'user' -P 'pass' -t 'test/sonoff/RESULT' -m '{"POWER":0,"TEMP":41.1,"hostname":"mine"}'
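
As a rough sketch of what one might expect the consumer to produce from this payload, the Go snippet below splits the JSON object into tags and fields using the `tag_keys` list from the configuration. `splitTags` is a hypothetical function written for illustration only; it is not Telegraf's JSON parser, and the real parser may treat value types differently.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// splitTags is a hypothetical helper: keys listed in tagKeys are moved
// into the tag set as strings, everything else stays a field.
func splitTags(payload []byte, tagKeys []string) (map[string]string, map[string]interface{}, error) {
	var fields map[string]interface{}
	if err := json.Unmarshal(payload, &fields); err != nil {
		return nil, nil, err
	}
	tags := make(map[string]string)
	for _, key := range tagKeys {
		if v, ok := fields[key]; ok {
			tags[key] = fmt.Sprint(v)
			delete(fields, key)
		}
	}
	return tags, fields, nil
}

func main() {
	payload := []byte(`{"POWER":0,"TEMP":41.1,"hostname":"mine"}`)
	tags, fields, err := splitTags(payload, []string{"hostname", "POWER", "POWER1"})
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println("tags:", tags)
	fmt.Println("fields:", fields)
}
```

Under these assumptions `hostname` and `POWER` end up as tags, `TEMP` remains the only field, and `POWER1` is skipped because the payload does not contain it.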

debug with mosquitto_sub -u 'user' -P 'pass' -v -t 'test/#':

test/sonoff/RESULT {"POWER":0,"TEMP":41.1,"hostname":"mine"}                                       
test/sonoff/RESULT {"POWER":0,"TEMP":41.1,"hostname":"mine"}  

mosquitto.log

Thu 11 Oct 11:51:30 +05 2018 1539240690: Sending CONNACK to telegraftest (1, 0)
Thu 11 Oct 11:51:34 +05 2018 1539240694: Sending PUBLISH to telegraftest (d0, q0, r0, m0, 'test/sonoff/RESULT', ... (41 bytes))
Thu 11 Oct 11:51:47 +05 2018 1539240707: Sending PUBLISH to telegraftest (d0, q0, r0, m0, 'test/sonoff/RESULT', ... (41 bytes))
Thu 11 Oct 11:51:57 +05 2018 1539240717: Received DISCONNECT from telegraftest

@chumbert2

With telegraf_1.9.0~dcf4fb4b-0_amd64.deb, the double-connection problem we had with 1.8.0 and 1.8.1 is gone. MQTT messages are received correctly. (We haven't tested reconnect yet.)

Dees7 commented Oct 11, 2018

Yes. It now works stably (after restarting telegraf or mosquitto).

@chumbert2

Works as expected with telegraf_1.9.0~2cc071d4-0_amd64.deb:

  • messages are properly received
  • no double connection
  • reconnection OK

NOTE: tested with persistent_session = false only (AWS broker does not support persistent sessions).

Flowm commented Oct 12, 2018

Works perfectly so far on an RPi3 with telegraf_1.9.0~2cc071d4-0_armhf.deb.
Thanks a lot!

@ErikAndren

telegraf_1.9.0~2cc071d4-0_armhf.deb works for me on a Raspberry Pi 1 running Debian Stretch.

danielnelson modified the milestones: 1.9.0, 1.8.2 Oct 15, 2018
danielnelson merged commit 152365a into master Oct 15, 2018
danielnelson deleted the mqtt-connect branch October 15, 2018 20:03
danielnelson added a commit that referenced this pull request Oct 15, 2018
rgitzel pushed a commit to rgitzel/telegraf that referenced this pull request Oct 17, 2018
otherpirate pushed a commit to otherpirate/telegraf that referenced this pull request Mar 15, 2019
otherpirate pushed a commit to otherpirate/telegraf that referenced this pull request Mar 15, 2019
dupondje pushed a commit to dupondje/telegraf that referenced this pull request Apr 22, 2019
athoune pushed a commit to bearstech/telegraf that referenced this pull request Apr 17, 2020
Labels
area/mqtt fix pr to fix corresponding bug