Skip to content

Commit 6543fd4

Browse files
Adds Guard Conditions
- Adds the Guard Condition struct encapsulating rcl_guard_condition_t. - Adds optional callbacks and a trigger method to approximate the rclcpp implementation. - Adds an `add_guard_condition` method to WaitSet to add the GuardCondition to the WaitSet
1 parent 047b7fa commit 6543fd4

File tree

5 files changed

+250
-3
lines changed

5 files changed

+250
-3
lines changed

rclrs/CHANGELOG.rst

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@ Changelog for package rclrs
55
0.3
66
----------------
77
* Loaned messages (zero-copy) (`#212 <https://github.com/ros2-rust/ros2_rust/pull/212>`_)
8+
* Graph support (`#234 <https://github.com/ros2-rust/ros2_rust/pull/234>`_)
9+
* Guard conditions (`#249 <https://github.com/ros2-rust/ros2_rust/pull/249>`_)
810

911
0.2 (2022-07-21)
1012
----------------
1113
* First release
1214
* Build based on `colcon-ros-cargo`
1315
* Message generation packages `rosidl_generator_rs` and `rosidl_runtime_rs`
1416
* Publisher, Subscription, Client and Service
15-
* Tunable QoS settings
17+
* Tunable QoS settings

rclrs/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "rclrs"
3-
version = "0.2.0"
3+
version = "0.3.0"
44
# This project is not military-sponsored, Jacob's employment contract just requires him to use this email address
55
authors = ["Esteve Fernandez <esteve@apache.org>", "Nikolai Morin <nnmmgit@gmail.com>", "Jacob Hassold <jacob.a.hassold.civ@army.mil>"]
66
edition = "2021"

rclrs/package.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
schematypens="http://www.w3.org/2001/XMLSchema"?>
55
<package format="3">
66
<name>rclrs</name>
7-
<version>0.2.0</version>
7+
<version>0.3.0</version>
88
<description>Package containing the Rust client library.</description>
99
<maintainer email="esteve@apache.org">Esteve Fernandez</maintainer>
1010
<maintainer email="nnmmgit@gmail.com">Nikolai Morin</maintainer>

rclrs/src/wait.rs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ use std::time::Duration;
2424
use std::vec::Vec;
2525

2626
mod exclusivity_guard;
27+
mod guard_condition;
2728
use exclusivity_guard::*;
29+
pub use guard_condition::*;
2830

2931
/// A struct for waiting on subscriptions and other waitable entities to become ready.
3032
pub struct WaitSet {
@@ -36,6 +38,8 @@ pub struct WaitSet {
3638
// even in the error case.
3739
subscriptions: Vec<ExclusivityGuard<Arc<dyn SubscriptionBase>>>,
3840
clients: Vec<ExclusivityGuard<Arc<dyn ClientBase>>>,
41+
// The guard conditions that are currently registered in the wait set.
42+
guard_conditions: Vec<ExclusivityGuard<Arc<GuardCondition>>>,
3943
services: Vec<ExclusivityGuard<Arc<dyn ServiceBase>>>,
4044
}
4145

@@ -45,6 +49,8 @@ pub struct ReadyEntities {
4549
pub subscriptions: Vec<Arc<dyn SubscriptionBase>>,
4650
/// A list of clients that have potentially received responses.
4751
pub clients: Vec<Arc<dyn ClientBase>>,
52+
/// A list of guard conditions that have been triggered.
53+
pub guard_conditions: Vec<Arc<GuardCondition>>,
4854
/// A list of services that have potentially received requests.
4955
pub services: Vec<Arc<dyn ServiceBase>>,
5056
}
@@ -105,6 +111,7 @@ impl WaitSet {
105111
rcl_wait_set,
106112
_rcl_context_mtx: context.rcl_context_mtx.clone(),
107113
subscriptions: Vec::new(),
114+
guard_conditions: Vec::new(),
108115
clients: Vec::new(),
109116
services: Vec::new(),
110117
})
@@ -116,6 +123,7 @@ impl WaitSet {
116123
/// [`WaitSet::new`].
117124
pub fn clear(&mut self) {
118125
self.subscriptions.clear();
126+
self.guard_conditions.clear();
119127
self.clients.clear();
120128
self.services.clear();
121129
// This cannot fail – the rcl_wait_set_clear function only checks that the input handle is
@@ -159,6 +167,38 @@ impl WaitSet {
159167
Ok(())
160168
}
161169

170+
/// Adds a guard condition to the wait set.
171+
///
172+
/// # Errors
173+
/// - If the guard condition was already added to this wait set or another one,
174+
/// [`AlreadyAddedToWaitSet`][1] will be returned
175+
/// - If the number of guard conditions in the wait set is larger than the
176+
/// capacity set in [`WaitSet::new`], [`WaitSetFull`][2] will be returned
177+
///
178+
/// [1]: crate::RclrsError
179+
/// [2]: crate::RclReturnCode
180+
pub fn add_guard_condition(
181+
&mut self,
182+
guard_condition: Arc<GuardCondition>,
183+
) -> Result<(), RclrsError> {
184+
let exclusive_guard_condition = ExclusivityGuard::new(
185+
Arc::clone(&guard_condition),
186+
Arc::clone(&guard_condition.in_use_by_wait_set),
187+
)?;
188+
189+
unsafe {
190+
// SAFETY: Safe if the wait set and guard condition are initialized
191+
rcl_wait_set_add_guard_condition(
192+
&mut self.rcl_wait_set,
193+
&*guard_condition.rcl_guard_condition.lock().unwrap(),
194+
std::ptr::null_mut(),
195+
)
196+
.ok()?;
197+
}
198+
self.guard_conditions.push(exclusive_guard_condition);
199+
Ok(())
200+
}
201+
162202
/// Adds a client to the wait set.
163203
///
164204
/// # Errors
@@ -262,6 +302,7 @@ impl WaitSet {
262302
let mut ready_entities = ReadyEntities {
263303
subscriptions: Vec::new(),
264304
clients: Vec::new(),
305+
guard_conditions: Vec::new(),
265306
services: Vec::new(),
266307
};
267308
for (i, subscription) in self.subscriptions.iter().enumerate() {
@@ -275,6 +316,7 @@ impl WaitSet {
275316
.push(Arc::clone(&subscription.waitable));
276317
}
277318
}
319+
278320
for (i, client) in self.clients.iter().enumerate() {
279321
// SAFETY: The `clients` entry is an array of pointers, and this dereferencing is
280322
// equivalent to
@@ -284,6 +326,17 @@ impl WaitSet {
284326
ready_entities.clients.push(Arc::clone(&client.waitable));
285327
}
286328
}
329+
330+
for (i, guard_condition) in self.guard_conditions.iter().enumerate() {
331+
// SAFETY: The `clients` entry is an array of pointers, and this dereferencing is
332+
// equivalent to
333+
// https://github.com/ros2/rcl/blob/35a31b00a12f259d492bf53c0701003bd7f1745c/rcl/include/rcl/wait.h#L419
334+
let wait_set_entry = unsafe { *self.rcl_wait_set.guard_conditions.add(i) };
335+
if !wait_set_entry.is_null() {
336+
ready_entities.guard_conditions.push(Arc::clone(&guard_condition.waitable));
337+
}
338+
}
339+
287340
for (i, service) in self.services.iter().enumerate() {
288341
// SAFETY: The `services` entry is an array of pointers, and this dereferencing is
289342
// equivalent to
@@ -309,4 +362,23 @@ mod tests {
309362
assert_send::<WaitSet>();
310363
assert_sync::<WaitSet>();
311364
}
365+
366+
#[test]
367+
fn guard_condition_in_wait_set_readies() -> Result<(), RclrsError> {
368+
let context = Context::new([])?;
369+
370+
let guard_condition = GuardCondition::new(
371+
&context,
372+
None,
373+
);
374+
375+
let mut wait_set = WaitSet::new(0, 1, 0, 0, 0, 0, &context)?;
376+
wait_set.add_guard_condition(Arc::clone(&guard_condition))?;
377+
guard_condition.trigger()?;
378+
379+
let readies = wait_set.wait(Some(std::time::Duration::from_millis(10)))?;
380+
assert!(readies.guard_conditions.contains(&guard_condition));
381+
382+
Ok(())
383+
}
312384
}

rclrs/src/wait/guard_condition.rs

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
use crate::rcl_bindings::*;
2+
use crate::{Context, RclrsError, ToResult};
3+
4+
use std::sync::{atomic::AtomicBool, Arc, Mutex};
5+
6+
/// A waitable entity used for waking up a wait set manually.
7+
///
8+
/// If a wait set that is currently waiting on events should be interrupted from a separate thread, this can be done
9+
/// by adding an `Arc<GuardCondition>` to the wait set, and calling `trigger()` on the same `GuardCondition` while
10+
/// the wait set is waiting.
11+
///
12+
/// The guard condition may be reused multiple times, but like other waitable entities, can not be used in
13+
/// multiple wait sets concurrently.
14+
///
15+
/// # Example
16+
/// ```
17+
/// # use rclrs::{Context, GuardCondition, WaitSet, RclrsError};
18+
/// # use std::sync::{Arc, atomic::Ordering};
19+
///
20+
/// let context = Context::new([])?;
21+
///
22+
/// let atomic_bool = Arc::new(std::sync::atomic::AtomicBool::new(false));
23+
/// let atomic_bool_for_closure = Arc::clone(&atomic_bool);
24+
///
25+
/// let gc = Arc::new(GuardCondition::new(
26+
/// &context,
27+
/// Some(Box::new(move || {
28+
/// atomic_bool_for_closure.store(true, Ordering::Relaxed);
29+
/// })),
30+
/// ));
31+
///
32+
/// let mut ws = WaitSet::new(0, 1, 0, 0, 0, 0, &context)?;
33+
/// ws.add_guard_condition(Arc::clone(&gc))?;
34+
///
35+
/// // Trigger the guard condition, firing the callback and waking the wait set being waited on, if any.
36+
/// gc.trigger()?;
37+
///
38+
/// // The provided callback has now been called.
39+
/// assert_eq!(atomic_bool.load(Ordering::Relaxed), true);
40+
///
41+
/// // The wait call will now immediately return.
42+
/// ws.wait(Some(std::time::Duration::from_millis(10)))?;
43+
///
44+
/// # Ok::<(), RclrsError>(())
45+
/// ```
46+
pub struct GuardCondition {
47+
/// The rcl_guard_condition_t that this struct encapsulates.
48+
pub(crate) rcl_guard_condition: Arc<Mutex<rcl_guard_condition_t>>,
49+
/// An optional callback to call when this guard condition is triggered.
50+
callback: Option<Box<dyn Fn() + Send + Sync>>,
51+
/// A flag to indicate if this guard condition has already been assigned to a wait set.
52+
pub(crate) in_use_by_wait_set: Arc<AtomicBool>,
53+
}
54+
55+
impl Drop for GuardCondition {
56+
fn drop(&mut self) {
57+
unsafe {
58+
// SAFETY: No precondition for this function (besides passing in a valid guard condition)
59+
rcl_guard_condition_fini(&mut *self.rcl_guard_condition.lock().unwrap());
60+
}
61+
}
62+
}
63+
64+
impl PartialEq for GuardCondition {
65+
fn eq(&self, other: &Self) -> bool {
66+
// Because GuardCondition controls the creation of the rcl_guard_condition, each unique GuardCondition should have a unique
67+
// rcl_guard_condition. Thus comparing equality of this member should be enough.
68+
Arc::ptr_eq(&self.rcl_guard_condition, &other.rcl_guard_condition)
69+
}
70+
}
71+
72+
impl Eq for GuardCondition {}
73+
74+
// SAFETY: rcl_guard_condition is the only member that doesn't implement Send, and it is designed to be accessed from other threads
75+
unsafe impl Send for rcl_guard_condition_t {}
76+
77+
// SAFETY: The mutexes and atomic members ensure synchronized access to members, and the callback is reentrant
78+
unsafe impl Sync for GuardCondition {}
79+
80+
impl GuardCondition {
81+
/// Creates a new guard condition.
82+
pub fn new(context: &Context, callback: Option<Box<dyn Fn() + Send + Sync>>) -> Arc<Self> {
83+
// SAFETY: Getting a zero initialized value is always safe
84+
let mut guard_condition = unsafe { rcl_get_zero_initialized_guard_condition() };
85+
unsafe {
86+
// SAFETY: The context must be valid, and the guard condition must be zero-initialized
87+
rcl_guard_condition_init(
88+
&mut guard_condition,
89+
&mut *context.rcl_context_mtx.lock().unwrap(),
90+
rcl_guard_condition_get_default_options(),
91+
);
92+
}
93+
94+
Arc::new(Self {
95+
rcl_guard_condition: Arc::new(Mutex::new(guard_condition)),
96+
callback,
97+
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
98+
})
99+
}
100+
101+
/// Triggers this guard condition, activating the wait set, and calling the optionally assigned callback.
102+
pub fn trigger(&self) -> Result<(), RclrsError> {
103+
unsafe {
104+
// SAFETY: The rcl_guard_condition_t is valid.
105+
rcl_trigger_guard_condition(&mut *self.rcl_guard_condition.lock().unwrap()).ok()?;
106+
}
107+
if let Some(callback) = &self.callback {
108+
callback();
109+
}
110+
Ok(())
111+
}
112+
}
113+
114+
#[cfg(test)]
115+
mod tests {
116+
use super::*;
117+
use crate::WaitSet;
118+
use std::sync::atomic::Ordering;
119+
120+
#[test]
121+
fn test_guard_condition() -> Result<(), RclrsError> {
122+
let context = Context::new([])?;
123+
124+
let atomic_bool = Arc::new(std::sync::atomic::AtomicBool::new(false));
125+
let atomic_bool_for_closure = Arc::clone(&atomic_bool);
126+
127+
let guard_condition = GuardCondition::new(
128+
&context,
129+
Some(Box::new(move || {
130+
atomic_bool_for_closure.store(true, Ordering::Relaxed);
131+
})),
132+
);
133+
134+
guard_condition.trigger()?;
135+
136+
assert!(atomic_bool.load(Ordering::Relaxed));
137+
138+
Ok(())
139+
}
140+
141+
#[test]
142+
fn test_guard_condition_wait() -> Result<(), RclrsError> {
143+
let context = Context::new([])?;
144+
145+
let atomic_bool = Arc::new(std::sync::atomic::AtomicBool::new(false));
146+
let atomic_bool_for_closure = Arc::clone(&atomic_bool);
147+
148+
let guard_condition = GuardCondition::new(
149+
&context,
150+
Some(Box::new(move || {
151+
atomic_bool_for_closure.store(true, Ordering::Relaxed);
152+
})),
153+
);
154+
155+
let mut wait_set = WaitSet::new(0, 1, 0, 0, 0, 0, &context)?;
156+
wait_set.add_guard_condition(Arc::clone(&guard_condition))?;
157+
guard_condition.trigger()?;
158+
159+
assert!(atomic_bool.load(Ordering::Relaxed));
160+
wait_set.wait(Some(std::time::Duration::from_millis(10)))?;
161+
162+
Ok(())
163+
}
164+
165+
fn assert_send<T: Send>() {}
166+
fn assert_sync<T: Sync>() {}
167+
168+
#[test]
169+
fn test_guard_condition_is_send_and_sync() {
170+
assert_send::<GuardCondition>();
171+
assert_sync::<GuardCondition>();
172+
}
173+
}

0 commit comments

Comments
 (0)