Skip to content

Commit

Permalink
pub: respond default topic
Browse files Browse the repository at this point in the history
  • Loading branch information
jordens committed Jan 22, 2025
1 parent 93208f3 commit 17fdbac
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 21 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ This document describes the changes to Minimq between releases.
## Changed
* The `Publication::finish()` API was removed in favor of a new `Publication::respond()` API for
constructing replies to previously received messages.
* `DeferredPublication` has been renamed to `Deferred` to clarify and fix its usage/content.
* [breaking] `embedded-nal` bumped. Now `core::net::SocketAddr` and related ip types are used.
MSRV becomes 1.77.0.

Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ mod will;
pub use broker::Broker;
pub use config::ConfigBuilder;
pub use properties::Property;
pub use publication::{DeferredPublication, Publication};
pub use publication::{Deferred, Publication};
pub use reason_codes::ReasonCode;
pub use will::Will;

Expand Down
30 changes: 13 additions & 17 deletions src/publication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub trait ToPayload {
fn serialize(self, buffer: &mut [u8]) -> Result<usize, Self::Error>;
}

impl<'a> ToPayload for &'a [u8] {
impl ToPayload for &[u8] {
type Error = ();

fn serialize(self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
Expand All @@ -21,7 +21,7 @@ impl<'a> ToPayload for &'a [u8] {
}
}

impl<'a> ToPayload for &'a str {
impl ToPayload for &str {
type Error = ();

fn serialize(self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
Expand All @@ -37,30 +37,24 @@ impl<const N: usize> ToPayload for &[u8; N] {
}
}

/// A publication where the payload is serialized directly into the transmission buffer in the
/// A payload that is serialized directly into the transmission buffer in the
/// future.
///
/// # Note
/// This is "deferred" because the closure will only be called once the publication is actually
/// sent.
pub struct DeferredPublication<F> {
pub struct Deferred<F> {
func: F,
}

impl<E, F: FnOnce(&mut [u8]) -> Result<usize, E>> DeferredPublication<F> {
pub fn new<'a>(topic: &'a str, func: F) -> Publication<'a, Self> {
Publication::new(topic, Self { func })
}

pub fn respond<'a>(
received_properties: &'a Properties<'a>,
func: F,
) -> Result<Publication<'a, Self>, ProtocolError> {
Publication::respond(received_properties, Self { func })
impl<F> Deferred<F> {
/// Create a new deferred payload.
pub fn new(func: F) -> Self {
Self { func }
}
}

impl<E, F: FnOnce(&mut [u8]) -> Result<usize, E>> ToPayload for DeferredPublication<F> {
impl<E, F: FnOnce(&mut [u8]) -> Result<usize, E>> ToPayload for Deferred<F> {
type Error = E;
fn serialize(self, buffer: &mut [u8]) -> Result<usize, E> {
(self.func)(buffer)
Expand All @@ -87,7 +81,7 @@ pub struct Publication<'a, P> {
pub(crate) retain: Retain,
}

impl<'a, P: ToPayload> Publication<'a, P> {
impl<'a, P> Publication<'a, P> {
/// Generate the publication as a reply to some other received message.
///
/// # Note
Expand All @@ -98,8 +92,9 @@ impl<'a, P: ToPayload> Publication<'a, P> {
/// publication properties.
///
/// * If a response topic is identified, the message topic will be
/// configured for it, which will override any previously-specified topic.
/// configured for it, which will override the default topic.
pub fn respond(
default_topic: Option<&'a str>,
received_properties: &'a Properties<'a>,
payload: P,
) -> Result<Self, ProtocolError> {
Expand All @@ -113,6 +108,7 @@ impl<'a, P: ToPayload> Publication<'a, P> {
None
}
})
.or(default_topic)
.ok_or(ProtocolError::NoTopic)?;

let publication = Self::new(response_topic, payload);
Expand Down
5 changes: 3 additions & 2 deletions tests/deferred_payload.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use minimq::{DeferredPublication, Minimq, QoS};
use minimq::{Deferred, Minimq, Publication, QoS};

use core::net::{IpAddr, Ipv4Addr};
use std_embedded_time::StandardClock;
Expand All @@ -23,7 +23,8 @@ fn main() -> std::io::Result<()> {

assert!(matches!(
mqtt.client().publish(
DeferredPublication::new("data", |_buf| { Err("Oops!") }).qos(QoS::ExactlyOnce)
Publication::new("data", Deferred::new(|_buf: &mut [u8]| Err("Oops!")))
.qos(QoS::ExactlyOnce)
),
Err(minimq::PubError::Serialization("Oops!"))
));
Expand Down
2 changes: 1 addition & 1 deletion tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ fn main() -> std::io::Result<()> {
.poll(|client, topic, payload, properties| {
log::info!("{} < {}", topic, core::str::from_utf8(payload).unwrap());

if let Ok(response) = Publication::respond(properties, b"Pong") {
if let Ok(response) = Publication::respond(None, properties, b"Pong") {
client.publish(response).unwrap();
}

Expand Down

0 comments on commit 17fdbac

Please sign in to comment.