From 5d9a40aa204c70e5ef067c59b592299a2cf07235 Mon Sep 17 00:00:00 2001 From: Dustin Frisch Date: Tue, 12 Mar 2024 22:19:09 +0100 Subject: [PATCH] MQTT reconnect --- interface-mqtt/src/lib.rs | 55 +++++++++++++++++++++++++-------------- 1 file changed, 36 insertions(+), 19 deletions(-) diff --git a/interface-mqtt/src/lib.rs b/interface-mqtt/src/lib.rs index 6ced1fb..9478a3d 100644 --- a/interface-mqtt/src/lib.rs +++ b/interface-mqtt/src/lib.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; use std::sync::Arc; +use std::time::Duration; use anyhow::Result; use bytes::Bytes; @@ -27,7 +28,10 @@ pub struct MQTT<'s> { impl <'s> MQTT<'s> { pub fn new(url: impl Into) -> Result { - let mqtt_options = MqttOptions::parse_url(url)?; + let mut mqtt_options = MqttOptions::parse_url(url)?; + mqtt_options.set_keep_alive(Duration::from_secs(5)); + mqtt_options.set_clean_session(true); + return Ok(Self { mqtt_options, realm: "photonic", // TODO: Extract realm from URL @@ -62,32 +66,45 @@ impl Interface for MQTT<'_> { for input in introspection.inputs.values() { let topic = realm.topic(format!("input/{}/set", input.name)); - client.subscribe(&topic, QoS::AtLeastOnce).await?; - - eprintln!("⇄ Subscribed to '{}' for input '{}' with type {}", topic, input.name, input.value_type); - topics.insert(topic, input); } while let Ok(event) = event_loop.poll().await { - if let Event::Incoming(Incoming::Publish(publish)) = event { - let input = topics.get(&publish.topic).expect("Got notification for unknown topic"); - - let payload = match String::from_utf8(publish.payload.to_vec()) { - Ok(payload) => payload, - Err(err) => { - eprintln!("⇄ Invalid value on '{}' = {:?}: {}", publish.topic, publish.payload, err); - continue; + match event { + Event::Incoming(Incoming::Connect(_)) => { + for (topic, input) in &topics { + client.subscribe(topic.to_owned(), QoS::AtLeastOnce).await?; + eprintln!("⇄ Subscribed to '{}' for input '{}' with type {}", topic, input.name, input.value_type); } - }; + } - match input.sink.send_str(&payload) { - Ok(()) => {} - Err(err) => { - eprintln!("⇄ Invalid value on '{}' = {:?}: {}", publish.topic, payload, err); - continue; + Event::Incoming(Incoming::Publish(publish)) => { + let input = match topics.get(&publish.topic) { + Some(input) => input, + None => { + eprintln!("Got notification for unknown topic: {}", publish.topic); + continue; + } + }; + + let payload = match String::from_utf8(publish.payload.to_vec()) { + Ok(payload) => payload, + Err(err) => { + eprintln!("⇄ Invalid value on '{}' = {:?}: {}", publish.topic, publish.payload, err); + continue; + } + }; + + match input.sink.send_str(&payload) { + Ok(()) => {} + Err(err) => { + eprintln!("⇄ Invalid value on '{}' = {:?}: {}", publish.topic, payload, err); + continue; + } } } + + _ => {} } }