Skip to content

Commit b9c242b

Browse files
Adds Guard Conditions
- Adds the Guard Condition struct encapsulating rcl_guard_condition_t. - Adds optional callbacks and a trigger method to approximate the rclcpp implementation. - Adds an `add_guard_condition` method to WaitSet to add the GuardCondition to the WaitSet
1 parent d3b1171 commit b9c242b

File tree

2 files changed

+195
-0
lines changed

2 files changed

+195
-0
lines changed

rclrs/src/wait.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ use std::time::Duration;
2424
use std::vec::Vec;
2525

2626
mod exclusivity_guard;
27+
mod guard_condition;
2728
use exclusivity_guard::*;
29+
pub use guard_condition::*;
2830

2931
/// A struct for waiting on subscriptions and other waitable entities to become ready.
3032
pub struct WaitSet {
@@ -36,6 +38,8 @@ pub struct WaitSet {
3638
// even in the error case.
3739
subscriptions: Vec<ExclusivityGuard<Arc<dyn SubscriptionBase>>>,
3840
clients: Vec<ExclusivityGuard<Arc<dyn ClientBase>>>,
41+
// The guard conditions that are currently registered in the wait set.
42+
guard_conditions: Vec<ExclusivityGuard<Arc<GuardCondition>>>,
3943
services: Vec<ExclusivityGuard<Arc<dyn ServiceBase>>>,
4044
}
4145

@@ -96,6 +100,7 @@ impl WaitSet {
96100
rcl_wait_set,
97101
_rcl_context_mtx: context.rcl_context_mtx.clone(),
98102
subscriptions: Vec::new(),
103+
guard_conditions: Vec::new(),
99104
clients: Vec::new(),
100105
services: Vec::new(),
101106
})
@@ -107,6 +112,7 @@ impl WaitSet {
107112
/// [`WaitSet::new`].
108113
pub fn clear(&mut self) {
109114
self.subscriptions.clear();
115+
self.guard_conditions.clear();
110116
self.clients.clear();
111117
self.services.clear();
112118
// This cannot fail – the rcl_wait_set_clear function only checks that the input handle is
@@ -150,6 +156,38 @@ impl WaitSet {
150156
Ok(())
151157
}
152158

159+
/// Adds a guard condition to the wait set.
160+
///
161+
/// # Errors
162+
/// - If the guard condition was already added to this wait set or another one,
163+
/// [`AlreadyAddedToWaitSet`][1] will be returned
164+
/// - If the number of guard conditions in the wait set is larger than the
165+
/// capacity set in [`WaitSet::new`], [`WaitSetFull`][2] will be returned
166+
///
167+
/// [1]: crate::RclrsError
168+
/// [2]: crate::RclReturnCode
169+
pub fn add_guard_condition(
170+
&mut self,
171+
guard_condition: Arc<GuardCondition>,
172+
) -> Result<(), RclrsError> {
173+
let exclusive_guard_condition = ExclusivityGuard::new(
174+
Arc::clone(&guard_condition),
175+
Arc::clone(&guard_condition.in_use_by_wait_set),
176+
)?;
177+
178+
unsafe {
179+
// SAFETY: Safe if the wait set and guard condition are initialized
180+
rcl_wait_set_add_guard_condition(
181+
&mut self.rcl_wait_set,
182+
&*guard_condition.rcl_guard_condition.lock().unwrap(),
183+
std::ptr::null_mut(),
184+
)
185+
.ok()?;
186+
}
187+
self.guard_conditions.push(exclusive_guard_condition);
188+
Ok(())
189+
}
190+
153191
/// Adds a client to the wait set.
154192
///
155193
/// # Errors
@@ -266,6 +304,7 @@ impl WaitSet {
266304
.push(Arc::clone(&subscription.waitable));
267305
}
268306
}
307+
269308
for (i, client) in self.clients.iter().enumerate() {
270309
// SAFETY: The `clients` entry is an array of pointers, and this dereferencing is
271310
// equivalent to

rclrs/src/wait/guard_condition.rs

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
use crate::rcl_bindings::*;
2+
use crate::{Context, RclrsError, ToResult};
3+
4+
use std::sync::{atomic::AtomicBool, Arc, Mutex};
5+
6+
/// A waitiable entity used for waking up a wait set manually
7+
///
8+
/// # Example
9+
/// ```
10+
/// # use rclrs::{Context, GuardCondition, WaitSet, RclrsError};
11+
/// # use std::sync::{Arc, atomic::Ordering};
12+
///
13+
/// let context = Context::new([])?;
14+
///
15+
/// let atomic_bool = Arc::new(std::sync::atomic::AtomicBool::new(false));
16+
/// let atomic_bool_for_closure = Arc::clone(&atomic_bool);
17+
///
18+
/// let gc = Arc::new(GuardCondition::new(
19+
/// &context,
20+
/// Some(Box::new(move || {
21+
/// atomic_bool_for_closure.store(true, Ordering::Relaxed);
22+
/// })),
23+
/// ));
24+
///
25+
/// let mut ws = WaitSet::new(0, 1, 0, 0, 0, 0, &context)?;
26+
/// ws.add_guard_condition(Arc::clone(&gc))?;
27+
///
28+
/// // Trigger the guard condition, firing the callback and waking the wait set being waited on, if any.
29+
/// gc.trigger()?;
30+
///
31+
/// // The provided callback has now been called.
32+
/// assert_eq!(atomic_bool.load(Ordering::Relaxed), true);
33+
///
34+
/// // The wait call will now immediately return.
35+
/// ws.wait(Some(std::time::Duration::from_millis(10)))?;
36+
///
37+
/// # Ok::<(), RclrsError>(())
38+
/// ```
39+
pub struct GuardCondition {
40+
/// The rcl_guard_condition_t that this struct encapsulates.
41+
pub(crate) rcl_guard_condition: Arc<Mutex<rcl_guard_condition_t>>,
42+
/// An optional callback to call when this guard condition is triggered.
43+
callback: Option<Box<dyn Fn() + Send + Sync>>,
44+
/// A flag to indicate if this guard condition has already been assigned to a wait set.
45+
pub(crate) in_use_by_wait_set: Arc<AtomicBool>,
46+
}
47+
48+
impl Drop for GuardCondition {
49+
fn drop(&mut self) {
50+
unsafe {
51+
// SAFETY: No precondition for this function (besides passing in a valid guard condition)
52+
rcl_guard_condition_fini(&mut *self.rcl_guard_condition.lock().unwrap());
53+
}
54+
}
55+
}
56+
57+
// SAFETY: rcl_guard_condition is the only member that doesn't implement Send, and it is designed to be accessed from other threads
58+
unsafe impl Send for rcl_guard_condition_t {}
59+
60+
// SAFETY: The mutexes and atomic members ensure synchronized access to members, and the callback is reentrant
61+
unsafe impl Sync for GuardCondition {}
62+
63+
impl GuardCondition {
64+
/// Creates a new guard condition.
65+
pub fn new(context: &Context, callback: Option<Box<dyn Fn() + Send + Sync>>) -> Arc<Self> {
66+
// SAFETY: Getting a zero initialized value is always safe
67+
let mut guard_condition = unsafe { rcl_get_zero_initialized_guard_condition() };
68+
unsafe {
69+
// SAFETY: The context must be valid, and the guard condition must be zero-initialized
70+
rcl_guard_condition_init(
71+
&mut guard_condition,
72+
&mut *context.rcl_context_mtx.lock().unwrap(),
73+
rcl_guard_condition_get_default_options(),
74+
);
75+
}
76+
77+
Arc::new(Self {
78+
rcl_guard_condition: Arc::new(Mutex::new(guard_condition)),
79+
callback,
80+
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
81+
})
82+
}
83+
84+
/// Triggers this guard condition, activating the wait set, and calling the optionally assigned callback.
85+
pub fn trigger(&self) -> Result<(), RclrsError> {
86+
unsafe {
87+
// SAFETY: The rcl_guard_condition_t is valid.
88+
rcl_trigger_guard_condition(&mut *self.rcl_guard_condition.lock().unwrap()).ok()?;
89+
}
90+
if let Some(callback) = &self.callback {
91+
callback();
92+
}
93+
Ok(())
94+
}
95+
}
96+
97+
#[cfg(test)]
98+
mod tests {
99+
use super::*;
100+
use crate::WaitSet;
101+
use std::sync::atomic::Ordering;
102+
103+
#[test]
104+
fn test_guard_condition() -> Result<(), RclrsError> {
105+
let context = Context::new([])?;
106+
107+
let atomic_bool = Arc::new(std::sync::atomic::AtomicBool::new(false));
108+
let atomic_bool_for_closure = Arc::clone(&atomic_bool);
109+
110+
let gc = GuardCondition::new(
111+
&context,
112+
Some(Box::new(move || {
113+
atomic_bool_for_closure.store(true, Ordering::Relaxed);
114+
})),
115+
);
116+
117+
gc.trigger()?;
118+
119+
assert_eq!(atomic_bool.load(Ordering::Relaxed), true);
120+
121+
Ok(())
122+
}
123+
124+
#[test]
125+
fn test_guard_condition_wait() -> Result<(), RclrsError> {
126+
let context = Context::new([])?;
127+
128+
let atomic_bool = Arc::new(std::sync::atomic::AtomicBool::new(false));
129+
let atomic_bool_for_closure = Arc::clone(&atomic_bool);
130+
131+
let gc = GuardCondition::new(
132+
&context,
133+
Some(Box::new(move || {
134+
atomic_bool_for_closure.store(true, Ordering::Relaxed);
135+
})),
136+
);
137+
138+
let mut ws = WaitSet::new(0, 1, 0, 0, 0, 0, &context)?;
139+
ws.add_guard_condition(Arc::clone(&gc))?;
140+
gc.trigger()?;
141+
142+
assert_eq!(atomic_bool.load(Ordering::Relaxed), true);
143+
ws.wait(Some(std::time::Duration::from_millis(10)))?;
144+
145+
Ok(())
146+
}
147+
148+
fn assert_send<T: Send>() {}
149+
fn assert_sync<T: Sync>() {}
150+
151+
#[test]
152+
fn test_guard_condition_is_send_and_sync() {
153+
assert_send::<GuardCondition>();
154+
assert_sync::<GuardCondition>();
155+
}
156+
}

0 commit comments

Comments
 (0)