Skip to content

Commit 120817b

Browse files
Adds Guard Conditions
- Adds the Guard Condition struct encapsulating rcl_guard_condition_t. - Adds optional callbacks and a trigger method to approximate the rclcpp implementation. - Adds an `add_guard_condition` method to WaitSet to add the GuardCondition to the WaitSet
1 parent d3b1171 commit 120817b

File tree

2 files changed

+188
-0
lines changed

2 files changed

+188
-0
lines changed

rclrs/src/wait.rs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ use std::vec::Vec;
2626
mod exclusivity_guard;
2727
use exclusivity_guard::*;
2828

29+
mod guard_condition;
30+
use guard_condition::*;
31+
2932
/// A struct for waiting on subscriptions and other waitable entities to become ready.
3033
pub struct WaitSet {
3134
rcl_wait_set: rcl_wait_set_t,
@@ -36,6 +39,8 @@ pub struct WaitSet {
3639
// even in the error case.
3740
subscriptions: Vec<ExclusivityGuard<Arc<dyn SubscriptionBase>>>,
3841
clients: Vec<ExclusivityGuard<Arc<dyn ClientBase>>>,
42+
// The guard conditions that are currently registered in the wait set.
43+
guard_conditions: Vec<ExclusivityGuard<Arc<Mutex<GuardCondition>>>>,
3944
services: Vec<ExclusivityGuard<Arc<dyn ServiceBase>>>,
4045
}
4146

@@ -45,6 +50,8 @@ pub struct ReadyEntities {
4550
pub subscriptions: Vec<Arc<dyn SubscriptionBase>>,
4651
/// A list of clients that have potentially received responses.
4752
pub clients: Vec<Arc<dyn ClientBase>>,
53+
/// A list of guard conditions that have potentially been triggered.
54+
pub guard_conditions: Vec<Arc<Mutex<GuardCondition>>>,
4855
/// A list of services that have potentially received requests.
4956
pub services: Vec<Arc<dyn ServiceBase>>,
5057
}
@@ -96,6 +103,7 @@ impl WaitSet {
96103
rcl_wait_set,
97104
_rcl_context_mtx: context.rcl_context_mtx.clone(),
98105
subscriptions: Vec::new(),
106+
guard_conditions: Vec::new(),
99107
clients: Vec::new(),
100108
services: Vec::new(),
101109
})
@@ -107,6 +115,7 @@ impl WaitSet {
107115
/// [`WaitSet::new`].
108116
pub fn clear(&mut self) {
109117
self.subscriptions.clear();
118+
self.guard_conditions.clear();
110119
self.clients.clear();
111120
self.services.clear();
112121
// This cannot fail – the rcl_wait_set_clear function only checks that the input handle is
@@ -150,6 +159,38 @@ impl WaitSet {
150159
Ok(())
151160
}
152161

162+
/// Adds a guard condition to the wait set.
163+
///
164+
/// # Errors
165+
/// - If the guard condition was already added to this wait set or another one,
166+
/// [`AlreadyAddedToWaitSet`][1] will be returned
167+
/// - If the number of guard condition in the wait set is larger than the
168+
/// capacity set in [`WaitSet::new`], [`WaitSetFull`][2] will be returned
169+
///
170+
/// [1]: crate::RclrsError
171+
/// [2]: crate::RclReturnCode
172+
pub fn add_guard_condition(
173+
&mut self,
174+
guard_condition: Arc<Mutex<GuardCondition>>,
175+
) -> Result<(), RclrsError> {
176+
let exclusive_guard_condition = ExclusivityGuard::new(
177+
Arc::clone(&guard_condition),
178+
Arc::clone(&guard_condition.lock().unwrap().in_use_by_wait_set),
179+
)?;
180+
181+
unsafe {
182+
// SAFETY: Safe if the wait set and guard condition are initialized
183+
rcl_wait_set_add_guard_condition(
184+
&mut self.rcl_wait_set,
185+
&*guard_condition.lock().unwrap().lock(),
186+
std::ptr::null_mut(),
187+
)
188+
.ok()?;
189+
}
190+
self.guard_conditions.push(exclusive_guard_condition);
191+
Ok(())
192+
}
193+
153194
/// Adds a client to the wait set.
154195
///
155196
/// # Errors
@@ -252,6 +293,7 @@ impl WaitSet {
252293
unsafe { rcl_wait(&mut self.rcl_wait_set, timeout_ns) }.ok()?;
253294
let mut ready_entities = ReadyEntities {
254295
subscriptions: Vec::new(),
296+
guard_conditions: Vec::new(),
255297
clients: Vec::new(),
256298
services: Vec::new(),
257299
};
@@ -266,6 +308,19 @@ impl WaitSet {
266308
.push(Arc::clone(&subscription.waitable));
267309
}
268310
}
311+
312+
for (i, guard_condition) in self.guard_conditions.iter().enumerate() {
313+
// SAFETY: The `guard_conditions` entry is an array of pointers, and this dereferencing is
314+
// equivalent to
315+
// https://github.com/ros2/rcl/blob/35a31b00a12f259d492bf53c0701003bd7f1745c/rcl/include/rcl/wait.h#L419
316+
let wait_set_entry = unsafe { *self.rcl_wait_set.guard_conditions.add(i) };
317+
if !wait_set_entry.is_null() {
318+
ready_entities
319+
.guard_conditions
320+
.push(Arc::clone(&guard_condition.waitable));
321+
}
322+
}
323+
269324
for (i, client) in self.clients.iter().enumerate() {
270325
// SAFETY: The `clients` entry is an array of pointers, and this dereferencing is
271326
// equivalent to

rclrs/src/wait/guard_condition.rs

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
use crate::rcl_bindings::*;
2+
use crate::{Context, RclrsError, ToResult};
3+
4+
use std::sync::{atomic::AtomicBool, Arc, Mutex, MutexGuard};
5+
6+
/// A struct for encapsulating a guard condition - a waitable trigger
7+
pub struct GuardCondition {
8+
/// The rcl_guard_condition_t that this struct encapsulates.
9+
rcl_guard_condition: Arc<Mutex<rcl_guard_condition_t>>,
10+
/// An optional callback to call when this guard condition is triggered.
11+
callback: Option<Box<dyn Fn(usize)>>,
12+
/// A flag to indicate if this guard condition has already been assigned to a wait set.
13+
pub(crate) in_use_by_wait_set: Arc<AtomicBool>,
14+
/// A count for the number of times this guard condition was triggered, but no callback was assigned.
15+
unread_count: usize,
16+
}
17+
18+
impl Drop for GuardCondition {
19+
fn drop(&mut self) {
20+
unsafe {
21+
// SAFETY: No precondition for this function (besides passing in a valid guard condition)
22+
rcl_guard_condition_fini(&mut *self.rcl_guard_condition.lock().unwrap());
23+
}
24+
}
25+
}
26+
27+
impl GuardCondition {
28+
/// Creates a new guard condition
29+
pub fn new(context: &Context) -> Self {
30+
let mut guard_condition = unsafe { rcl_get_zero_initialized_guard_condition() };
31+
unsafe {
32+
// SAFETY: The context must be valid. No other preconditions for this function.
33+
rcl_guard_condition_init(
34+
&mut guard_condition,
35+
&mut *context.rcl_context_mtx.lock().unwrap(),
36+
rcl_guard_condition_get_default_options(),
37+
);
38+
}
39+
40+
Self {
41+
rcl_guard_condition: Arc::new(Mutex::new(guard_condition)),
42+
callback: None,
43+
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
44+
unread_count: 0,
45+
}
46+
}
47+
48+
/// Locks the underlying guard condition and returns it.
49+
pub fn lock(&self) -> MutexGuard<rcl_guard_condition_t> {
50+
self.rcl_guard_condition.lock().unwrap()
51+
}
52+
53+
/// Sets the callback to call when this guard condition is triggered.
54+
pub fn set_on_trigger_callback(&mut self, callback: Option<Box<dyn Fn(usize)>>) {
55+
match callback {
56+
Some(callback) => {
57+
if self.unread_count > 0 {
58+
callback(self.unread_count);
59+
self.unread_count = 0;
60+
}
61+
self.callback = Some(callback);
62+
}
63+
None => {
64+
self.callback = None;
65+
self.unread_count = 0;
66+
}
67+
}
68+
}
69+
70+
/// Triggers this guard condition, activating the wait set, and calling the optionally assigned callback.
71+
pub fn trigger(&mut self) -> Result<(), RclrsError> {
72+
unsafe {
73+
// SAFETY: The rcl_guard_condition_t is valid.
74+
rcl_trigger_guard_condition(&mut *self.rcl_guard_condition.lock().unwrap()).ok()?;
75+
}
76+
match &self.callback {
77+
Some(callback) => callback(1),
78+
None => self.unread_count += 1,
79+
}
80+
Ok(())
81+
}
82+
}
83+
84+
#[cfg(test)]
85+
mod tests {
86+
use super::*;
87+
use crate::WaitSet;
88+
use std::sync::atomic::Ordering;
89+
90+
#[test]
91+
fn test_guard_condition() -> Result<(), RclrsError> {
92+
let context = Context::new([])?;
93+
let mut gc = GuardCondition::new(&context);
94+
95+
let atomic_usize = Arc::new(std::sync::atomic::AtomicUsize::new(0));
96+
let atomic_usize_for_closure = Arc::clone(&atomic_usize);
97+
98+
gc.set_on_trigger_callback(Some(Box::new(move |count| {
99+
atomic_usize_for_closure.store(count, Ordering::Relaxed);
100+
})));
101+
102+
gc.trigger()?;
103+
104+
assert_eq!(atomic_usize.load(Ordering::Relaxed), 1);
105+
106+
Ok(())
107+
}
108+
109+
#[test]
110+
fn test_guard_condition_wait() -> Result<(), RclrsError> {
111+
let context = Context::new([])?;
112+
let gc = Arc::new(Mutex::new(GuardCondition::new(&context)));
113+
114+
let atomic_usize = Arc::new(std::sync::atomic::AtomicUsize::new(0));
115+
let atomic_usize_for_closure = Arc::clone(&atomic_usize);
116+
117+
gc.lock()
118+
.unwrap()
119+
.set_on_trigger_callback(Some(Box::new(move |count| {
120+
atomic_usize_for_closure.store(count, Ordering::Relaxed);
121+
})));
122+
123+
let mut ws = WaitSet::new(0, 1, 0, 0, 0, 0, &context)?;
124+
ws.add_guard_condition(Arc::clone(&gc))?;
125+
gc.lock().unwrap().trigger()?;
126+
127+
assert_eq!(atomic_usize.load(Ordering::Relaxed), 1);
128+
let wait_result = ws.wait(Some(std::time::Duration::from_millis(10)))?;
129+
assert_eq!(wait_result.guard_conditions.len(), 1);
130+
131+
Ok(())
132+
}
133+
}

0 commit comments

Comments
 (0)