Skip to content

Refactor to remove SubscriptionHandle/ClientHandle/ServiceHandle #208

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions rclrs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,16 @@ pub fn spin_once(node: &Node, timeout: Option<Duration>) -> Result<(), RclrsErro
&ctx,
)?;

for live_subscription in &live_subscriptions {
wait_set.add_subscription(live_subscription.clone())?;
Copy link
Collaborator

@esteve esteve Jul 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that the Waitables are now typed, we can keep the old style (Waitset::add_X(X)). IMHO, it reads more logically that we add a Subscription to a Waitset. I'd agree more with the new style if we would have an arbitrary set of entities to be added to a Waitset (the responsibility of adding an entity should therefore be in the Waitable trait), but in this case, we know we won't have anything else other than Subscriptions, Clients, Services and Timers, so we can just have methods for each entity in Waitset

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, we need to keep the actual logic in Waitable::add_to_wait_set (see PR description for why), but I added back the old functions as syntactic sugar.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've read the description again, but I'm afraid I'm not following, sorry. Is it because you don't want to give pub(crate) visibility to rcl_wait_set in the Waitset? Is there a reason for that? To me Waitset::add_subscription seems the natural "translattion" for rcl_wait_set_add_subscription, so I would expect the logic for adding a subscription to be in Waitset.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, no, I'm fine with pub(crate) visibility for rcl types. But the issue is that WaitSet::add_subscription() needs an rcl_subscription_t.

Prior to this PR, the situation is as follows: WaitSet::add_subscription() takes the subscription in the form of a dyn SubscriptionBase, and therefore that trait must be able to provide the rcl_subscription_t. We don't want to have the rcl_subscription_t in the public API, so SubscriptionBase returned the rcl_subscription_t in an opaque wrapper type, the SubscriptionHandle.

The main goal of this PR is to remove the SubscriptionHandle, since even though we now don't have the rcl_subscription_t in the public API, we now have that SubscriptionHandle in the public API which is also "internal" and has no useful functionality for the user. Fewer public types are better, as I'm sure you agree. It does that by changing the handle() method in SubscriptionBase to not return the rcl_subscription_t handle, but just call the function that needs it directly, in this case rcl_wait_set_add_subscription(). (To have a more fitting name, that method was then renamed to add_to_wait_set(), and the trait generalized into Waitable.)

So, I hope that makes sense – in Rust you cannot have a pub(crate) method on a trait, so all trait functions are public, so the way that the SubscriptionBase trait gives the rcl_subscription_t to the WaitSet is also public, unless we use the design in this PR.

We could maybe make the organization feel more "natural" like you say by moving the impls of Waitable into wait.rs. That way, the only place we deal with rcl_wait_set_* functions is in the wait module.

A third design would be to make WaitSet::add_subscription() take an actual Subscription<T>. Then we can just access the private parts, i.e. rcl_subscription_t. This would mean the add_to_wait_set() function is removed. It has the drawback of causing a bit more monomorphization, and being more restrictive on user code, since they'll not be able to e.g. downcast several subscriptions into a SubscriptionBase (or SubscriptionWaitable, if you're fine with that) and store them into a Vec, before adding them to a wait set.

Copy link
Contributor Author

@nnmm nnmm Jul 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A fourth way is to keep Handle structs, but mark them as #[doc(hidden)]. They will still be a bit annoying to rclrs developers though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd go with the third option, we know that Waitsets will only deal with Subscriptions, Clients, Services and Timers, so I think it's fine to be that restrictive.

Actually, sorry I only realize this now, but I don't think option 3 would work. The node needs to store the subscriptions (and clients and services), and it must do so by storing trait objects, not the fully-typed Subscription<T>, or T would need to become a type parameter of Node itself, which is impossible since there may be many subscriptions with different Ts. And if it doesn't have the concrete type anymore, it cannot call a WaitSet::add_subscription() that takes the concrete Subscription<T> type.

So my opinion is that the design in this PR is the best one I can think of. It The way things are currently done + #[doc(hidden)] is also acceptable – but if you don't like the design in this PR, I would like to learn more about why.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but if you don't like the design in this PR, I would like to learn more about why.

Not that I actively dislike it, but I find it unnecessarily complicated. The responsibility of adding a subscription to a waitset should belong in a waitset IMHO, as it's the one that's storing the subscription's handle. But with this PR, in order to allow the entities to be added to a waitset, the waitset handle needs to be made public, which does not seem to me necessary.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you mean the rcl_wait_set_t by "waitset handle", that's not public. The trait uses an rclrs::WaitSet.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we've used that nomenclature for a while, so yeah, by "waitset handle", I mean rcl_wait_set_t. It is public in the sense that a Waitable needs access to it so it can call rcl_wait_set_add_X(). I prefer that we keep the handle structures (i.e. rcl_X_t) encapsulated in Handle structs and move them to a separate module (or declare them as [doc(hidden)]) so that developers know that these are internal.

IMHO the design in this PR would be akin to having the logic of adding an item to a std::vec inside the item that is being added.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then let's leave this PR aside for a while and let me rework #224 to work with SubscriptionHandle. Then we can release and later worry about removing SubscriptionHandle.

for live_subscription in live_subscriptions {
wait_set.add_subscription(live_subscription)?;
}

for live_client in &live_clients {
wait_set.add_client(live_client.clone())?;
for live_client in live_clients {
wait_set.add_client(live_client)?;
}

for live_service in &live_services {
wait_set.add_service(live_service.clone())?;
for live_service in live_services {
wait_set.add_service(live_service)?;
}

let ready_entities = wait_set.wait(timeout)?;
Expand Down
18 changes: 9 additions & 9 deletions rclrs/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ unsafe impl Send for rcl_node_t {}
pub struct Node {
rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
pub(crate) rcl_context_mtx: Arc<Mutex<rcl_context_t>>,
pub(crate) clients: Vec<Weak<dyn ClientBase>>,
pub(crate) services: Vec<Weak<dyn ServiceBase>>,
pub(crate) subscriptions: Vec<Weak<dyn SubscriptionBase>>,
pub(crate) clients: Vec<Weak<dyn ClientWaitable>>,
pub(crate) services: Vec<Weak<dyn ServiceWaitable>>,
pub(crate) subscriptions: Vec<Weak<dyn SubscriptionWaitable>>,
_parameter_map: ParameterOverrideMap,
}

Expand Down Expand Up @@ -193,7 +193,7 @@ impl Node {
{
let client = Arc::new(crate::node::client::Client::<T>::new(self, topic)?);
self.clients
.push(Arc::downgrade(&client) as Weak<dyn ClientBase>);
.push(Arc::downgrade(&client) as Weak<dyn ClientWaitable>);
Ok(client)
}

Expand Down Expand Up @@ -229,7 +229,7 @@ impl Node {
self, topic, callback,
)?);
self.services
.push(Arc::downgrade(&service) as Weak<dyn ServiceBase>);
.push(Arc::downgrade(&service) as Weak<dyn ServiceWaitable>);
Ok(service)
}

Expand All @@ -249,23 +249,23 @@ impl Node {
{
let subscription = Arc::new(Subscription::<T>::new(self, topic, qos, callback)?);
self.subscriptions
.push(Arc::downgrade(&subscription) as Weak<dyn SubscriptionBase>);
.push(Arc::downgrade(&subscription) as Weak<dyn SubscriptionWaitable>);
Ok(subscription)
}

/// Returns the subscriptions that have not been dropped yet.
pub(crate) fn live_subscriptions(&self) -> Vec<Arc<dyn SubscriptionBase>> {
pub(crate) fn live_subscriptions(&self) -> Vec<Arc<dyn SubscriptionWaitable>> {
self.subscriptions
.iter()
.filter_map(Weak::upgrade)
.collect()
}

pub(crate) fn live_clients(&self) -> Vec<Arc<dyn ClientBase>> {
pub(crate) fn live_clients(&self) -> Vec<Arc<dyn ClientWaitable>> {
self.clients.iter().filter_map(Weak::upgrade).collect()
}

pub(crate) fn live_services(&self) -> Vec<Arc<dyn ServiceBase>> {
pub(crate) fn live_services(&self) -> Vec<Arc<dyn ServiceWaitable>> {
self.services.iter().filter_map(Weak::upgrade).collect()
}

Expand Down
155 changes: 68 additions & 87 deletions rclrs/src/node/client.rs
Original file line number Diff line number Diff line change
@@ -1,77 +1,94 @@
use crate::node::client::oneshot::Canceled;
use futures::channel::oneshot;
use std::boxed::Box;
use std::collections::HashMap;
use std::ffi::CString;
use std::sync::Arc;

use crate::error::{RclReturnCode, ToResult};
use crate::MessageCow;
use crate::Node;
use crate::{rcl_bindings::*, RclrsError};
use crate::rcl_bindings::*;
use crate::{MessageCow, Node, RclReturnCode, RclrsError, ToResult, WaitSet, Waitable};

use parking_lot::{Mutex, MutexGuard};
use parking_lot::Mutex;
use rosidl_runtime_rs::Message;

// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
// they are running in. Therefore, this type can be safely sent to another thread.
unsafe impl Send for rcl_client_t {}

/// Internal struct used by clients.
pub struct ClientHandle {
type RequestValue<Response> = Box<dyn FnOnce(Response) + 'static + Send>;

type RequestId = i64;

/// Main class responsible for sending requests to a ROS service.
pub struct Client<T>
where
T: rosidl_runtime_rs::Service,
{
rcl_client_mtx: Mutex<rcl_client_t>,
rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
requests: Mutex<HashMap<RequestId, RequestValue<T::Response>>>,
futures: Arc<Mutex<HashMap<RequestId, oneshot::Sender<T::Response>>>>,
}

impl ClientHandle {
pub(crate) fn lock(&self) -> MutexGuard<rcl_client_t> {
self.rcl_client_mtx.lock()
}
}

impl Drop for ClientHandle {
impl<T> Drop for Client<T>
where
T: rosidl_runtime_rs::Service,
{
fn drop(&mut self) {
let handle = self.rcl_client_mtx.get_mut();
let rcl_node_mtx = &mut *self.rcl_node_mtx.lock();
let rcl_client = self.rcl_client_mtx.get_mut();
let rcl_node = &mut *self.rcl_node_mtx.lock();
// SAFETY: No preconditions for this function
unsafe {
rcl_client_fini(handle, rcl_node_mtx);
rcl_client_fini(rcl_client, rcl_node);
}
}
}

impl From<Canceled> for RclrsError {
fn from(_: Canceled) -> Self {
RclrsError::RclError {
code: RclReturnCode::Error,
msg: None,
}
impl<T> Waitable for Client<T>
where
T: rosidl_runtime_rs::Service,
{
unsafe fn add_to_wait_set(self: Arc<Self>, wait_set: &mut WaitSet) -> Result<(), RclrsError> {
// SAFETY: I'm not sure if it's required, but the client pointer will remain valid
// for as long as the wait set exists, because it's stored in self.clients.
// Passing in a null pointer for the third argument is explicitly allowed.
rcl_wait_set_add_client(
&mut wait_set.rcl_wait_set,
&*self.rcl_client_mtx.lock(),
std::ptr::null_mut(),
)
.ok()?;
wait_set.clients.push(self);
Ok(())
}
}

/// Trait to be implemented by concrete Client structs.
///
/// See [`Client<T>`] for an example.
pub trait ClientBase: Send + Sync {
/// Internal function to get a reference to the `rcl` handle.
fn handle(&self) -> &ClientHandle;
/// Tries to take a new response and run the callback or future with it.
fn execute(&self) -> Result<(), RclrsError>;
fn execute(&self) -> Result<(), RclrsError> {
let (res, req_id) = match self.take_response() {
Ok((res, req_id)) => (res, req_id),
Err(RclrsError::RclError {
code: RclReturnCode::ClientTakeFailed,
..
}) => {
// Spurious wakeup – this may happen even when a waitset indicated that this
// client was ready, so it shouldn't be an error.
return Ok(());
}
Err(e) => return Err(e),
};
let requests = &mut *self.requests.lock();
let futures = &mut *self.futures.lock();
if let Some(callback) = requests.remove(&req_id.sequence_number) {
callback(res);
} else if let Some(future) = futures.remove(&req_id.sequence_number) {
let _ = future.send(res);
}
Ok(())
}
}

type RequestValue<Response> = Box<dyn FnOnce(Response) + 'static + Send>;

type RequestId = i64;
/// A marker trait to distinguish `Client` waitables from other [`Waitable`]s.
pub trait ClientWaitable: Waitable {}

/// Main class responsible for sending requests to a ROS service.
pub struct Client<T>
where
T: rosidl_runtime_rs::Service,
{
pub(crate) handle: Arc<ClientHandle>,
requests: Mutex<HashMap<RequestId, RequestValue<T::Response>>>,
futures: Arc<Mutex<HashMap<RequestId, oneshot::Sender<T::Response>>>>,
}
impl<T> ClientWaitable for Client<T> where T: rosidl_runtime_rs::Service {}

impl<T> Client<T>
where
Expand Down Expand Up @@ -110,13 +127,9 @@ where
.ok()?;
}

let handle = Arc::new(ClientHandle {
rcl_client_mtx: Mutex::new(rcl_client),
rcl_node_mtx: node.rcl_node_mtx.clone(),
});

Ok(Self {
handle,
rcl_client_mtx: Mutex::new(rcl_client),
rcl_node_mtx: Arc::clone(&node.rcl_node_mtx),
requests: Mutex::new(HashMap::new()),
futures: Arc::new(Mutex::new(
HashMap::<RequestId, oneshot::Sender<T::Response>>::new(),
Expand Down Expand Up @@ -149,7 +162,7 @@ where
unsafe {
// SAFETY: The request type is guaranteed to match the client type by the type system.
rcl_send_request(
&*self.handle.lock() as *const _,
&*self.rcl_client_mtx.lock() as *const _,
rmw_message.as_ref() as *const <T::Request as Message>::RmwMsg as *mut _,
&mut sequence_number,
)
Expand Down Expand Up @@ -184,7 +197,7 @@ where
unsafe {
// SAFETY: The request type is guaranteed to match the client type by the type system.
rcl_send_request(
&*self.handle.lock() as *const _,
&*self.rcl_client_mtx.lock() as *const _,
rmw_message.as_ref() as *const <T::Request as Message>::RmwMsg as *mut _,
&mut sequence_number,
)
Expand Down Expand Up @@ -228,11 +241,11 @@ where
type RmwMsg<T> =
<<T as rosidl_runtime_rs::Service>::Response as rosidl_runtime_rs::Message>::RmwMsg;
let mut response_out = RmwMsg::<T>::default();
let handle = &*self.handle.lock();
let rcl_client = &*self.rcl_client_mtx.lock();
unsafe {
// SAFETY: The three pointers are valid/initialized
rcl_take_response(
handle,
rcl_client,
&mut request_id_out,
&mut response_out as *mut RmwMsg<T> as *mut _,
)
Expand All @@ -241,35 +254,3 @@ where
Ok((T::Response::from_rmw_message(response_out), request_id_out))
}
}

impl<T> ClientBase for Client<T>
where
T: rosidl_runtime_rs::Service,
{
fn handle(&self) -> &ClientHandle {
&self.handle
}

fn execute(&self) -> Result<(), RclrsError> {
let (res, req_id) = match self.take_response() {
Ok((res, req_id)) => (res, req_id),
Err(RclrsError::RclError {
code: RclReturnCode::ClientTakeFailed,
..
}) => {
// Spurious wakeup – this may happen even when a waitset indicated that this
// client was ready, so it shouldn't be an error.
return Ok(());
}
Err(e) => return Err(e),
};
let requests = &mut *self.requests.lock();
let futures = &mut *self.futures.lock();
if let Some(callback) = requests.remove(&req_id.sequence_number) {
callback(res);
} else if let Some(future) = futures.remove(&req_id.sequence_number) {
let _ = future.send(res);
}
Ok(())
}
}
Loading