Skip to content

Commit

Permalink
Merge pull request #176 from quartiq/feature/client-consolidation
Browse files Browse the repository at this point in the history
Combining telemetry and control MQTT clients
  • Loading branch information
ryan-summers authored Jan 28, 2022
2 parents 4818443 + 8612aa3 commit 2501f4d
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 90 deletions.
4 changes: 2 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ const APP: () = {
if let Ok(ref measurements) = measurements {
c.resources
.net_devices
.telemetry
.control
.report_telemetry(channel, measurements);
}
}
Expand Down Expand Up @@ -365,7 +365,7 @@ const APP: () = {
let main_bus = &mut c.resources.main_bus;
c.resources
.net_devices
.lock(|net| net.controller.update(main_bus));
.lock(|net| net.control.update(main_bus));

// Handle the network stack processing if needed.
c.resources.net_devices.lock(|net| net.process());
Expand Down
15 changes: 7 additions & 8 deletions src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ use crate::delay::AsmDelay;

mod mqtt_control;
mod shared;
mod telemetry;

use mqtt_control::ControlState;
use shared::NetworkManager;

type NetworkStackProxy = shared::NetworkStackProxy<'static, NetworkStack>;
Expand All @@ -26,8 +24,7 @@ type NetworkStackProxy = shared::NetworkStackProxy<'static, NetworkStack>;
/// All devices accessing the shared stack must be contained within a single structure to prevent
/// potential pre-emption when using the `shared` network stack.
pub struct NetworkDevices {
pub controller: mqtt_control::ControlState,
pub telemetry: telemetry::TelemetryClient,
pub control: mqtt_control::ControlClient,
pub settings: miniconf::MqttClient<crate::Settings, NetworkStackProxy, SystemTimer, 256>,

// The stack reference is only used if the ENC424J600 PHY is used.
Expand Down Expand Up @@ -61,8 +58,12 @@ impl NetworkDevices {
write!(&mut miniconf_prefix, "dt/sinara/booster/{}", identifier).unwrap();

Self {
telemetry: telemetry::TelemetryClient::new(broker, shared.acquire_stack(), identifier),
controller: ControlState::new(broker, shared.acquire_stack(), identifier, delay),
control: mqtt_control::ControlClient::new(
broker,
shared.acquire_stack(),
identifier,
delay,
),
settings: miniconf::MqttClient::new(
shared.acquire_stack(),
&miniconf_client,
Expand All @@ -82,8 +83,6 @@ impl NetworkDevices {
/// This function must be called periodically to handle ingress/egress of packets and update
/// state management.
pub fn process(&mut self) -> bool {
self.telemetry.process();

#[cfg(feature = "phy_enc424j600")]
return self
.stack
Expand Down
38 changes: 34 additions & 4 deletions src/net/mqtt_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use core::fmt::Write;
use embedded_hal::blocking::delay::DelayUs;
use heapless::String;
use minimq::{Property, QoS};
use serde::Serialize;

/// Specifies an action to take on a channel.
#[derive(serde::Deserialize, Debug)]
Expand Down Expand Up @@ -105,16 +106,17 @@ impl Response {
}

/// Represents a means of handling MQTT-based control interface.
pub struct ControlState {
mqtt: minimq::Minimq<NetworkStackProxy, SystemTimer, 256, 1>,
pub struct ControlClient {
mqtt: minimq::Minimq<NetworkStackProxy, SystemTimer, 512, 1>,
subscribed: bool,
control_topic: String<64>,
telemetry_prefix: String<128>,
default_response_topic: String<64>,
delay: AsmDelay,
}

impl ControlState {
/// Construct the MQTT control state manager.
impl ControlClient {
/// Construct the MQTT control manager.
pub fn new<'a>(
broker: minimq::embedded_nal::IpAddr,
stack: super::NetworkStackProxy,
Expand All @@ -127,18 +129,46 @@ impl ControlState {
let mut control_topic: String<64> = String::new();
write!(&mut control_topic, "dt/sinara/booster/{}/control", id).unwrap();

let mut telemetry_prefix: String<128> = String::new();
write!(&mut telemetry_prefix, "dt/sinara/booster/{}/telemetry", id).unwrap();

let mut default_response_topic: String<64> = String::new();
write!(&mut default_response_topic, "dt/sinara/booster/{}/log", id).unwrap();

Self {
mqtt: minimq::Minimq::new(broker, &client_id, stack, SystemTimer::default()).unwrap(),
subscribed: false,
control_topic,
telemetry_prefix,
default_response_topic,
delay,
}
}

/// Publish telemetry for a specific channel.
///
/// # Args
/// * `channel` - The channel that telemetry is being reported for.
/// * `telemetry` - The associated telemetry of the channel to report.
pub fn report_telemetry(&mut self, channel: Channel, telemetry: &impl Serialize) {
let mut topic: String<64> = String::new();
write!(&mut topic, "{}/ch{}", self.telemetry_prefix, channel as u8).unwrap();

let message: String<1024> = serde_json_core::to_string(telemetry).unwrap();

// All telemtry is published in a best-effort manner.
self.mqtt
.client
.publish(
topic.as_str(),
&message.into_bytes(),
minimq::QoS::AtMostOnce,
minimq::Retain::NotRetained,
&[],
)
.unwrap();
}

/// Handle the MQTT-based control interface.
///
/// # Args
Expand Down
76 changes: 0 additions & 76 deletions src/net/telemetry.rs

This file was deleted.

0 comments on commit 2501f4d

Please sign in to comment.