Skip to content

Basic Timer implementation #440

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 27 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
6f9543c
Basic Timer type compiling :).
agalbachicar Nov 28, 2024
7b42c02
Evaluates the Timer::new() againts different clock types.
agalbachicar Nov 29, 2024
a60d9f3
Implement time_until_next_call
agalbachicar Nov 29, 2024
d233b47
Added cancel behavior
JesusSilvaUtrera Nov 29, 2024
3de2bb9
Merge pull request #1 from JesusSilvaUtrera/jsilva/add_cancel
agalbachicar Nov 29, 2024
189606f
Implement rcl_timer_reset
agalbachicar Nov 29, 2024
59ed7e2
Adds Timer::call().
agalbachicar Nov 29, 2024
9af9dd9
Added timer_period_ns (#2)
JesusSilvaUtrera Nov 29, 2024
e46224f
Adds Timer::is_ready().
agalbachicar Nov 29, 2024
501439d
WIP Timer callback implementation.
agalbachicar Nov 29, 2024
132c9db
Preliminary callback.
agalbachicar Nov 29, 2024
1095351
Added comments to avoid warnings (#3)
JesusSilvaUtrera Nov 29, 2024
965ca22
Integrated the Timer into the WaitSet.
agalbachicar Nov 29, 2024
f503c84
Add create_timer to node (WIP) (#4)
JesusSilvaUtrera Nov 29, 2024
ed78b35
Makes it work with the integration demo.
agalbachicar Nov 29, 2024
214a991
Working E2E timer with node.
agalbachicar Nov 29, 2024
1eb1acc
Format fix.
agalbachicar Nov 29, 2024
91756ca
Fix a TODO for peace of mind.
agalbachicar Nov 29, 2024
fbb8629
Adds an example.
agalbachicar Dec 1, 2024
ab3d63a
Fix format for the example.
agalbachicar Dec 1, 2024
4515e9a
Adds tests, documentation and removes dead code in node.rs.
agalbachicar Dec 1, 2024
85930a3
Fix documentation style in clock.rs.
agalbachicar Dec 1, 2024
1563895
Removes duplicated test in node.rs
agalbachicar Dec 1, 2024
08acef5
Fix warnings while running tests in node.rs.
agalbachicar Dec 1, 2024
a4c1a97
Fix missing documentation in wait.rs.
agalbachicar Dec 1, 2024
655185d
Improvements to timer.
agalbachicar Dec 1, 2024
30a6717
Makes rustdoc pass in the examples.
agalbachicar Dec 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Integrated the Timer into the WaitSet.
Signed-off-by: Agustin Alba Chicar <ag.albachicar@gmail.com>
  • Loading branch information
agalbachicar committed Nov 29, 2024
commit 965ca22eba8b0980783c60cd66f5ed191c843bf5
6 changes: 5 additions & 1 deletion rclrs/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ impl SingleThreadedExecutor {
})
{
let wait_set = WaitSet::new_for_node(&node)?;
let ready_entities = wait_set.wait(timeout)?;
let mut ready_entities = wait_set.wait(timeout)?;

for ready_timer in ready_entities.timers.iter_mut() {
ready_timer.execute()?;
}

for ready_subscription in ready_entities.subscriptions {
ready_subscription.execute()?;
Expand Down
69 changes: 58 additions & 11 deletions rclrs/src/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,24 @@ use crate::{
clock::Clock, context::Context, error::RclrsError, rcl_bindings::*, to_rclrs_result
};
// use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::sync::{atomic::AtomicBool, Arc, Mutex};

pub type TimerCallback = Box<dyn FnMut(i64) + Send + Sync>;
pub type TimerCallback = Box<dyn Fn(i64) + Send + Sync>;

// #[derive(Debug)]
pub struct Timer {
rcl_timer: Arc<Mutex<rcl_timer_t>>,
pub(crate) rcl_timer: Arc<Mutex<rcl_timer_t>>,
callback: Option<TimerCallback>,
pub(crate) in_use_by_wait_set: Arc<AtomicBool>,
}

impl Timer {
/// Creates a new timer (constructor)
pub fn new(clock: &Clock, context: &Context, period: i64) -> Result<Timer, RclrsError> {
Self::with_callback(clock, context, period, None)
Self::new_with_callback(clock, context, period, None)
}

pub fn with_callback(clock: &Clock, context: &Context, period: i64, callback: Option<TimerCallback>) -> Result<Timer, RclrsError> {
pub fn new_with_callback(clock: &Clock, context: &Context, period: i64, callback: Option<TimerCallback>) -> Result<Timer, RclrsError> {
let mut rcl_timer;
let timer_init_result = unsafe {
// SAFETY: Getting a default value is always safe.
Expand All @@ -43,6 +44,7 @@ impl Timer {
Timer {
rcl_timer: Arc::new(Mutex::new(rcl_timer)),
callback,
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
}
})
}
Expand Down Expand Up @@ -114,8 +116,7 @@ impl Timer {
})
}

/// Resets the timer, setting the last call time to now
pub fn reset(&mut self) -> Result<(), RclrsError>
pub fn reset(&self) -> Result<(), RclrsError>
{
let mut rcl_timer = self.rcl_timer.lock().unwrap();
to_rclrs_result(unsafe {rcl_timer_reset(&mut *rcl_timer)})
Expand Down Expand Up @@ -144,7 +145,20 @@ impl Timer {
is_ready
})
}
// handle() -> RCLC Timer Type

pub(crate) fn execute(&self) -> Result<(), RclrsError>
{
if self.is_ready()?
{
let time_since_last_call = self.time_since_last_call()?;
self.call()?;
if let Some(ref callback) = self.callback
{
callback(time_since_last_call);
}
}
Ok(())
}
}

/// 'Drop' trait implementation to be able to release the resources
Expand All @@ -158,6 +172,12 @@ impl Drop for rcl_timer_t {
}
}

impl PartialEq for Timer {
fn eq(&self, other: &Self) -> bool {
Arc::ptr_eq(&self.rcl_timer, &other.rcl_timer)
}
}

// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
// they are running in. Therefore, this type can be safely sent to another thread.
unsafe impl Send for rcl_timer_t {}
Expand Down Expand Up @@ -282,7 +302,7 @@ mod tests {
let clock = Clock::steady();
let context = Context::new(vec![]).unwrap();
let period_ns: i64 = 2e6 as i64; // 2 milliseconds.
let mut dut = Timer::new(&clock, &context, period_ns).unwrap();
let dut = Timer::new(&clock, &context, period_ns).unwrap();
let elapsed = period_ns - dut.time_until_next_call().unwrap();
assert!(elapsed < tolerance , "elapsed before reset: {}", elapsed);
thread::sleep(time::Duration::from_millis(1));
Expand All @@ -297,7 +317,7 @@ mod tests {
let clock = Clock::steady();
let context = Context::new(vec![]).unwrap();
let period_ns: i64 = 1e6 as i64; // 1 millisecond.
let mut dut = Timer::new(&clock, &context, period_ns).unwrap();
let dut = Timer::new(&clock, &context, period_ns).unwrap();
let elapsed = period_ns - dut.time_until_next_call().unwrap();
assert!(elapsed < tolerance , "elapsed before reset: {}", elapsed);

Expand Down Expand Up @@ -337,8 +357,35 @@ mod tests {
let period_ns: i64 = 1e6 as i64; // 1 millisecond.
let foo = Arc::new(Mutex::new(0i64));
let foo_callback = foo.clone();
let dut = Timer::with_callback(&clock, &context, period_ns, Some(Box::new(move |x| *foo_callback.lock().unwrap() = x ))).unwrap();
let dut = Timer::new_with_callback(&clock, &context, period_ns, Some(Box::new(move |x| *foo_callback.lock().unwrap() = x ))).unwrap();
dut.callback.unwrap()(123);
assert_eq!(*foo.lock().unwrap(), 123);
}

#[test]
fn test_execute_when_is_not_ready() {
let clock = Clock::steady();
let context = Context::new(vec![]).unwrap();
let period_ns: i64 = 1e6 as i64; // 1 millisecond.
let foo = Arc::new(Mutex::new(0i64));
let foo_callback = foo.clone();
let dut = Timer::new_with_callback(&clock, &context, period_ns, Some(Box::new(move |x| *foo_callback.lock().unwrap() = x ))).unwrap();
assert!(dut.execute().is_ok());
assert_eq!(*foo.lock().unwrap(), 0i64);
}

#[test]
fn test_execute_when_is_ready() {
let clock = Clock::steady();
let context = Context::new(vec![]).unwrap();
let period_ns: i64 = 1e6 as i64; // 1 millisecond.
let foo = Arc::new(Mutex::new(0i64));
let foo_callback = foo.clone();
let dut = Timer::new_with_callback(&clock, &context, period_ns, Some(Box::new(move |x| *foo_callback.lock().unwrap() = x ))).unwrap();
thread::sleep(time::Duration::from_micros(1500));
assert!(dut.execute().is_ok());
let x = *foo.lock().unwrap();
assert!(x > 1500000i64);
assert!(x < 1600000i64);
}
}
72 changes: 71 additions & 1 deletion rclrs/src/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::{sync::Arc, time::Duration, vec::Vec};
use crate::{
error::{to_rclrs_result, RclReturnCode, RclrsError, ToResult},
rcl_bindings::*,
ClientBase, Context, ContextHandle, Node, ServiceBase, SubscriptionBase,
ClientBase, Context, ContextHandle, Node, ServiceBase, SubscriptionBase, Timer
};

mod exclusivity_guard;
Expand Down Expand Up @@ -51,6 +51,7 @@ pub struct WaitSet {
guard_conditions: Vec<ExclusivityGuard<Arc<GuardCondition>>>,
services: Vec<ExclusivityGuard<Arc<dyn ServiceBase>>>,
handle: WaitSetHandle,
timers: Vec<ExclusivityGuard<Arc<Timer>>>,
}

/// A list of entities that are ready, returned by [`WaitSet::wait`].
Expand All @@ -63,6 +64,8 @@ pub struct ReadyEntities {
pub guard_conditions: Vec<Arc<GuardCondition>>,
/// A list of services that have potentially received requests.
pub services: Vec<Arc<dyn ServiceBase>>,
/// TODO
pub timers: Vec<Arc<Timer>>,
}

impl Drop for rcl_wait_set_t {
Expand Down Expand Up @@ -127,6 +130,7 @@ impl WaitSet {
rcl_wait_set,
context_handle: Arc::clone(&context.handle),
},
timers: Vec::new(),
})
}

Expand Down Expand Up @@ -178,6 +182,7 @@ impl WaitSet {
self.guard_conditions.clear();
self.clients.clear();
self.services.clear();
self.timers.clear();
// This cannot fail – the rcl_wait_set_clear function only checks that the input handle is
// valid, which it always is in our case. Hence, only debug_assert instead of returning
// Result.
Expand Down Expand Up @@ -311,6 +316,27 @@ impl WaitSet {
Ok(())
}

/// TBD
pub fn add_timer(&mut self, timer: Arc<Timer>) -> Result<(), RclrsError> {
let exclusive_timer = ExclusivityGuard::new(
Arc::clone(&timer),
Arc::clone(&timer.in_use_by_wait_set),
)?;
unsafe {
// SAFETY: I'm not sure if it's required, but the timer pointer will remain valid
// for as long as the wait set exists, because it's stored in self.timers.
// Passing in a null pointer for the third argument is explicitly allowed.
rcl_wait_set_add_timer(
&mut self.handle.rcl_wait_set,
&* (*(*timer).rcl_timer).lock().unwrap() as *const _, // TODO :)
core::ptr::null_mut(),
)
}
.ok()?;
self.timers.push(exclusive_timer);
Ok(())
}

/// Blocks until the wait set is ready, or until the timeout has been exceeded.
///
/// If the timeout is `None` then this function will block indefinitely until
Expand Down Expand Up @@ -365,6 +391,7 @@ impl WaitSet {
clients: Vec::new(),
guard_conditions: Vec::new(),
services: Vec::new(),
timers: Vec::new(),
};
for (i, subscription) in self.subscriptions.iter().enumerate() {
// SAFETY: The `subscriptions` entry is an array of pointers, and this dereferencing is
Expand Down Expand Up @@ -409,13 +436,24 @@ impl WaitSet {
ready_entities.services.push(Arc::clone(&service.waitable));
}
}

for (i, timer) in self.timers.iter().enumerate() {
// SAFETY: The `timers` entry is an array of pointers, and this dereferencing is
// equivalent to
// https://github.com/ros2/rcl/blob/35a31b00a12f259d492bf53c0701003bd7f1745c/rcl/include/rcl/wait.h#L419
let wait_set_entry = unsafe { *self.handle.rcl_wait_set.timers.add(i) };
if !wait_set_entry.is_null() {
ready_entities.timers.push(Arc::clone(&timer.waitable));
}
}
Ok(ready_entities)
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::{clock::Clock};

#[test]
fn traits() {
Expand All @@ -440,4 +478,36 @@ mod tests {

Ok(())
}

#[test]
fn timer_in_wait_not_set_readies() -> Result<(), RclrsError> {
let context = Context::new([])?;
let clock = Clock::steady();
let period: i64 = 1e6 as i64; // 1 milliseconds.
let timer = Arc::new(Timer::new(&clock, &context, period)?);

let mut wait_set = WaitSet::new(0, 0, 1, 0, 0, 0, &context)?;
wait_set.add_timer(timer.clone())?;

let readies = wait_set.wait(Some(std::time::Duration::from_micros(0)))?;
assert!(!readies.timers.contains(&timer));

Ok(())
}

#[test]
fn timer_in_wait_set_readies() -> Result<(), RclrsError> {
let context = Context::new([])?;
let clock = Clock::steady();
let period: i64 = 1e6 as i64; // 1 milliseconds.
let timer = Arc::new(Timer::new(&clock, &context, period)?);

let mut wait_set = WaitSet::new(0, 0, 1, 0, 0, 0, &context)?;
wait_set.add_timer(timer.clone())?;

let readies = wait_set.wait(Some(std::time::Duration::from_micros(1500)))?;
assert!(readies.timers.contains(&timer));

Ok(())
}
}