Skip to content

Commit 84057df

Browse files
committed
Added support for clients and services
1 parent 7daf8cc commit 84057df

File tree

21 files changed

+908
-17
lines changed

21 files changed

+908
-17
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
[package]
2+
name = "examples_rclrs_minimal_client_service"
3+
version = "0.2.0"
4+
authors = ["Esteve Fernandez <esteve@apache.org>"]
5+
edition = "2021"
6+
7+
[[bin]]
8+
name = "minimal_client"
9+
path = "src/minimal_client.rs"
10+
11+
[[bin]]
12+
name = "minimal_client_async"
13+
path = "src/minimal_client_async.rs"
14+
15+
[[bin]]
16+
name = "minimal_service"
17+
path = "src/minimal_service.rs"
18+
19+
[dependencies]
20+
anyhow = {version = "1", features = ["backtrace"]}
21+
22+
[dependencies.rclrs]
23+
version = "*"
24+
25+
[dependencies.rosidl_runtime_rs]
26+
version = "*"
27+
28+
[dependencies.std_msgs]
29+
version = "*"
30+
31+
[dependencies.example_interfaces]
32+
version = "*"
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
<?xml version="1.0"?>
2+
<?xml-model
3+
href="http://download.ros.org/schema/package_format3.xsd"
4+
schematypens="http://www.w3.org/2001/XMLSchema"?>
5+
<package format="3">
6+
<name>examples_rclrs_minimal_client_service</name>
7+
<version>0.2.0</version>
8+
<description>Package containing an example of the client-service mechanism in rclrs.</description>
9+
<maintainer email="esteve@apache.org">Esteve Fernandez</maintainer>
10+
<license>Apache License 2.0</license>
11+
12+
<build_depend>example_interfaces</build_depend>
13+
<build_depend>rclrs</build_depend>
14+
<build_depend>rosidl_runtime_rs</build_depend>
15+
<build_depend>std_msgs</build_depend>
16+
17+
<exec_depend>example_interfaces</exec_depend>
18+
<exec_depend>rclrs</exec_depend>
19+
<exec_depend>rosidl_runtime_rs</exec_depend>
20+
<exec_depend>std_msgs</exec_depend>
21+
22+
<export>
23+
<build_type>ament_cargo</build_type>
24+
</export>
25+
</package>
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
use anyhow::{Error, Result};
2+
use std::env;
3+
4+
fn main() -> Result<(), Error> {
5+
let context = rclrs::Context::new(env::args()).unwrap();
6+
7+
let mut node = context.create_node("minimal_client")?;
8+
9+
let client = node.create_client::<example_interfaces::srv::AddTwoInts>("add_two_ints")?;
10+
11+
let mut request = example_interfaces::srv::AddTwoInts_Request::default();
12+
request.a = 41;
13+
request.b = 1;
14+
15+
println!("Starting client");
16+
17+
std::thread::sleep(std::time::Duration::from_millis(500));
18+
19+
client.async_send_request_with_callback(
20+
&request,
21+
move |response: &example_interfaces::srv::AddTwoInts_Response| {
22+
println!(
23+
"Result of {} + {} is: {}",
24+
request.a, request.b, response.sum
25+
);
26+
},
27+
)?;
28+
29+
std::thread::sleep(std::time::Duration::from_millis(500));
30+
31+
println!("Waiting for response");
32+
rclrs::spin(&node).map_err(|err| err.into())
33+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
use anyhow::{Error, Result};
2+
use std::env;
3+
4+
fn main() -> Result<(), Error> {
5+
let context = rclrs::Context::new(env::args()).unwrap();
6+
7+
let mut node = context.create_node("minimal_client")?;
8+
9+
let client = node.create_client::<example_interfaces::srv::AddTwoInts>("add_two_ints")?;
10+
11+
let mut request = example_interfaces::srv::AddTwoInts_Request::default();
12+
request.a = 41;
13+
request.b = 1;
14+
15+
println!("Starting client");
16+
17+
std::thread::sleep(std::time::Duration::from_millis(500));
18+
19+
let future = client.call_async(&request)?;
20+
21+
println!("Waiting for response");
22+
let response = rclrs::spin_until_future_complete(&node, future.clone())?;
23+
24+
println!(
25+
"Result of {} + {} is: {}",
26+
request.a, request.b, response.sum
27+
);
28+
Ok(())
29+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
use anyhow::{Error, Result};
2+
use std::env;
3+
4+
fn handle_service(
5+
_request_header: &rclrs::rmw_request_id_t,
6+
request: &example_interfaces::srv::AddTwoInts_Request,
7+
response: &mut example_interfaces::srv::AddTwoInts_Response,
8+
) {
9+
println!("request: {} + {}", request.a, request.b);
10+
response.sum = request.a + request.b;
11+
}
12+
13+
fn main() -> Result<(), Error> {
14+
let context = rclrs::Context::new(env::args()).unwrap();
15+
16+
let mut node = context.create_node("minimal_service")?;
17+
18+
let _server = node
19+
.create_service::<example_interfaces::srv::AddTwoInts, _>("add_two_ints", handle_service)?;
20+
21+
println!("Starting server");
22+
rclrs::spin(&node).map_err(|err| err.into())
23+
}

rclrs/src/future.rs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/// Based on https://www.viget.com/articles/understanding-futures-in-rust-part-1/
2+
use std::future::Future;
3+
use std::marker::PhantomData;
4+
use std::pin::Pin;
5+
use std::sync::Arc;
6+
use std::task::Context;
7+
use std::task::Poll;
8+
use std::task::RawWaker;
9+
use std::task::RawWakerVTable;
10+
use std::task::Waker;
11+
12+
use parking_lot::Mutex;
13+
14+
#[derive(Default)]
15+
pub struct RclFuture<T> {
16+
value: Option<T>,
17+
}
18+
19+
impl<T: Default + Clone> RclFuture<T> {
20+
pub fn new() -> RclFuture<T> {
21+
Self { value: None }
22+
}
23+
24+
pub fn set_value(&mut self, msg: T) {
25+
self.value = Some(msg);
26+
}
27+
}
28+
29+
impl<T: Clone> Future for RclFuture<T> {
30+
type Output = T;
31+
32+
fn poll(self: Pin<&mut Self>, _ctx: &mut Context) -> Poll<Self::Output> {
33+
if let Some(value) = &self.value {
34+
Poll::Ready(value.clone())
35+
} else {
36+
Poll::Pending
37+
}
38+
}
39+
}
40+
41+
#[derive(Clone)]
42+
struct RclWaker {}
43+
44+
fn rclwaker_wake(_s: &RclWaker) {}
45+
46+
fn rclwaker_wake_by_ref(_s: &RclWaker) {}
47+
48+
fn rclwaker_clone(s: &RclWaker) -> RawWaker {
49+
let arc = unsafe { Arc::from_raw(s) };
50+
std::mem::forget(arc.clone());
51+
RawWaker::new(Arc::into_raw(arc) as *const (), &VTABLE)
52+
}
53+
54+
const VTABLE: RawWakerVTable = unsafe {
55+
RawWakerVTable::new(
56+
|s| rclwaker_clone(&*(s as *const RclWaker)),
57+
|s| rclwaker_wake(&*(s as *const RclWaker)),
58+
|s| rclwaker_wake_by_ref(&*(s as *const RclWaker)),
59+
|s| drop(Arc::from_raw(s as *const RclWaker)),
60+
)
61+
};
62+
63+
fn rclwaker_into_waker(s: *const RclWaker) -> Waker {
64+
let raw_waker = RawWaker::new(s as *const (), &VTABLE);
65+
unsafe { Waker::from_raw(raw_waker) }
66+
}
67+
68+
fn create_rcl_waker_context() {
69+
let rclwaker = Arc::new(RclWaker {});
70+
let waker = rclwaker_into_waker(Arc::into_raw(rclwaker));
71+
std::task::Context::from_waker(&waker)
72+
}

rclrs/src/lib.rs

Lines changed: 66 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,12 @@
55
//!
66
//! [1]: https://github.com/ros2-rust/ros2_rust/blob/master/README.md
77
8+
use std::future::Future;
9+
use std::pin::Pin;
10+
use std::sync::Arc;
11+
use std::task::Poll;
12+
use std::time::Duration;
13+
814
mod context;
915
mod error;
1016
mod node;
@@ -20,37 +26,70 @@ pub use qos::*;
2026
pub use wait::*;
2127

2228
use rcl_bindings::rcl_context_is_valid;
23-
use std::time::Duration;
29+
30+
pub use rcl_bindings::rmw_request_id_t;
31+
32+
use parking_lot::Mutex;
2433

2534
/// Polls the node for new messages and executes the corresponding callbacks.
2635
///
2736
/// See [`WaitSet::wait`] for the meaning of the `timeout` parameter.
2837
///
2938
/// This may under some circumstances return
30-
/// [`SubscriptionTakeFailed`][1] when the wait set spuriously wakes up.
39+
/// [`SubscriptionTakeFailed`][1], [`ClientTakeFailed`][2], [`ServiceTakeFailed`][3] when the wait
40+
/// set spuriously wakes up.
3141
/// This can usually be ignored.
3242
///
3343
/// [1]: crate::SubscriberErrorCode
44+
/// [2]: crate::ClientErrorCode
45+
/// [3]: crate::ServiceErrorCode
3446
pub fn spin_once(node: &Node, timeout: Option<Duration>) -> Result<(), RclrsError> {
3547
let live_subscriptions = node.live_subscriptions();
48+
let live_clients = node.live_clients();
49+
let live_services = node.live_services();
3650
let ctx = Context {
3751
handle: node.context.clone(),
3852
};
39-
let mut wait_set = WaitSet::new(live_subscriptions.len(), &ctx)?;
53+
let mut wait_set = WaitSet::new(
54+
live_subscriptions.len(),
55+
0,
56+
0,
57+
live_clients.len(),
58+
live_services.len(),
59+
0,
60+
&ctx,
61+
)?;
4062

4163
for live_subscription in &live_subscriptions {
4264
wait_set.add_subscription(live_subscription.clone())?;
4365
}
4466

67+
for live_client in &live_clients {
68+
wait_set.add_client(live_client.clone())?;
69+
}
70+
71+
for live_service in &live_services {
72+
wait_set.add_service(live_service.clone())?;
73+
}
74+
4575
let ready_entities = wait_set.wait(timeout)?;
76+
4677
for ready_subscription in ready_entities.subscriptions {
4778
ready_subscription.execute()?;
4879
}
4980

81+
for ready_client in ready_entities.clients {
82+
ready_client.execute()?;
83+
}
84+
85+
for ready_service in ready_entities.services {
86+
ready_service.execute()?;
87+
}
88+
5089
Ok(())
5190
}
5291

53-
/// Convenience function for calling [`spin_once`] in a loop.
92+
/// Convenience function for calling [`rclrs::spin_once`] in a loop.
5493
///
5594
/// This function additionally checks that the context is still valid.
5695
pub fn spin(node: &Node) -> Result<(), RclrsError> {
@@ -70,6 +109,28 @@ pub fn spin(node: &Node) -> Result<(), RclrsError> {
70109
};
71110
}
72111
}
73-
74112
Ok(())
75113
}
114+
115+
pub fn spin_until_future_complete<T: Unpin + Clone>(
116+
node: &node::Node,
117+
mut future: Arc<Mutex<Box<future::RclFuture<T>>>>,
118+
) -> Result<<future::RclFuture<T> as Future>::Output, RclReturnCode> {
119+
let mut cx = future::create_rcl_waker_context();
120+
121+
loop {
122+
let context_valid = unsafe { rcl_context_is_valid(&mut *node.context.lock()) };
123+
if context_valid {
124+
if let Some(error) = spin_once(node, None).err() {
125+
match error {
126+
RclReturnCode::Timeout => continue,
127+
error => return Err(error),
128+
};
129+
};
130+
match Future::poll(Pin::new(&mut *future.lock()), &mut cx) {
131+
Poll::Ready(val) => break Ok(val),
132+
Poll::Pending => continue,
133+
};
134+
}
135+
}
136+
}

0 commit comments

Comments
 (0)