Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 201 additions & 0 deletions rclrs/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -667,4 +667,205 @@ mod tests {
fn test_function_macro() {
assert_eq!(function!(), "rclrs::logging::tests::test_function_macro");
}

#[test]
fn test_rosout_publishing_default() -> Result<(), RclrsError> {
use crate::{
rcl_bindings::rcl_logging_rosout_enabled, vendor::rcl_interfaces::msg::rmw::Log,
};
use std::sync::{Arc, Mutex};

let namespace = format!("/test_rosout_publishing_default_{}", line!());
let mut executor = Context::default().create_basic_executor();
let node = executor
.create_node("rosout_default_node".namespace(&namespace))
.unwrap();

// Check if rosout is enabled at the rcl level
// SAFETY: This is a simple query function with no preconditions
let rosout_enabled = unsafe { rcl_logging_rosout_enabled() };
assert!(
rosout_enabled,
"rcl_logging_rosout should be enabled for this test"
);

// Verify rosout publisher exists immediately after node creation
// (when enable_rosout is true, which is the default)
let topics = node
.get_publisher_names_and_types_by_node("rosout_default_node", &namespace)
.unwrap();
assert!(
topics.contains_key("/rosout"),
"Node should have /rosout publisher immediately when enable_rosout is true"
);

let received_logs: Arc<Mutex<Vec<Log>>> = Arc::new(Mutex::new(Vec::new()));
let inner_received = Arc::clone(&received_logs);

// Subscribe to /rosout topic with QoS matching rosout publisher
// rosout uses: reliable, transient_local, keep_last(1000)
// Filter by logger name to avoid interference from parallel tests
let _subscription = node.create_subscription::<Log, _>(
"/rosout".reliable().transient_local().keep_all(),
move |msg: Log| {
// Only count messages from our node to avoid interference from parallel tests
if msg.name.to_string().contains("rosout_default_node") {
inner_received.lock().unwrap().push(msg);
}
},
)?;

// Wait for /rosout subscription to be discovered by the publisher
spin_until_condition(
&mut executor,
|| node.count_subscriptions("/rosout").unwrap_or(0) > 0,
Duration::from_millis(500),
);

// Log messages at all severity levels (except debug) with unique identifiers
let test_id = line!();
let info_msg = format!("Info rosout default test {}", test_id);
let warn_msg = format!("Warn rosout default test {}", test_id);
let error_msg = format!("Error rosout default test {}", test_id);
let fatal_msg = format!("Fatal rosout default test {}", test_id);

log!(node.info(), "{}", info_msg);
log!(node.warn(), "{}", warn_msg);
log!(node.error(), "{}", error_msg);
log!(node.fatal(), "{}", fatal_msg);

// Spin until all 4 messages are received (INFO, WARN, ERROR, FATAL)
spin_until_condition(
&mut executor,
|| received_logs.lock().unwrap().len() >= 4,
Duration::from_secs(5),
);

// Verify all messages were received with correct severity levels
let logs = received_logs.lock().unwrap();

let find_log =
|msg: &str| -> Option<&Log> { logs.iter().find(|l| l.msg.to_string() == msg) };

// Verify INFO message
let info_log = find_log(&info_msg);
assert!(
info_log.is_some(),
"Info message not found on /rosout. Received {} messages: {:?}",
logs.len(),
logs.iter().map(|l| l.msg.to_string()).collect::<Vec<_>>()
);
assert_eq!(info_log.unwrap().level, Log::INFO);

// Verify WARN message
let warn_log = find_log(&warn_msg);
assert!(warn_log.is_some(), "Warn message not found on /rosout");
assert_eq!(warn_log.unwrap().level, Log::WARN);

// Verify ERROR message
let error_log = find_log(&error_msg);
assert!(error_log.is_some(), "Error message not found on /rosout");
assert_eq!(error_log.unwrap().level, Log::ERROR);

// Verify FATAL message
let fatal_log = find_log(&fatal_msg);
assert!(fatal_log.is_some(), "Fatal message not found on /rosout");
assert_eq!(fatal_log.unwrap().level, Log::FATAL);

// Verify logger name is correct for all messages
for log in logs.iter() {
assert!(
log.name.to_string().contains("rosout_default_node"),
"Logger name '{}' should contain 'rosout_default_node'",
log.name
);
}

// Verify rosout publisher still exists after logging
let topics = node
.get_publisher_names_and_types_by_node("rosout_default_node", &namespace)
.unwrap();
assert!(
topics.contains_key("/rosout"),
"Node should still have /rosout publisher after logging"
);

Ok(())
}

#[test]
fn test_rosout_disabled() -> Result<(), RclrsError> {
use crate::vendor::rcl_interfaces::msg::rmw::Log;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};

let namespace = format!("/test_rosout_disabled_{}", line!());
let mut executor = Context::default().create_basic_executor();

// Create node with rosout DISABLED
let node_disabled = executor
.create_node(
"rosout_disabled_node"
.namespace(&namespace)
.enable_rosout(false),
)
.unwrap();

// Create another node with rosout ENABLED to subscribe
let node_subscriber = executor
.create_node("rosout_subscriber_node".namespace(&namespace))
.unwrap();

let test_id = line!();
let disabled_msg = format!("This should NOT appear {}", test_id);
let disabled_msg_clone = disabled_msg.clone();

let received_from_disabled = Arc::new(AtomicBool::new(false));
let inner_received = Arc::clone(&received_from_disabled);

// Subscribe to /rosout topic with QoS matching rosout publisher
let _subscription = node_subscriber.create_subscription::<Log, _>(
"/rosout".reliable().transient_local().keep_all(),
move |msg: Log| {
if msg.msg.to_string() == disabled_msg_clone {
inner_received.store(true, Ordering::Release);
}
},
)?;

// Log a message from the subscriber node to ensure rosout publisher exists
log!(node_subscriber.info(), "Subscriber node rosout init");

// Wait for discovery between subscriber's publisher and our subscription
spin_until_condition(
&mut executor,
|| node_subscriber.count_subscriptions("/rosout").unwrap_or(0) > 0,
Duration::from_millis(500),
);

// Log a message from the node with rosout disabled
log!(node_disabled.info(), "{}", disabled_msg);

// Spin for a reasonable time to ensure message would have been received if published
spin_until_condition(&mut executor, || false, Duration::from_millis(500));

// Verify the message was NOT received
assert!(
!received_from_disabled.load(Ordering::Acquire),
"Message from node with rosout disabled should NOT appear on /rosout"
);

// Verify the disabled node does not have a /rosout publisher
let topics = node_disabled
.get_publisher_names_and_types_by_node("rosout_disabled_node", &namespace)
.unwrap();
assert!(
!topics.contains_key("/rosout"),
"Node with rosout disabled should not have /rosout publisher"
);

Ok(())
}
}
18 changes: 18 additions & 0 deletions rclrs/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ pub(crate) struct NodeHandle {
/// We may be able to restructure this in the future when we no longer need
/// to support Humble.
pub(crate) initialized: AtomicBool,
/// Tracks whether the rosout publisher was initialized for this node.
/// If true, we need to call rcl_logging_rosout_fini_publisher_for_node
/// when dropping the NodeHandle.
#[cfg(not(ros_distro = "humble"))]
pub(crate) rosout_initialized: AtomicBool,
}

impl Drop for NodeHandle {
Expand All @@ -150,6 +155,19 @@ impl Drop for NodeHandle {
let mut rcl_node = self.rcl_node.lock().unwrap();
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();

// SAFETY: The entity lifecycle mutex is locked to protect against the risk of
// global variables in the rmw implementation being unsafely modified during cleanup.
// The rosout publisher must be finalized before the node is finalized.
// Note: On Humble, rosout is managed automatically by rcl, so we only need to
// explicitly finalize it on newer distros.
#[cfg(not(ros_distro = "humble"))]
if self
.rosout_initialized
.load(std::sync::atomic::Ordering::Acquire)
{
unsafe { rcl_logging_rosout_fini_publisher_for_node(&mut *rcl_node) };
}

// SAFETY: The entity lifecycle mutex is locked to protect against the risk of
// global variables in the rmw implementation being unsafely modified during cleanup.
unsafe { rcl_node_fini(&mut *rcl_node) };
Expand Down
21 changes: 6 additions & 15 deletions rclrs/src/node/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,21 +493,12 @@ mod tests {
let node = executor.create_node(node_name).unwrap();

let check_rosout = |topics: HashMap<String, Vec<String>>| {
// rosout shows up in humble and iron, even if the graph is empty
#[cfg(any(ros_distro = "humble"))]
{
assert_eq!(topics.len(), 1);
assert_eq!(
topics.get("/rosout").unwrap().first().unwrap(),
"rcl_interfaces/msg/Log"
);
}

// rosout does not automatically show up in jazzy when the graph is empty
#[cfg(any(ros_distro = "jazzy", ros_distro = "rolling"))]
{
assert_eq!(topics.len(), 0);
}
// rosout shows up when a node is created (on all distros after enabling rosout)
assert_eq!(topics.len(), 1);
assert_eq!(
topics.get("/rosout").unwrap().first().unwrap(),
"rcl_interfaces/msg/Log"
);
};

let names_and_topics = node
Expand Down
29 changes: 26 additions & 3 deletions rclrs/src/node/node_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,7 @@ pub trait IntoNodeOptions<'a>: Sized {
/// Enables or disables logging to rosout.
///
/// When enabled, log messages are published to the `/rosout` topic in addition to
/// standard output.
///
/// This option is currently unused in `rclrs`.
/// standard output. This is enabled by default.
fn enable_rosout(self, enable: bool) -> NodeOptions<'a> {
let mut options = self.into_node_options();
options.enable_rosout = enable;
Expand Down Expand Up @@ -308,6 +306,8 @@ impl<'a> NodeOptions<'a> {
rcl_node: Mutex::new(unsafe { rcl_get_zero_initialized_node() }),
context_handle: Arc::clone(&commands.context().handle),
initialized: AtomicBool::new(false),
#[cfg(not(ros_distro = "humble"))]
rosout_initialized: AtomicBool::new(false),
});

unsafe {
Expand All @@ -332,6 +332,29 @@ impl<'a> NodeOptions<'a> {
.initialized
.store(true, std::sync::atomic::Ordering::Release);

// Initialize the rosout publisher if rosout logging is enabled.
// This is required for log messages to be published to the /rosout topic.
// Note: On Humble, the rosout publisher is initialized automatically by rcl,
// so we only need to explicitly initialize it on newer distros.
#[cfg(not(ros_distro = "humble"))]
if self.enable_rosout {
// SAFETY: rcl_logging_rosout_enabled checks if rosout logging is globally enabled.
if unsafe { rcl_logging_rosout_enabled() } {
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
// SAFETY: The node has been successfully initialized and the
// entity lifecycle mutex is locked.
unsafe {
rcl_logging_rosout_init_publisher_for_node(
&mut *handle.rcl_node.lock().unwrap(),
)
}
.ok()?;
handle
.rosout_initialized
.store(true, std::sync::atomic::Ordering::Release);
}
}

let parameter = {
let rcl_node = handle.rcl_node.lock().unwrap();
ParameterInterface::new(
Expand Down
23 changes: 23 additions & 0 deletions rclrs/src/test_helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,26 @@ pub(crate) use self::graph_helpers::*;

pub(crate) fn assert_send<T: Send>() {}
pub(crate) fn assert_sync<T: Sync>() {}

use crate::{Executor, SpinOptions};
use std::time::{Duration, Instant};

/// Spins the executor until a condition returns true or timeout is reached.
/// Returns true if condition was met, false if timed out.
pub(crate) fn spin_until_condition<F>(
executor: &mut Executor,
mut condition: F,
timeout: Duration,
) -> bool
where
F: FnMut() -> bool,
{
let start = Instant::now();
while !condition() {
if start.elapsed() >= timeout {
return false;
}
executor.spin(SpinOptions::spin_once().timeout(Duration::from_millis(10)));
}
true
}
Loading