Skip to content

Commit

Permalink
feat: Action messages published under /set for the robots works
Browse files Browse the repository at this point in the history
  • Loading branch information
Pingviinituutti committed Oct 30, 2023
1 parent 43e69dc commit 60eaef2
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 48 deletions.
14 changes: 8 additions & 6 deletions Settings.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@
id = "neato-mqtt"

# Domain name / IP address and port of the MQTT broker
# host = "localhost" # in case of local broker
host = "mosquitto"
port = 1883
# host = "your_broker_address" # defaults to "localhost"
# port = 1883 # defaults to 1883

# MQTT topic where updates will be published
topic = "home/devices/neato/{id}"
# Uncomment if you want to change the topic
# topic = "home/devices/neato/{id}"

# received state to the devices
topic_set = "home/devices/neato/{id}/set"
# Uncomment if you want to change the set topic
# topic_set = "home/devices/neato/{id}/set"

[neato]
username = "your_email@address.com"
password = "password"
password = "password"
# dry_run = false # uncomment to enable dry run mode
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async fn main() -> Result<()> {
let settings = read_settings()?;
// let mqtt_client = mk_mqtt_client(&settings).await?;
let mqtt_client = mqtt::init(&settings.mqtt.clone()).await?;
let neato = Neato::new(mqtt_client, &settings.neato.clone())
let _neato = Neato::new(mqtt_client, &settings.neato.clone())
.init().await?;

tokio::signal::ctrl_c().await?;
Expand Down
75 changes: 58 additions & 17 deletions src/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,40 @@ use eyre::Result;
use rand::{distributions::Alphanumeric, Rng};
use rumqttc::{AsyncClient, MqttOptions, QoS};
use serde::{Deserialize, Serialize};
use std::time::Duration;
use std::{time::Duration, sync::Arc};
use tokio::{sync::watch::Receiver, task};

use crate::settings::MqttSettings;
use log::{debug, error};

#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
pub struct MqttDevice {
use crate::{settings::MqttSettings, neato::RobotCmd};

#[derive(Clone, Deserialize, Serialize, Debug)]
pub struct MqttSetMessage {
pub action: RobotCmd,
}

#[derive(Clone, Deserialize, Serialize, Debug)]
pub struct SendAction {
pub id: String,
pub name: Option<String>,
pub power: Option<bool>,
pub volume: Option<u16>,
pub mute: Option<bool>,
pub action: RobotCmd,
}

#[derive(Clone)]
pub struct MqttClient {
pub client: AsyncClient,
pub rx: Receiver<Option<MqttDevice>>,
pub rx: Receiver<Option<SendAction>>,
pub topic: String,
pub set_topic: String,
pub settings: MqttSettings,
}

pub fn get_id_from_topic(topic: &String, topic_set: &String) -> Result<String> {
if let Some((start, end)) = topic_set.split_once("{id}") {
Ok(topic.replace(start, "").replace(end, ""))
} else {
error!("Could not get id from topic: '{}'", topic);
Err(eyre::eyre!("Could not get id from topic: '{}'", topic))
}
}

pub async fn init(mqtt_settings: &MqttSettings) -> Result<MqttClient> {
Expand All @@ -37,28 +52,52 @@ pub async fn init(mqtt_settings: &MqttSettings) -> Result<MqttClient> {
);
options.set_keep_alive(Duration::from_secs(5));
let (client, mut eventloop) = AsyncClient::new(options, 10);
client
.subscribe(mqtt_settings.topic_set.clone(), QoS::AtMostOnce)
.await?;

let subscribe_settings = mqtt_settings.clone();
let subscribe_client = client.clone();

let config_clone = Arc::new(mqtt_settings.clone());

let (tx, rx) = tokio::sync::watch::channel(None);

task::spawn(async move {
loop {
let notification = eventloop.poll().await;

let id = subscribe_settings.id.clone();
let config_clone = Arc::clone(&config_clone);

let res = (|| async {
if let rumqttc::Event::Incoming(rumqttc::Packet::Publish(msg)) = notification? {
let device: MqttDevice = serde_json::from_slice(&msg.payload)?;
tx.send(Some(device))?;
match notification? {
rumqttc::Event::Incoming(rumqttc::Packet::ConnAck(_)) => {
subscribe_client
.subscribe(config_clone.topic_set.replace("{id}", "+"), QoS::AtMostOnce)
.await?;
}
rumqttc::Event::Incoming(rumqttc::Packet::Publish(msg)) => {
debug!("Reveiced MQTT Publish for topic: {:?}", &msg.topic);
let id = get_id_from_topic(&msg.topic, &config_clone.topic_set)?;
debug!("Id is: {:?}", id);
debug!("Payload is: {:?}", &msg.payload);
let payload: MqttSetMessage = serde_json::from_slice(&msg.payload)?;
let device = {SendAction {
id: id,
action: payload.action,
}};
tx.send(Some(device))?;
}
_ => {}
}

Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
Ok::<(), Box<dyn std::error::Error + Sync + Send>>(())
})()
.await;

if let Err(e) = res {
eprintln!("MQTT error: {:?}", e);
error!(
target: &id.to_string(),
"MQTT error: {:?}", e
);
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
Expand All @@ -68,5 +107,7 @@ pub async fn init(mqtt_settings: &MqttSettings) -> Result<MqttClient> {
client,
rx,
topic: mqtt_settings.topic.clone(),
set_topic: mqtt_settings.topic_set.clone(),
settings: mqtt_settings.clone(),
})
}
100 changes: 77 additions & 23 deletions src/neato.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use sha2::Sha256;

use log::{debug, error, info};

use crate::neato_types::{HouseCleaningParams, NeatoState, PublicRobot, Robot, RobotMessage};
use crate::{neato_types::{HouseCleaningParams, NeatoState, PublicRobot, Robot, RobotMessage}, mqtt::SendAction};
use crate::{mqtt::MqttClient, settings::NeatoSettings};

impl Robot {
Expand Down Expand Up @@ -57,7 +57,7 @@ const BASE_URL: &str = "https://beehive.neatocloud.com";

type HmacSha256 = Hmac<Sha256>;

#[derive(PartialEq)]
#[derive(Clone, PartialEq, Deserialize, Serialize, Debug)]
pub enum RobotCmd {
StartCleaning,
StopCleaning,
Expand Down Expand Up @@ -122,7 +122,7 @@ impl Neato {
}

pub async fn init(mut self) -> color_eyre::Result<Neato> {
info!("Initializing Neato MQTT client");
info!("Initializing Neato cloud integration");
self.robots = Arc::new(AsyncMutex::new(get_robots(&self.settings.clone()).await?));
// let robots_with_states = update_robot_states(get_robots(neato_settings).await?).await?;

Expand All @@ -132,22 +132,43 @@ impl Neato {
debug!("Robot info: {:?}", robot);
}

// let neato = self.clone();
// self.clone().update_states().await?;
// tokio::spawn(async { poll_robots_until_all_idle(neato).await });
// Start the state polling loop
match self.init_polling().await {
Ok(_) => (),
Err(err) => {
error!("Error initializing polling: {}", err);
}
}

info!("Neato initialized");
match self.init_react_to_subscription_messages().await {
Ok(_) => (),
Err(err) => {
error!("Error initializing subscription messages: {}", err);
}
};

info!("Neato connection initialized");

Ok(self)
}

pub async fn update_states(&self) -> color_eyre::Result<()> {
async fn force_update_states(&self) -> color_eyre::Result<()> {
for robot in self.robots.lock().await.iter_mut() {
debug!("Robot info before update: {:?}", robot);

let result = send_command(robot, &RobotCmd::GetRobotState).await?;
let serialized_result: NeatoState = serde_json::from_str(&result).unwrap();
robot.state = Some(serialized_result);
// robot.state = robot_map.get(&robot.serial).unwrap().state.clone();
debug!("Robot info after update: {:?}\n", robot);
}

*self.last_state_update.lock().unwrap() = Some(Utc::now());

Ok(())
}

async fn update_states(&self) -> color_eyre::Result<()> {
// don't update states if we've done it recently
// let last_update = self.last_state_update.lock().unwrap().clone();
match self.last_state_update.try_lock() {
Expand All @@ -163,7 +184,7 @@ impl Neato {
);
return Ok(());
} else {
info!("Updating robot states");
debug!("Updating robot states");
}
}
Err(_) => {
Expand All @@ -173,22 +194,12 @@ impl Neato {
}

// Now we lock the robots again and update the state of each robot
for robot in self.robots.lock().await.iter_mut() {
debug!("Robot info before update: {:?}", robot);

let result = send_command(robot, &RobotCmd::GetRobotState).await?;
let serialized_result: NeatoState = serde_json::from_str(&result).unwrap();
robot.state = Some(serialized_result);
// robot.state = robot_map.get(&robot.serial).unwrap().state.clone();
debug!("Robot info after update: {:?}\n", robot);
}

*self.last_state_update.lock().unwrap() = Some(Utc::now());
self.force_update_states().await?;

Ok(())
}

pub async fn init_polling(&self) -> color_eyre::Result<()> {
async fn init_polling(&self) -> color_eyre::Result<()> {
let poll_rate = Duration::from_millis(self.settings.poll_interval as u64 * 1000);
let neato = self.clone();
let mqtt_client = self.mqtt_client.clone();
Expand All @@ -212,6 +223,49 @@ impl Neato {
});
Ok(())
}

async fn init_react_to_subscription_messages(&self) -> color_eyre::Result<()> {

let mut s = self.clone();

tokio::spawn(async move {
loop {
s.mqtt_client
.rx
.changed()
.await
.expect("Expected rx channel never to close");
let msg = s.mqtt_client.rx.borrow().clone();

println!("Received update instruction! Device: {:?}", msg);

match msg {
Some(SendAction { action: RobotCmd::GetRobotState, .. }) => {
debug!("We don't do state updates from set messages");
}
Some(SendAction { action, id }) => {
let robot = s.robots
.lock().await
.clone()
.into_iter()
.find(|r| r.name == id)
.unwrap();
// let r = robot.clone().unwrap();
// info!("{}: Starting cleaning", id);
info!("{}: {}", robot.name, action);
if s.settings.dry_run {
info!("Dry run enabled, not sending command");
} else {
send_command(&robot, &action).await.unwrap();
}
}
_ => {}
};
}
});

Ok(())
}
}

async fn get_robots(config: &NeatoSettings) -> Result<Vec<Robot>> {
Expand Down Expand Up @@ -247,8 +301,8 @@ async fn send_command(robot: &Robot, cmd: &RobotCmd) -> Result<String> {
let robot_message = cmd.build_robot_message();

debug!(
"Sending command: {:?} to robot {}",
robot_message, robot.name
"Robot name {}, Sending command: {:?}",
robot.name, robot_message
);

let body = serde_json::to_string(&robot_message)?;
Expand Down
4 changes: 3 additions & 1 deletion src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub struct NeatoSettings {
pub poll_interval: u16, // seconds
pub cache_timeout: u16, // seconds
pub decode_state: bool,
pub dry_run: bool,
}

fn default_poll_interval() -> u16 {
Expand Down Expand Up @@ -53,7 +54,8 @@ pub fn read_settings() -> Result<Settings, config::ConfigError> {
.set_default("neato.poll_interval", default_poll_interval())?
.set_default("neato.cache_timeout", default_cache_timeout())?
.set_default("neato.decode_state", false)?
.set_override_option("MQTT_HOST", env::var("MQTT_HOST").ok())?
.set_default("neato.dry_run", false)?
.set_override_option("mqtt.host", env::var("MQTT_HOST").ok())?
.build()?
.try_deserialize::<Settings>()
}

0 comments on commit 60eaef2

Please sign in to comment.