Skip to content

Add a test to check if trajectory message is received #805

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions example_7/reference_generator/send_trajectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ int main(int argc, char ** argv)
std::fill(last_point_msg.velocities.begin(), last_point_msg.velocities.end(), 0.0);

pub->publish(trajectory_msg);
RCLCPP_INFO(node->get_logger(), "Trajectory sent with %lu points", trajectory_msg.points.size());
while (rclcpp::ok())
{
}
Expand Down
29 changes: 28 additions & 1 deletion example_7/test/test_r6bot_controller_launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@

import os
import pytest
import psutil
import subprocess
import time
import unittest

from ament_index_python.packages import get_package_share_directory
Expand All @@ -39,12 +42,12 @@
from launch_testing.actions import ReadyToTest

import launch_testing.markers
import rclpy
from controller_manager.test_utils import (
check_controllers_running,
check_if_js_published,
check_node_running,
)
import rclpy


# Executes the given launch file and checks if all nodes can be started
Expand Down Expand Up @@ -94,6 +97,30 @@ def test_check_if_msgs_published(self):
"/joint_states", ["joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6"]
)

def test_check_if_trajectory_published(self, proc_output):

topic = "/r6bot_controller/joint_trajectory"

command = [
"ros2",
"launch",
"ros2_control_demo_example_7",
"send_trajectory.launch.py",
]
subprocess.Popen(command) # the process won't finish
time.sleep(1)
pubs = self.node.get_publishers_info_by_topic(topic)
assert len(pubs) > 0, f"No publisher for topic '{topic}' not found!"

# message will be published only once, so WaitForTopics is not suitable here.
# let's just check if the controller received and processed the message
proc_output.assertWaitFor("Trajectory execution complete", timeout=10, stream="stderr")

for proc in psutil.process_iter():
if "send_trajectory" in proc.name():
proc.kill()
print(f"Process {proc.name()} killed")


@launch_testing.post_shutdown_test()
# These tests are run after the processes in generate_test_description() have shutdown.
Expand Down
Loading