Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions rclrs/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ impl ExecutorCommands {
}

pub(crate) fn add_to_wait_set(&self, waitable: Waitable) {
self.async_worker_commands.channel.add_to_waitset(waitable);
self.async_worker_commands.add_to_wait_set(waitable);
}

#[cfg(test)]
Expand Down Expand Up @@ -275,7 +275,7 @@ impl ExecutorCommands {
guard_condition: Arc::clone(&guard_condition),
});

worker_channel.add_to_waitset(waitable);
worker_channel.add_to_wait_set(waitable);

Arc::new(WorkerCommands {
channel: worker_channel,
Expand All @@ -296,7 +296,8 @@ pub(crate) struct WorkerCommands {

impl WorkerCommands {
pub(crate) fn add_to_wait_set(&self, waitable: Waitable) {
self.channel.add_to_waitset(waitable);
self.channel.add_to_wait_set(waitable);
let _ = self.wakeup_wait_set.trigger();
}

pub(crate) fn run_async<F>(&self, f: F)
Expand Down Expand Up @@ -327,7 +328,7 @@ pub trait WorkerChannel: Send + Sync {
fn add_async_task(&self, f: BoxFuture<'static, ()>);

/// Add new entities to the waitset of the executor.
fn add_to_waitset(&self, new_entity: Waitable);
fn add_to_wait_set(&self, new_entity: Waitable);

/// Send a one-time task for the worker to run with its payload.
fn send_payload_task(&self, f: PayloadTask);
Expand Down
2 changes: 1 addition & 1 deletion rclrs/src/executor/basic_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ struct BasicWorkerChannel {
}

impl WorkerChannel for BasicWorkerChannel {
fn add_to_waitset(&self, new_entity: Waitable) {
fn add_to_wait_set(&self, new_entity: Waitable) {
if let Err(err) = self.waitable_sender.unbounded_send(new_entity) {
// This is a debug log because it is normal for this to happen while
// an executor is winding down.
Expand Down
4 changes: 2 additions & 2 deletions rclrs/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,13 +385,13 @@ impl NodeState {
/// ```
///
pub fn create_publisher<'a, T>(
&self,
self: &Arc<Self>,
options: impl Into<PublisherOptions<'a>>,
) -> Result<Publisher<T>, RclrsError>
where
T: Message,
{
PublisherState::<T>::create(options, Arc::clone(&self.handle))
PublisherState::<T>::create(options, Arc::clone(self))
}

/// Creates a [`Service`] with an ordinary callback.
Expand Down
25 changes: 19 additions & 6 deletions rclrs/src/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
error::{RclrsError, ToResult},
qos::QoSProfile,
rcl_bindings::*,
IntoPrimitiveOptions, NodeHandle, ENTITY_LIFECYCLE_MUTEX,
IntoPrimitiveOptions, Node, Promise, ENTITY_LIFECYCLE_MUTEX,
};

mod loaned_message;
Expand All @@ -28,12 +28,14 @@ unsafe impl Send for rcl_publisher_t {}
/// [1]: <https://doc.rust-lang.org/reference/destructors.html>
struct PublisherHandle {
rcl_publisher: Mutex<rcl_publisher_t>,
node_handle: Arc<NodeHandle>,
/// We store the whole node here because we use some of its user-facing API
/// in some of the Publisher methods.
node: Node,
}

impl Drop for PublisherHandle {
fn drop(&mut self) {
let mut rcl_node = self.node_handle.rcl_node.lock().unwrap();
let mut rcl_node = self.node.handle().rcl_node.lock().unwrap();
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
// SAFETY: The entity lifecycle mutex is locked to protect against the risk of
// global variables in the rmw implementation being unsafely modified during cleanup.
Expand Down Expand Up @@ -97,7 +99,7 @@ where
/// Node and namespace changes are always applied _before_ topic remapping.
pub(crate) fn create<'a>(
options: impl Into<PublisherOptions<'a>>,
node_handle: Arc<NodeHandle>,
node: Node,
) -> Result<Arc<Self>, RclrsError>
where
T: Message,
Expand All @@ -117,7 +119,7 @@ where
publisher_options.qos = qos.into();

{
let rcl_node = node_handle.rcl_node.lock().unwrap();
let rcl_node = node.handle().rcl_node.lock().unwrap();
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
unsafe {
// SAFETY:
Expand All @@ -142,7 +144,7 @@ where
message: PhantomData,
handle: PublisherHandle {
rcl_publisher: Mutex::new(rcl_publisher),
node_handle,
node,
},
}))
}
Expand Down Expand Up @@ -177,6 +179,17 @@ where
Ok(subscription_count)
}

/// Get a promise that will be fulfilled when at least one subscriber is
/// listening to this publisher.
pub fn notify_on_subscriber_ready(self: &Arc<PublisherState<T>>) -> Promise<()> {
let publisher = Arc::clone(self);
self.handle.node.notify_on_graph_change(move || {
publisher
.get_subscription_count()
.is_ok_and(|count| count > 0)
})
}

/// Publishes a message.
///
/// The [`MessageCow`] trait is implemented by any
Expand Down
69 changes: 69 additions & 0 deletions rclrs/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -520,4 +520,73 @@ mod tests {
assert!(start_time.elapsed() < std::time::Duration::from_secs(10));
}
}

#[test]
fn test_delayed_subscription() {
use crate::*;
use example_interfaces::msg::Empty;
use futures::{
channel::{mpsc, oneshot},
StreamExt,
};
use std::sync::atomic::{AtomicBool, Ordering};

let mut executor = Context::default().create_basic_executor();
let node = executor
.create_node(
format!("test_delayed_subscription_{}", line!())
// We need to turn off parameter services because their activity will
// wake up the wait set, which defeats the purpose of this test.
.start_parameter_services(false),
)
.unwrap();

let (promise, receiver) = oneshot::channel();
let promise = Arc::new(Mutex::new(Some(promise)));

let success = Arc::new(AtomicBool::new(false));
let send_success = Arc::clone(&success);

let publisher = node.create_publisher("test_delayed_subscription").unwrap();

let commands = Arc::clone(executor.commands());
std::thread::spawn(move || {
// Wait a little while so the executor can start spinning and guard
// conditions can settle down.
std::thread::sleep(std::time::Duration::from_millis(10));

let _ = commands.run(async move {
let (sender, mut receiver) = mpsc::unbounded();
let _subscription = node
.create_subscription("test_delayed_subscription", move |_: Empty| {
let _ = sender.unbounded_send(());
})
.unwrap();

// Make sure the message doesn't get dropped due to the subscriber
// not being connected yet.
let _ = publisher.notify_on_subscriber_ready().await;

// Publish the message, which should trigger the executor to stop spinning
publisher.publish(Empty::default()).unwrap();

if let Some(_) = receiver.next().await {
send_success.store(true, Ordering::Release);
if let Some(promise) = promise.lock().unwrap().take() {
promise.send(()).unwrap();
}
}
});
});

let r = executor.spin(
SpinOptions::default()
.until_promise_resolved(receiver)
.timeout(std::time::Duration::from_secs(10)),
);

assert!(r.is_empty(), "{r:?}");
let message_was_received = success.load(Ordering::Acquire);
assert!(message_was_received);
}
}
Loading