Skip to content

Ensure that a waitable is only added to one wait set at a time #224

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions rclrs/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub enum RclrsError {
/// The error indicating the position of the nul byte.
err: NulError,
},
/// It was attempted to add a waitable to a wait set twice.
AlreadyAddedToWaitSet,
}

impl Display for RclrsError {
Expand All @@ -37,6 +39,12 @@ impl Display for RclrsError {
RclrsError::StringContainsNul { s, .. } => {
write!(f, "Could not convert string '{}' to CString", s)
}
RclrsError::AlreadyAddedToWaitSet => {
write!(
f,
"Could not add entity to wait set because it was already added to a wait set"
)
}
}
}
}
Expand Down Expand Up @@ -68,6 +76,7 @@ impl Error for RclrsError {
RclrsError::RclError { msg, .. } => msg.as_ref().map(|e| e as &dyn Error),
RclrsError::UnknownRclError { msg, .. } => msg.as_ref().map(|e| e as &dyn Error),
RclrsError::StringContainsNul { err, .. } => Some(err).map(|e| e as &dyn Error),
RclrsError::AlreadyAddedToWaitSet => None,
}
}
}
Expand Down
13 changes: 3 additions & 10 deletions rclrs/src/node/client.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::node::client::oneshot::Canceled;
use futures::channel::oneshot;
use std::boxed::Box;
use std::collections::HashMap;
use std::ffi::CString;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use crate::error::{RclReturnCode, ToResult};
Expand All @@ -21,6 +21,7 @@ unsafe impl Send for rcl_client_t {}
pub struct ClientHandle {
rcl_client_mtx: Mutex<rcl_client_t>,
rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
pub(crate) in_use_by_wait_set: Arc<AtomicBool>,
}

impl ClientHandle {
Expand All @@ -40,15 +41,6 @@ impl Drop for ClientHandle {
}
}

impl From<Canceled> for RclrsError {
fn from(_: Canceled) -> Self {
RclrsError::RclError {
code: RclReturnCode::Error,
msg: None,
}
}
}

/// Trait to be implemented by concrete Client structs.
///
/// See [`Client<T>`] for an example.
Expand Down Expand Up @@ -113,6 +105,7 @@ where
let handle = Arc::new(ClientHandle {
rcl_client_mtx: Mutex::new(rcl_client),
rcl_node_mtx: node.rcl_node_mtx.clone(),
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
});

Ok(Self {
Expand Down
3 changes: 3 additions & 0 deletions rclrs/src/node/service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::boxed::Box;
use std::ffi::CString;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use crate::error::{RclReturnCode, ToResult};
Expand All @@ -20,6 +21,7 @@ unsafe impl Send for rcl_service_t {}
pub struct ServiceHandle {
handle: Mutex<rcl_service_t>,
node_handle: Arc<Mutex<rcl_node_t>>,
pub(crate) in_use_by_wait_set: Arc<AtomicBool>,
}

impl ServiceHandle {
Expand Down Expand Up @@ -103,6 +105,7 @@ where
let handle = Arc::new(ServiceHandle {
handle: Mutex::new(service_handle),
node_handle: node.rcl_node_mtx.clone(),
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
});

Ok(Self {
Expand Down
3 changes: 3 additions & 0 deletions rclrs/src/node/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::boxed::Box;
use std::ffi::CStr;
use std::ffi::CString;
use std::marker::PhantomData;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use rosidl_runtime_rs::{Message, RmwMessage};
Expand All @@ -21,6 +22,7 @@ unsafe impl Send for rcl_subscription_t {}
pub struct SubscriptionHandle {
rcl_subscription_mtx: Mutex<rcl_subscription_t>,
rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
pub(crate) in_use_by_wait_set: Arc<AtomicBool>,
}

impl SubscriptionHandle {
Expand Down Expand Up @@ -116,6 +118,7 @@ where
let handle = Arc::new(SubscriptionHandle {
rcl_subscription_mtx: Mutex::new(rcl_subscription),
rcl_node_mtx: node.rcl_node_mtx.clone(),
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
});

Ok(Self {
Expand Down
113 changes: 86 additions & 27 deletions rclrs/src/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ use std::vec::Vec;

use parking_lot::Mutex;

mod exclusivity_guard;
use exclusivity_guard::*;

/// A struct for waiting on subscriptions and other waitable entities to become ready.
pub struct WaitSet {
rcl_wait_set: rcl_wait_set_t,
Expand All @@ -33,9 +36,9 @@ pub struct WaitSet {
// The subscriptions that are currently registered in the wait set.
// This correspondence is an invariant that must be maintained by all functions,
// even in the error case.
subscriptions: Vec<Arc<dyn SubscriptionBase>>,
clients: Vec<Arc<dyn ClientBase>>,
services: Vec<Arc<dyn ServiceBase>>,
subscriptions: Vec<ExclusivityGuard<Arc<dyn SubscriptionBase>>>,
clients: Vec<ExclusivityGuard<Arc<dyn ClientBase>>>,
services: Vec<ExclusivityGuard<Arc<dyn ServiceBase>>>,
}

/// A list of entities that are ready, returned by [`WaitSet::wait`].
Expand Down Expand Up @@ -118,17 +121,22 @@ impl WaitSet {

/// Adds a subscription to the wait set.
///
/// It is possible, but not useful, to add the same subscription twice.
///
/// This will return an error if the number of subscriptions in the wait set is larger than the
/// capacity set in [`WaitSet::new`].
/// # Errors
/// - If the subscription was already added to this wait set or another one,
/// [`AlreadyAddedToWaitSet`][1] will be returned
/// - If the number of subscriptions in the wait set is larger than the
/// capacity set in [`WaitSet::new`], [`WaitSetFull`][2] will be returned
///
/// The same subscription must not be added to multiple wait sets, because that would make it
/// unsafe to simultaneously wait on those wait sets.
/// [1]: crate::RclrsError
/// [2]: crate::RclReturnCode
pub fn add_subscription(
&mut self,
subscription: Arc<dyn SubscriptionBase>,
) -> Result<(), RclrsError> {
let exclusive_subscription = ExclusivityGuard::new(
Arc::clone(&subscription),
Arc::clone(&subscription.handle().in_use_by_wait_set),
)?;
unsafe {
// SAFETY: I'm not sure if it's required, but the subscription pointer will remain valid
// for as long as the wait set exists, because it's stored in self.subscriptions.
Expand All @@ -140,20 +148,25 @@ impl WaitSet {
)
}
.ok()?;
self.subscriptions.push(subscription);
self.subscriptions.push(exclusive_subscription);
Ok(())
}

/// Adds a client to the wait set.
///
/// It is possible, but not useful, to add the same client twice.
///
/// This will return an error if the number of clients in the wait set is larger than the
/// capacity set in [`WaitSet::new`].
/// # Errors
/// - If the client was already added to this wait set or another one,
/// [`AlreadyAddedToWaitSet`][1] will be returned
/// - If the number of clients in the wait set is larger than the
/// capacity set in [`WaitSet::new`], [`WaitSetFull`][2] will be returned
///
/// The same client must not be added to multiple wait sets, because that would make it
/// unsafe to simultaneously wait on those wait sets.
/// [1]: crate::RclrsError
/// [2]: crate::RclReturnCode
pub fn add_client(&mut self, client: Arc<dyn ClientBase>) -> Result<(), RclrsError> {
let exclusive_client = ExclusivityGuard::new(
Arc::clone(&client),
Arc::clone(&client.handle().in_use_by_wait_set),
)?;
unsafe {
// SAFETY: I'm not sure if it's required, but the client pointer will remain valid
// for as long as the wait set exists, because it's stored in self.clients.
Expand All @@ -165,20 +178,25 @@ impl WaitSet {
)
}
.ok()?;
self.clients.push(client);
self.clients.push(exclusive_client);
Ok(())
}

/// Adds a service to the wait set.
///
/// It is possible, but not useful, to add the same service twice.
///
/// This will return an error if the number of services in the wait set is larger than the
/// capacity set in [`WaitSet::new`].
/// # Errors
/// - If the service was already added to this wait set or another one,
/// [`AlreadyAddedToWaitSet`][1] will be returned
/// - If the number of services in the wait set is larger than the
/// capacity set in [`WaitSet::new`], [`WaitSetFull`][2] will be returned
///
/// The same service must not be added to multiple wait sets, because that would make it
/// unsafe to simultaneously wait on those wait sets.
/// [1]: crate::RclrsError
/// [2]: crate::RclReturnCode
pub fn add_service(&mut self, service: Arc<dyn ServiceBase>) -> Result<(), RclrsError> {
let exclusive_service = ExclusivityGuard::new(
Arc::clone(&service),
Arc::clone(&service.handle().in_use_by_wait_set),
)?;
unsafe {
// SAFETY: I'm not sure if it's required, but the service pointer will remain valid
// for as long as the wait set exists, because it's stored in self.services.
Expand All @@ -190,7 +208,7 @@ impl WaitSet {
)
}
.ok()?;
self.services.push(service);
self.services.push(exclusive_service);
Ok(())
}

Expand Down Expand Up @@ -245,7 +263,9 @@ impl WaitSet {
// https://github.com/ros2/rcl/blob/35a31b00a12f259d492bf53c0701003bd7f1745c/rcl/include/rcl/wait.h#L419
let wait_set_entry = unsafe { *self.rcl_wait_set.subscriptions.add(i) };
if !wait_set_entry.is_null() {
ready_entities.subscriptions.push(subscription.clone());
ready_entities
.subscriptions
.push(Arc::clone(&subscription.waitable));
}
}
for (i, client) in self.clients.iter().enumerate() {
Expand All @@ -254,7 +274,7 @@ impl WaitSet {
// https://github.com/ros2/rcl/blob/35a31b00a12f259d492bf53c0701003bd7f1745c/rcl/include/rcl/wait.h#L419
let wait_set_entry = unsafe { *self.rcl_wait_set.clients.add(i) };
if !wait_set_entry.is_null() {
ready_entities.clients.push(client.clone());
ready_entities.clients.push(Arc::clone(&client.waitable));
}
}
for (i, service) in self.services.iter().enumerate() {
Expand All @@ -263,9 +283,48 @@ impl WaitSet {
// https://github.com/ros2/rcl/blob/35a31b00a12f259d492bf53c0701003bd7f1745c/rcl/include/rcl/wait.h#L419
let wait_set_entry = unsafe { *self.rcl_wait_set.services.add(i) };
if !wait_set_entry.is_null() {
ready_entities.services.push(service.clone());
ready_entities.services.push(Arc::clone(&service.waitable));
}
}
Ok(ready_entities)
}
}

#[cfg(test)]
mod tests {
use crate::{Context, Node, RclrsError, WaitSet, QOS_PROFILE_DEFAULT};
use std::sync::Arc;

#[test]
fn test_adding_waitable_to_wait_sets() -> Result<(), RclrsError> {
let context = Context::new([])?;
let mut node = Node::new(&context, "test_adding_waitable_to_wait_sets")?;
let subscription = node.create_subscription(
"test",
QOS_PROFILE_DEFAULT,
move |_: std_msgs::msg::String| {},
)?;
let mut wait_set_1 = WaitSet::new(1, 0, 0, 0, 0, 0, &context)?;
let mut wait_set_2 = WaitSet::new(1, 0, 0, 0, 0, 0, &context)?;

// Try to add the subscription to wait set 1 twice
wait_set_1.add_subscription(Arc::clone(&subscription) as _)?;
assert!(wait_set_1
.add_subscription(Arc::clone(&subscription) as _)
.is_err());

// Try to add it to another wait set
assert!(wait_set_2
.add_subscription(Arc::clone(&subscription) as _)
.is_err());

// It works as soon as it is not anymore part of wait_set_1
wait_set_1.clear();
wait_set_2.add_subscription(Arc::clone(&subscription) as _)?;

// Dropping the wait set also frees up the subscription
drop(wait_set_2);
wait_set_1.add_subscription(Arc::clone(&subscription) as _)?;
Ok(())
}
}
53 changes: 53 additions & 0 deletions rclrs/src/wait/exclusivity_guard.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use crate::RclrsError;

/// A helper struct for tracking whether the waitable is currently in a wait set.
///
/// When this struct is constructed, which happens when adding an entity to the wait set,
/// it checks that the atomic boolean is false and sets it to true.
/// When it is dropped, which happens when it is removed from the wait set,
/// or the wait set itself is dropped, it sets the atomic bool to false.
pub(super) struct ExclusivityGuard<T> {
in_use_by_wait_set: Arc<AtomicBool>,
pub(super) waitable: T,
}

impl<T> Drop for ExclusivityGuard<T> {
fn drop(&mut self) {
self.in_use_by_wait_set.store(false, Ordering::Relaxed)
}
}

impl<T> ExclusivityGuard<T> {
pub fn new(waitable: T, in_use_by_wait_set: Arc<AtomicBool>) -> Result<Self, RclrsError> {
if in_use_by_wait_set
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_err()
{
return Err(RclrsError::AlreadyAddedToWaitSet);
}
Ok(Self {
in_use_by_wait_set,
waitable,
})
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

#[test]
fn test_exclusivity_guard() {
let atomic = Arc::new(AtomicBool::new(false));
let eg = ExclusivityGuard::new((), Arc::clone(&atomic)).unwrap();
assert!(ExclusivityGuard::new((), Arc::clone(&atomic)).is_err());
drop(eg);
assert!(!atomic.load(Ordering::Relaxed));
assert!(ExclusivityGuard::new((), Arc::clone(&atomic)).is_ok());
}
}