Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combining telemetry and control MQTT clients #176

Merged
merged 1 commit into from
Jan 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ const APP: () = {
if let Ok(ref measurements) = measurements {
c.resources
.net_devices
.telemetry
.control
.report_telemetry(channel, measurements);
}
}
Expand Down Expand Up @@ -365,7 +365,7 @@ const APP: () = {
let main_bus = &mut c.resources.main_bus;
c.resources
.net_devices
.lock(|net| net.controller.update(main_bus));
.lock(|net| net.control.update(main_bus));

// Handle the network stack processing if needed.
c.resources.net_devices.lock(|net| net.process());
Expand Down
15 changes: 7 additions & 8 deletions src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ use crate::delay::AsmDelay;

mod mqtt_control;
mod shared;
mod telemetry;

use mqtt_control::ControlState;
use shared::NetworkManager;

type NetworkStackProxy = shared::NetworkStackProxy<'static, NetworkStack>;
Expand All @@ -26,8 +24,7 @@ type NetworkStackProxy = shared::NetworkStackProxy<'static, NetworkStack>;
/// All devices accessing the shared stack must be contained within a single structure to prevent
/// potential pre-emption when using the `shared` network stack.
pub struct NetworkDevices {
pub controller: mqtt_control::ControlState,
pub telemetry: telemetry::TelemetryClient,
pub control: mqtt_control::ControlClient,
pub settings: miniconf::MqttClient<crate::Settings, NetworkStackProxy, SystemTimer, 256>,

// The stack reference is only used if the ENC424J600 PHY is used.
Expand Down Expand Up @@ -61,8 +58,12 @@ impl NetworkDevices {
write!(&mut miniconf_prefix, "dt/sinara/booster/{}", identifier).unwrap();

Self {
telemetry: telemetry::TelemetryClient::new(broker, shared.acquire_stack(), identifier),
controller: ControlState::new(broker, shared.acquire_stack(), identifier, delay),
control: mqtt_control::ControlClient::new(
broker,
shared.acquire_stack(),
identifier,
delay,
),
settings: miniconf::MqttClient::new(
shared.acquire_stack(),
&miniconf_client,
Expand All @@ -82,8 +83,6 @@ impl NetworkDevices {
/// This function must be called periodically to handle ingress/egress of packets and update
/// state management.
pub fn process(&mut self) -> bool {
self.telemetry.process();

#[cfg(feature = "phy_enc424j600")]
return self
.stack
Expand Down
38 changes: 34 additions & 4 deletions src/net/mqtt_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use core::fmt::Write;
use embedded_hal::blocking::delay::DelayUs;
use heapless::String;
use minimq::{Property, QoS};
use serde::Serialize;

/// Specifies an action to take on a channel.
#[derive(serde::Deserialize, Debug)]
Expand Down Expand Up @@ -105,16 +106,17 @@ impl Response {
}

/// Represents a means of handling MQTT-based control interface.
pub struct ControlState {
mqtt: minimq::Minimq<NetworkStackProxy, SystemTimer, 256, 1>,
pub struct ControlClient {
mqtt: minimq::Minimq<NetworkStackProxy, SystemTimer, 512, 1>,
subscribed: bool,
control_topic: String<64>,
telemetry_prefix: String<128>,
default_response_topic: String<64>,
delay: AsmDelay,
}

impl ControlState {
/// Construct the MQTT control state manager.
impl ControlClient {
/// Construct the MQTT control manager.
pub fn new<'a>(
broker: minimq::embedded_nal::IpAddr,
stack: super::NetworkStackProxy,
Expand All @@ -127,18 +129,46 @@ impl ControlState {
let mut control_topic: String<64> = String::new();
write!(&mut control_topic, "dt/sinara/booster/{}/control", id).unwrap();

let mut telemetry_prefix: String<128> = String::new();
write!(&mut telemetry_prefix, "dt/sinara/booster/{}/telemetry", id).unwrap();

let mut default_response_topic: String<64> = String::new();
write!(&mut default_response_topic, "dt/sinara/booster/{}/log", id).unwrap();

Self {
mqtt: minimq::Minimq::new(broker, &client_id, stack, SystemTimer::default()).unwrap(),
subscribed: false,
control_topic,
telemetry_prefix,
default_response_topic,
delay,
}
}

/// Publish telemetry for a specific channel.
///
/// # Args
/// * `channel` - The channel that telemetry is being reported for.
/// * `telemetry` - The associated telemetry of the channel to report.
pub fn report_telemetry(&mut self, channel: Channel, telemetry: &impl Serialize) {
let mut topic: String<64> = String::new();
write!(&mut topic, "{}/ch{}", self.telemetry_prefix, channel as u8).unwrap();

let message: String<1024> = serde_json_core::to_string(telemetry).unwrap();

// All telemtry is published in a best-effort manner.
self.mqtt
.client
.publish(
topic.as_str(),
&message.into_bytes(),
minimq::QoS::AtMostOnce,
minimq::Retain::NotRetained,
&[],
)
.unwrap();
}

/// Handle the MQTT-based control interface.
///
/// # Args
Expand Down
76 changes: 0 additions & 76 deletions src/net/telemetry.rs

This file was deleted.