Skip to content

Commit 862167c

Browse files
Apply suggestions from code review
Co-authored-by: Nikolai Morin <nnmmgit@gmail.com>
1 parent c6e3145 commit 862167c

File tree

2 files changed

+71
-78
lines changed

2 files changed

+71
-78
lines changed

rclrs/src/wait.rs

Lines changed: 8 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,14 @@ use crate::error::{to_rclrs_result, RclReturnCode, RclrsError, ToResult};
1919
use crate::rcl_bindings::*;
2020
use crate::{ClientBase, Context, ServiceBase, SubscriptionBase};
2121

22-
use std::sync::{atomic::Ordering, Arc, Mutex};
22+
use std::sync::{Arc, Mutex};
2323
use std::time::Duration;
2424
use std::vec::Vec;
2525

2626
mod exclusivity_guard;
27-
use exclusivity_guard::*;
28-
2927
mod guard_condition;
30-
use guard_condition::*;
28+
use exclusivity_guard::*;
29+
pub use guard_condition::*;
3130

3231
/// A struct for waiting on subscriptions and other waitable entities to become ready.
3332
pub struct WaitSet {
@@ -40,7 +39,7 @@ pub struct WaitSet {
4039
subscriptions: Vec<ExclusivityGuard<Arc<dyn SubscriptionBase>>>,
4140
clients: Vec<ExclusivityGuard<Arc<dyn ClientBase>>>,
4241
// The guard conditions that are currently registered in the wait set.
43-
guard_conditions: Vec<ExclusivityGuard<Arc<Mutex<GuardCondition>>>>,
42+
guard_conditions: Vec<ExclusivityGuard<Arc<GuardCondition>>>,
4443
services: Vec<ExclusivityGuard<Arc<dyn ServiceBase>>>,
4544
}
4645

@@ -50,8 +49,6 @@ pub struct ReadyEntities {
5049
pub subscriptions: Vec<Arc<dyn SubscriptionBase>>,
5150
/// A list of clients that have potentially received responses.
5251
pub clients: Vec<Arc<dyn ClientBase>>,
53-
/// A list of guard conditions that have potentially been triggered.
54-
pub guard_conditions: Vec<Arc<Mutex<GuardCondition>>>,
5552
/// A list of services that have potentially received requests.
5653
pub services: Vec<Arc<dyn ServiceBase>>,
5754
}
@@ -115,14 +112,7 @@ impl WaitSet {
115112
/// [`WaitSet::new`].
116113
pub fn clear(&mut self) {
117114
self.subscriptions.clear();
118-
// Guard conditions must be told that they are no longer tied to a wait set
119-
self.guard_conditions.drain(..).for_each(|elem| {
120-
elem.waitable
121-
.lock()
122-
.unwrap()
123-
.in_use_by_wait_set
124-
.store(false, Ordering::Relaxed)
125-
});
115+
self.guard_conditions.clear();
126116
self.clients.clear();
127117
self.services.clear();
128118
// This cannot fail – the rcl_wait_set_clear function only checks that the input handle is
@@ -178,18 +168,18 @@ impl WaitSet {
178168
/// [2]: crate::RclReturnCode
179169
pub fn add_guard_condition(
180170
&mut self,
181-
guard_condition: Arc<Mutex<GuardCondition>>,
171+
guard_condition: Arc<GuardCondition>,
182172
) -> Result<(), RclrsError> {
183173
let exclusive_guard_condition = ExclusivityGuard::new(
184174
Arc::clone(&guard_condition),
185-
Arc::clone(&guard_condition.lock().unwrap().in_use_by_wait_set),
175+
Arc::clone(&guard_condition.in_use_by_wait_set),
186176
)?;
187177

188178
unsafe {
189179
// SAFETY: Safe if the wait set and guard condition are initialized
190180
rcl_wait_set_add_guard_condition(
191181
&mut self.rcl_wait_set,
192-
&*guard_condition.lock().unwrap().lock(),
182+
&*guard_condition.lock(),
193183
std::ptr::null_mut(),
194184
)
195185
.ok()?;
@@ -300,7 +290,6 @@ impl WaitSet {
300290
unsafe { rcl_wait(&mut self.rcl_wait_set, timeout_ns) }.ok()?;
301291
let mut ready_entities = ReadyEntities {
302292
subscriptions: Vec::new(),
303-
guard_conditions: Vec::new(),
304293
clients: Vec::new(),
305294
services: Vec::new(),
306295
};
@@ -316,18 +305,6 @@ impl WaitSet {
316305
}
317306
}
318307

319-
for (i, guard_condition) in self.guard_conditions.iter().enumerate() {
320-
// SAFETY: The `guard_conditions` entry is an array of pointers, and this dereferencing is
321-
// equivalent to
322-
// https://github.com/ros2/rcl/blob/35a31b00a12f259d492bf53c0701003bd7f1745c/rcl/include/rcl/wait.h#L419
323-
let wait_set_entry = unsafe { *self.rcl_wait_set.guard_conditions.add(i) };
324-
if !wait_set_entry.is_null() {
325-
ready_entities
326-
.guard_conditions
327-
.push(Arc::clone(&guard_condition.waitable));
328-
}
329-
}
330-
331308
for (i, client) in self.clients.iter().enumerate() {
332309
// SAFETY: The `clients` entry is an array of pointers, and this dereferencing is
333310
// equivalent to

rclrs/src/wait/guard_condition.rs

Lines changed: 63 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,42 @@ use crate::{Context, RclrsError, ToResult};
44
use std::sync::{atomic::AtomicBool, Arc, Mutex, MutexGuard};
55

66
/// A struct for encapsulating a guard condition - a waitable trigger
7+
///
8+
/// # Example
9+
/// ```
10+
/// # use rclrs::{Context, GuardCondition, WaitSet, RclrsError};
11+
/// # use std::sync::{Arc, atomic::Ordering};
12+
///
13+
/// let context = Context::new([])?;
14+
///
15+
/// let atomic_bool = Arc::new(std::sync::atomic::AtomicBool::new(false));
16+
/// let atomic_bool_for_closure = Arc::clone(&atomic_bool);
17+
///
18+
/// let gc = Arc::new(GuardCondition::new(
19+
/// &context,
20+
/// Some(Box::new(move || {
21+
/// atomic_bool_for_closure.store(true, Ordering::Relaxed);
22+
/// })),
23+
/// ));
24+
///
25+
/// let mut ws = WaitSet::new(0, 1, 0, 0, 0, 0, &context)?;
26+
/// ws.add_guard_condition(Arc::clone(&gc))?;
27+
///
28+
/// // Trigger the guard condition, firing the callback and waking any wait sets being waited on
29+
/// gc.trigger()?;
30+
///
31+
/// // The wait call will now immediately return
32+
/// ws.wait(Some(std::time::Duration::from_millis(10)))?;
33+
///
34+
/// # Ok::<(), RclrsError>(())
35+
/// ```
736
pub struct GuardCondition {
837
/// The rcl_guard_condition_t that this struct encapsulates.
938
rcl_guard_condition: Arc<Mutex<rcl_guard_condition_t>>,
1039
/// An optional callback to call when this guard condition is triggered.
11-
callback: Option<Box<dyn Fn(usize)>>,
40+
callback: Option<Box<dyn Fn() + Send>>,
1241
/// A flag to indicate if this guard condition has already been assigned to a wait set.
1342
pub(crate) in_use_by_wait_set: Arc<AtomicBool>,
14-
/// A count for the number of times this guard condition was triggered, but no callback was assigned.
15-
unread_count: usize,
1643
}
1744

1845
impl Drop for GuardCondition {
@@ -24,12 +51,19 @@ impl Drop for GuardCondition {
2451
}
2552
}
2653

54+
// SAFETY: rcl_guard_condition is the only member that doesn't implement Send, and it is designed to be accessed from other threads
55+
unsafe impl Send for GuardCondition {}
56+
57+
// SAFETY: The mutexes and atomic members ensure synchronized access to members, and the callback is reentrant
58+
unsafe impl Sync for GuardCondition {}
59+
2760
impl GuardCondition {
2861
/// Creates a new guard condition
29-
pub fn new(context: &Context) -> Self {
62+
pub fn new(context: &Context, callback: Option<Box<dyn Fn() + Send>>) -> Self {
63+
// SAFETY: Getting a zero initialized value is always safe
3064
let mut guard_condition = unsafe { rcl_get_zero_initialized_guard_condition() };
3165
unsafe {
32-
// SAFETY: The context must be valid. No other preconditions for this function.
66+
// SAFETY: The context must be valid, and the guard condition must be zero-initialized
3367
rcl_guard_condition_init(
3468
&mut guard_condition,
3569
&mut *context.rcl_context_mtx.lock().unwrap(),
@@ -39,9 +73,8 @@ impl GuardCondition {
3973

4074
Self {
4175
rcl_guard_condition: Arc::new(Mutex::new(guard_condition)),
42-
callback: None,
76+
callback,
4377
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
44-
unread_count: 0,
4578
}
4679
}
4780

@@ -50,32 +83,14 @@ impl GuardCondition {
5083
self.rcl_guard_condition.lock().unwrap()
5184
}
5285

53-
/// Sets the callback to call when this guard condition is triggered.
54-
pub fn set_on_trigger_callback(&mut self, callback: Option<Box<dyn Fn(usize)>>) {
55-
match callback {
56-
Some(callback) => {
57-
if self.unread_count > 0 {
58-
callback(self.unread_count);
59-
self.unread_count = 0;
60-
}
61-
self.callback = Some(callback);
62-
}
63-
None => {
64-
self.callback = None;
65-
self.unread_count = 0;
66-
}
67-
}
68-
}
69-
7086
/// Triggers this guard condition, activating the wait set, and calling the optionally assigned callback.
71-
pub fn trigger(&mut self) -> Result<(), RclrsError> {
87+
pub fn trigger(&self) -> Result<(), RclrsError> {
7288
unsafe {
7389
// SAFETY: The rcl_guard_condition_t is valid.
7490
rcl_trigger_guard_condition(&mut *self.rcl_guard_condition.lock().unwrap()).ok()?;
7591
}
76-
match &self.callback {
77-
Some(callback) => callback(1),
78-
None => self.unread_count += 1,
92+
if self.callback.is_some() {
93+
self.callback.as_ref().unwrap()();
7994
}
8095
Ok(())
8196
}
@@ -90,43 +105,44 @@ mod tests {
90105
#[test]
91106
fn test_guard_condition() -> Result<(), RclrsError> {
92107
let context = Context::new([])?;
93-
let mut gc = GuardCondition::new(&context);
94108

95-
let atomic_usize = Arc::new(std::sync::atomic::AtomicUsize::new(0));
96-
let atomic_usize_for_closure = Arc::clone(&atomic_usize);
109+
let atomic_bool = Arc::new(std::sync::atomic::AtomicBool::new(false));
110+
let atomic_bool_for_closure = Arc::clone(&atomic_bool);
97111

98-
gc.set_on_trigger_callback(Some(Box::new(move |count| {
99-
atomic_usize_for_closure.store(count, Ordering::Relaxed);
100-
})));
112+
let gc = GuardCondition::new(
113+
&context,
114+
Some(Box::new(move || {
115+
atomic_bool_for_closure.store(true, Ordering::Relaxed);
116+
})),
117+
);
101118

102119
gc.trigger()?;
103120

104-
assert_eq!(atomic_usize.load(Ordering::Relaxed), 1);
121+
assert_eq!(atomic_bool.load(Ordering::Relaxed), true);
105122

106123
Ok(())
107124
}
108125

109126
#[test]
110127
fn test_guard_condition_wait() -> Result<(), RclrsError> {
111128
let context = Context::new([])?;
112-
let gc = Arc::new(Mutex::new(GuardCondition::new(&context)));
113129

114-
let atomic_usize = Arc::new(std::sync::atomic::AtomicUsize::new(0));
115-
let atomic_usize_for_closure = Arc::clone(&atomic_usize);
130+
let atomic_bool = Arc::new(std::sync::atomic::AtomicBool::new(false));
131+
let atomic_bool_for_closure = Arc::clone(&atomic_bool);
116132

117-
gc.lock()
118-
.unwrap()
119-
.set_on_trigger_callback(Some(Box::new(move |count| {
120-
atomic_usize_for_closure.store(count, Ordering::Relaxed);
121-
})));
133+
let gc = Arc::new(GuardCondition::new(
134+
&context,
135+
Some(Box::new(move || {
136+
atomic_bool_for_closure.store(true, Ordering::Relaxed);
137+
})),
138+
));
122139

123140
let mut ws = WaitSet::new(0, 1, 0, 0, 0, 0, &context)?;
124141
ws.add_guard_condition(Arc::clone(&gc))?;
125-
gc.lock().unwrap().trigger()?;
142+
gc.trigger()?;
126143

127-
assert_eq!(atomic_usize.load(Ordering::Relaxed), 1);
128-
let wait_result = ws.wait(Some(std::time::Duration::from_millis(10)))?;
129-
assert_eq!(wait_result.guard_conditions.len(), 1);
144+
assert_eq!(atomic_bool.load(Ordering::Relaxed), true);
145+
ws.wait(Some(std::time::Duration::from_millis(10)))?;
130146

131147
Ok(())
132148
}

0 commit comments

Comments
 (0)