Skip to content

Commit

Permalink
MQTT reconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
fooker committed Mar 12, 2024
1 parent aaf94fe commit 5d9a40a
Showing 1 changed file with 36 additions and 19 deletions.
55 changes: 36 additions & 19 deletions interface-mqtt/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use bytes::Bytes;
Expand Down Expand Up @@ -27,7 +28,10 @@ pub struct MQTT<'s> {

impl <'s> MQTT<'s> {
pub fn new(url: impl Into<String>) -> Result<Self> {
let mqtt_options = MqttOptions::parse_url(url)?;
let mut mqtt_options = MqttOptions::parse_url(url)?;
mqtt_options.set_keep_alive(Duration::from_secs(5));
mqtt_options.set_clean_session(true);

return Ok(Self {
mqtt_options,
realm: "photonic", // TODO: Extract realm from URL
Expand Down Expand Up @@ -62,32 +66,45 @@ impl Interface for MQTT<'_> {

for input in introspection.inputs.values() {
let topic = realm.topic(format!("input/{}/set", input.name));
client.subscribe(&topic, QoS::AtLeastOnce).await?;

eprintln!("⇄ Subscribed to '{}' for input '{}' with type {}", topic, input.name, input.value_type);

topics.insert(topic, input);
}

while let Ok(event) = event_loop.poll().await {
if let Event::Incoming(Incoming::Publish(publish)) = event {
let input = topics.get(&publish.topic).expect("Got notification for unknown topic");

let payload = match String::from_utf8(publish.payload.to_vec()) {
Ok(payload) => payload,
Err(err) => {
eprintln!("⇄ Invalid value on '{}' = {:?}: {}", publish.topic, publish.payload, err);
continue;
match event {
Event::Incoming(Incoming::Connect(_)) => {
for (topic, input) in &topics {
client.subscribe(topic.to_owned(), QoS::AtLeastOnce).await?;
eprintln!("⇄ Subscribed to '{}' for input '{}' with type {}", topic, input.name, input.value_type);
}
};
}

match input.sink.send_str(&payload) {
Ok(()) => {}
Err(err) => {
eprintln!("⇄ Invalid value on '{}' = {:?}: {}", publish.topic, payload, err);
continue;
Event::Incoming(Incoming::Publish(publish)) => {
let input = match topics.get(&publish.topic) {
Some(input) => input,
None => {
eprintln!("Got notification for unknown topic: {}", publish.topic);
continue;
}
};

let payload = match String::from_utf8(publish.payload.to_vec()) {
Ok(payload) => payload,
Err(err) => {
eprintln!("⇄ Invalid value on '{}' = {:?}: {}", publish.topic, publish.payload, err);
continue;
}
};

match input.sink.send_str(&payload) {
Ok(()) => {}
Err(err) => {
eprintln!("⇄ Invalid value on '{}' = {:?}: {}", publish.topic, payload, err);
continue;
}
}
}

_ => {}
}
}

Expand Down

0 comments on commit 5d9a40a

Please sign in to comment.