Skip to content

Return Arc from the create_node function to match other create_X functions #294

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions docs/writing-your-first-rclrs-node.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ struct RepublisherNode {

impl RepublisherNode {
fn new(context: &rclrs::Context) -> Result<Self, rclrs::RclrsError> {
let mut node = rclrs::Node::new(context, "republisher")?;
let node = rclrs::Node::new(context, "republisher")?;
let data = None;
let _subscription = node.create_subscription(
"in_topic",
Expand All @@ -76,7 +76,7 @@ Next, add a main function to launch it:
fn main() -> Result<(), rclrs::RclrsError> {
let context = rclrs::Context::new(std::env::args())?;
let republisher = RepublisherNode::new(&context)?;
rclrs::spin(&republisher.node)
rclrs::spin(republisher.node)
}
```

Expand Down Expand Up @@ -121,7 +121,7 @@ struct RepublisherNode {

impl RepublisherNode {
fn new(context: &rclrs::Context) -> Result<Self, rclrs::RclrsError> {
let mut node = rclrs::Node::new(context, "republisher")?;
let node = rclrs::Node::new(context, "republisher")?;
let data = Arc::new(Mutex::new(None)); // (3)
let data_cb = Arc::clone(&data);
let _subscription = {
Expand Down Expand Up @@ -190,7 +190,7 @@ fn main() -> Result<(), rclrs::RclrsError> {
republisher.republish()?;
}
});
rclrs::spin(&republisher.node)
rclrs::spin(republisher.node)
}
```

Expand All @@ -212,7 +212,7 @@ fn main() -> Result<(), rclrs::RclrsError> {
republisher_other_thread.republish()?;
}
});
rclrs::spin(&republisher.node)
rclrs::spin(republisher.node)
}
```

Expand Down
7 changes: 4 additions & 3 deletions examples/message_demo/src/message_demo.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::convert::TryInto;
use std::env;
use std::sync::Arc;

use anyhow::{Error, Result};
use rosidl_runtime_rs::{seq, BoundedSequence, Message, Sequence};
Expand Down Expand Up @@ -132,7 +133,7 @@ fn demonstrate_pubsub() -> Result<(), Error> {
println!("================== Interoperability demo ==================");
// Demonstrate interoperability between idiomatic and RMW-native message types
let context = rclrs::Context::new(env::args())?;
let mut node = rclrs::create_node(&context, "message_demo")?;
let node = rclrs::create_node(&context, "message_demo")?;

let idiomatic_publisher = node.create_publisher::<rclrs_example_msgs::msg::VariousTypes>(
"topic",
Expand All @@ -159,10 +160,10 @@ fn demonstrate_pubsub() -> Result<(), Error> {
)?;
println!("Sending idiomatic message.");
idiomatic_publisher.publish(rclrs_example_msgs::msg::VariousTypes::default())?;
rclrs::spin_once(&node, None)?;
rclrs::spin_once(Arc::clone(&node), None)?;
println!("Sending RMW-native message.");
direct_publisher.publish(rclrs_example_msgs::msg::rmw::VariousTypes::default())?;
rclrs::spin_once(&node, None)?;
rclrs::spin_once(Arc::clone(&node), None)?;

Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions examples/minimal_client_service/src/minimal_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use anyhow::{Error, Result};
fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;

let mut node = rclrs::create_node(&context, "minimal_client")?;
let node = rclrs::create_node(&context, "minimal_client")?;

let client = node.create_client::<example_interfaces::srv::AddTwoInts>("add_two_ints")?;

Expand All @@ -28,5 +28,5 @@ fn main() -> Result<(), Error> {
std::thread::sleep(std::time::Duration::from_millis(500));

println!("Waiting for response");
rclrs::spin(&node).map_err(|err| err.into())
rclrs::spin(node).map_err(|err| err.into())
}
4 changes: 2 additions & 2 deletions examples/minimal_client_service/src/minimal_client_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use anyhow::{Error, Result};
async fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;

let mut node = rclrs::create_node(&context, "minimal_client")?;
let node = rclrs::create_node(&context, "minimal_client")?;

let client = node.create_client::<example_interfaces::srv::AddTwoInts>("add_two_ints")?;

Expand All @@ -20,7 +20,7 @@ async fn main() -> Result<(), Error> {

println!("Waiting for response");

let rclrs_spin = tokio::task::spawn_blocking(move || rclrs::spin(&node));
let rclrs_spin = tokio::task::spawn_blocking(move || rclrs::spin(node));

let response = future.await?;
println!(
Expand Down
4 changes: 2 additions & 2 deletions examples/minimal_client_service/src/minimal_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ fn handle_service(
fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;

let mut node = rclrs::create_node(&context, "minimal_service")?;
let node = rclrs::create_node(&context, "minimal_service")?;

let _server = node
.create_service::<example_interfaces::srv::AddTwoInts, _>("add_two_ints", handle_service)?;

println!("Starting server");
rclrs::spin(&node).map_err(|err| err.into())
rclrs::spin(node).map_err(|err| err.into())
}
4 changes: 2 additions & 2 deletions examples/minimal_pub_sub/src/minimal_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use anyhow::{Error, Result};
fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;

let mut node = rclrs::create_node(&context, "minimal_subscriber")?;
let node = rclrs::create_node(&context, "minimal_subscriber")?;

let mut num_messages: usize = 0;

Expand All @@ -19,5 +19,5 @@ fn main() -> Result<(), Error> {
},
)?;

rclrs::spin(&node).map_err(|err| err.into())
rclrs::spin(node).map_err(|err| err.into())
}
4 changes: 2 additions & 2 deletions examples/minimal_pub_sub/src/zero_copy_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use anyhow::{Error, Result};
fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;

let mut node = rclrs::create_node(&context, "minimal_subscriber")?;
let node = rclrs::create_node(&context, "minimal_subscriber")?;

let mut num_messages: usize = 0;

Expand All @@ -19,5 +19,5 @@ fn main() -> Result<(), Error> {
},
)?;

rclrs::spin(&node).map_err(|err| err.into())
rclrs::spin(node).map_err(|err| err.into())
}
19 changes: 11 additions & 8 deletions rclrs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod rcl_bindings;
#[cfg(feature = "dyn_msg")]
pub mod dynamic_message;

use std::sync::Arc;
use std::time::Duration;

pub use arguments::*;
Expand All @@ -49,8 +50,8 @@ pub use wait::*;
/// This can usually be ignored.
///
/// [1]: crate::RclReturnCode
pub fn spin_once(node: &Node, timeout: Option<Duration>) -> Result<(), RclrsError> {
let wait_set = WaitSet::new_for_node(node)?;
pub fn spin_once(node: Arc<Node>, timeout: Option<Duration>) -> Result<(), RclrsError> {
let wait_set = WaitSet::new_for_node(&node)?;
let ready_entities = wait_set.wait(timeout)?;

for ready_subscription in ready_entities.subscriptions {
Expand All @@ -71,14 +72,16 @@ pub fn spin_once(node: &Node, timeout: Option<Duration>) -> Result<(), RclrsErro
/// Convenience function for calling [`spin_once`] in a loop.
///
/// This function additionally checks that the context is still valid.
pub fn spin(node: &Node) -> Result<(), RclrsError> {
pub fn spin(node: Arc<Node>) -> Result<(), RclrsError> {
// The context_is_valid functions exists only to abstract away ROS distro differences
// SAFETY: No preconditions for this function.
let context_is_valid =
|| unsafe { rcl_context_is_valid(&*node.rcl_context_mtx.lock().unwrap()) };
let context_is_valid = {
let node = Arc::clone(&node);
move || unsafe { rcl_context_is_valid(&*node.rcl_context_mtx.lock().unwrap()) }
};

while context_is_valid() {
match spin_once(node, None) {
match spin_once(Arc::clone(&node), None) {
Ok(_)
| Err(RclrsError::RclError {
code: RclReturnCode::Timeout,
Expand All @@ -105,8 +108,8 @@ pub fn spin(node: &Node) -> Result<(), RclrsError> {
/// assert!(node.is_ok());
/// # Ok::<(), RclrsError>(())
/// ```
pub fn create_node(context: &Context, node_name: &str) -> Result<Node, RclrsError> {
Node::builder(context, node_name).build()
pub fn create_node(context: &Context, node_name: &str) -> Result<Arc<Node>, RclrsError> {
Ok(Arc::new(Node::builder(context, node_name).build()?))
}

/// Creates a [`NodeBuilder`][1].
Expand Down
42 changes: 24 additions & 18 deletions rclrs/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ unsafe impl Send for rcl_node_t {}
pub struct Node {
pub(crate) rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
pub(crate) rcl_context_mtx: Arc<Mutex<rcl_context_t>>,
pub(crate) clients: Vec<Weak<dyn ClientBase>>,
pub(crate) guard_conditions: Vec<Weak<GuardCondition>>,
pub(crate) services: Vec<Weak<dyn ServiceBase>>,
pub(crate) subscriptions: Vec<Weak<dyn SubscriptionBase>>,
pub(crate) clients_mtx: Mutex<Vec<Weak<dyn ClientBase>>>,
pub(crate) guard_conditions_mtx: Mutex<Vec<Weak<GuardCondition>>>,
pub(crate) services_mtx: Mutex<Vec<Weak<dyn ServiceBase>>>,
pub(crate) subscriptions_mtx: Mutex<Vec<Weak<dyn SubscriptionBase>>>,
_parameter_map: ParameterOverrideMap,
}

Expand Down Expand Up @@ -180,13 +180,12 @@ impl Node {
///
/// [1]: crate::Client
// TODO: make client's lifetime depend on node's lifetime
pub fn create_client<T>(&mut self, topic: &str) -> Result<Arc<Client<T>>, RclrsError>
pub fn create_client<T>(&self, topic: &str) -> Result<Arc<Client<T>>, RclrsError>
where
T: rosidl_runtime_rs::Service,
{
let client = Arc::new(Client::<T>::new(Arc::clone(&self.rcl_node_mtx), topic)?);
self.clients
.push(Arc::downgrade(&client) as Weak<dyn ClientBase>);
{ self.clients_mtx.lock().unwrap() }.push(Arc::downgrade(&client) as Weak<dyn ClientBase>);
Ok(client)
}

Expand All @@ -199,12 +198,12 @@ impl Node {
///
/// [1]: crate::GuardCondition
/// [2]: crate::spin_once
pub fn create_guard_condition(&mut self) -> Arc<GuardCondition> {
pub fn create_guard_condition(&self) -> Arc<GuardCondition> {
let guard_condition = Arc::new(GuardCondition::new_with_rcl_context(
&mut self.rcl_context_mtx.lock().unwrap(),
None,
));
self.guard_conditions
{ self.guard_conditions_mtx.lock().unwrap() }
.push(Arc::downgrade(&guard_condition) as Weak<GuardCondition>);
guard_condition
}
Expand All @@ -226,7 +225,7 @@ impl Node {
&mut self.rcl_context_mtx.lock().unwrap(),
Some(Box::new(callback) as Box<dyn Fn() + Send + Sync>),
));
self.guard_conditions
{ self.guard_conditions_mtx.lock().unwrap() }
.push(Arc::downgrade(&guard_condition) as Weak<GuardCondition>);
guard_condition
}
Expand All @@ -251,7 +250,7 @@ impl Node {
/// [1]: crate::Service
// TODO: make service's lifetime depend on node's lifetime
pub fn create_service<T, F>(
&mut self,
&self,
topic: &str,
callback: F,
) -> Result<Arc<Service<T>>, RclrsError>
Expand All @@ -264,7 +263,7 @@ impl Node {
topic,
callback,
)?);
self.services
{ self.services_mtx.lock().unwrap() }
.push(Arc::downgrade(&service) as Weak<dyn ServiceBase>);
Ok(service)
}
Expand All @@ -274,7 +273,7 @@ impl Node {
/// [1]: crate::Subscription
// TODO: make subscription's lifetime depend on node's lifetime
pub fn create_subscription<T, Args>(
&mut self,
&self,
topic: &str,
qos: QoSProfile,
callback: impl SubscriptionCallback<T, Args>,
Expand All @@ -288,32 +287,39 @@ impl Node {
qos,
callback,
)?);
self.subscriptions
{ self.subscriptions_mtx.lock() }
.unwrap()
.push(Arc::downgrade(&subscription) as Weak<dyn SubscriptionBase>);
Ok(subscription)
}

/// Returns the subscriptions that have not been dropped yet.
pub(crate) fn live_subscriptions(&self) -> Vec<Arc<dyn SubscriptionBase>> {
self.subscriptions
{ self.subscriptions_mtx.lock().unwrap() }
.iter()
.filter_map(Weak::upgrade)
.collect()
}

pub(crate) fn live_clients(&self) -> Vec<Arc<dyn ClientBase>> {
self.clients.iter().filter_map(Weak::upgrade).collect()
{ self.clients_mtx.lock().unwrap() }
.iter()
.filter_map(Weak::upgrade)
.collect()
}

pub(crate) fn live_guard_conditions(&self) -> Vec<Arc<GuardCondition>> {
self.guard_conditions
{ self.guard_conditions_mtx.lock().unwrap() }
.iter()
.filter_map(Weak::upgrade)
.collect()
}

pub(crate) fn live_services(&self) -> Vec<Arc<dyn ServiceBase>> {
self.services.iter().filter_map(Weak::upgrade).collect()
{ self.services_mtx.lock().unwrap() }
.iter()
.filter_map(Weak::upgrade)
.collect()
}

/// Returns the ROS domain ID that the node is using.
Expand Down
8 changes: 4 additions & 4 deletions rclrs/src/node/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,10 @@ impl NodeBuilder {
Ok(Node {
rcl_node_mtx,
rcl_context_mtx: self.context.clone(),
clients: vec![],
guard_conditions: vec![],
services: vec![],
subscriptions: vec![],
clients_mtx: Mutex::new(vec![]),
guard_conditions_mtx: Mutex::new(vec![]),
services_mtx: Mutex::new(vec![]),
subscriptions_mtx: Mutex::new(vec![]),
_parameter_map,
})
}
Expand Down
8 changes: 4 additions & 4 deletions rclrs_tests/src/graph_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ fn test_publishers() -> Result<(), RclrsError> {
#[test]
fn test_subscriptions() -> Result<(), RclrsError> {
let namespace = "/test_subscriptions_graph";
let mut graph = construct_test_graph(namespace)?;
let graph = construct_test_graph(namespace)?;

let node_2_empty_subscription = graph.node2.create_subscription::<msg::Empty, _>(
"graph_test_topic_1",
Expand Down Expand Up @@ -149,7 +149,7 @@ fn test_subscriptions() -> Result<(), RclrsError> {

#[test]
fn test_topic_names_and_types() -> Result<(), RclrsError> {
let mut graph = construct_test_graph("test_topics_graph")?;
let graph = construct_test_graph("test_topics_graph")?;

let _node_1_defaults_subscription = graph.node1.create_subscription::<msg::Defaults, _>(
"graph_test_topic_3",
Expand Down Expand Up @@ -191,7 +191,7 @@ fn test_topic_names_and_types() -> Result<(), RclrsError> {
#[test]
fn test_services() -> Result<(), RclrsError> {
let namespace = "/test_services_graph";
let mut graph = construct_test_graph(namespace)?;
let graph = construct_test_graph(namespace)?;
let check_names_and_types = |names_and_types: TopicNamesAndTypes| {
let types = names_and_types
.get("/test_services_graph/graph_test_topic_4")
Expand Down Expand Up @@ -225,7 +225,7 @@ fn test_services() -> Result<(), RclrsError> {
#[test]
fn test_clients() -> Result<(), RclrsError> {
let namespace = "/test_clients_graph";
let mut graph = construct_test_graph(namespace)?;
let graph = construct_test_graph(namespace)?;
let _node_2_empty_client = graph
.node2
.create_client::<srv::Empty>("graph_test_topic_4")?;
Expand Down