Skip to content

Make RosTopicSubNode a StatefulActionNode and handle different reliability QoS configurations #95

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: humble
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
rework RosTopicSubNode as a StatefulActionNode
  • Loading branch information
schornakj committed Sep 20, 2024
commit deb7dc5e45ded057e541d0e4e17c0c96418f8137
136 changes: 67 additions & 69 deletions behaviortree_ros2/include/behaviortree_ros2/bt_topic_sub_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@

#pragma once

#include <behaviortree_cpp/action_node.h>
#include <behaviortree_cpp/basic_types.h>
#include <memory>
#include <string>
#include <rclcpp/executors.hpp>
#include <rclcpp/allocator/allocator_common.hpp>
#include "behaviortree_cpp/condition_node.h"
#include "behaviortree_cpp/bt_factory.h"

#include "behaviortree_ros2/ros_node_params.hpp"
#include <boost/signals2.hpp>
Expand All @@ -42,7 +42,7 @@ namespace BT
* 2. Otherwise, use the value in RosNodeParams::default_port_value
*/
template <class TopicT>
class RosTopicSubNode : public BT::ConditionNode
class RosTopicSubNode : public BT::StatefulActionNode
{
public:
// Type definitions
Expand Down Expand Up @@ -131,9 +131,13 @@ class RosTopicSubNode : public BT::ConditionNode
return providedBasicPorts({});
}

NodeStatus tick() override final;
NodeStatus onStart() override final;

/** Callback invoked in the tick. You must return either SUCCESS of FAILURE
NodeStatus onRunning() override final;

void onHalted() override final;

/** Callback invoked in the tick. You must return SUCCESS, FAILURE, or RUNNING.
*
* @param last_msg the latest message received, since the last tick.
* Will be empty if no new message received.
Expand All @@ -157,6 +161,8 @@ class RosTopicSubNode : public BT::ConditionNode

private:
bool createSubscriber(const std::string& topic_name);

NodeStatus checkStatus(const NodeStatus& status) const;
};

//----------------------------------------------------------------
Expand Down Expand Up @@ -187,51 +193,15 @@ template <class T>
inline RosTopicSubNode<T>::RosTopicSubNode(const std::string& instance_name,
const NodeConfig& conf,
const RosNodeParams& params)
: BT::ConditionNode(instance_name, conf), node_(params.nh)
: BT::StatefulActionNode(instance_name, conf), node_(params.nh)
{
// check port remapping
auto portIt = config().input_ports.find("topic_name");
if(portIt != config().input_ports.end())
// Check if default_port_value was used to provide a topic name.
if(!params.default_port_value.empty())
{
const std::string& bb_topic_name = portIt->second;

if(bb_topic_name.empty() || bb_topic_name == "__default__placeholder__")
{
if(params.default_port_value.empty())
{
throw std::logic_error("Both [topic_name] in the InputPort and the RosNodeParams "
"are empty.");
}
else
{
createSubscriber(params.default_port_value);
}
}
else if(!isBlackboardPointer(bb_topic_name))
{
// If the content of the port "topic_name" is not
// a pointer to the blackboard, but a static string, we can
// create the client in the constructor.
createSubscriber(bb_topic_name);
}
else
{
// do nothing
// createSubscriber will be invoked in the first tick().
}
}
else
{
if(params.default_port_value.empty())
{
throw std::logic_error("Both [topic_name] in the InputPort and the RosNodeParams "
"are empty.");
}
else
{
createSubscriber(params.default_port_value);
}
topic_name_ = params.default_port_value;
}

// If no value was provided through the params, assume that the topic name will be set through a port when the node is first ticked.
}

template <class T>
Expand Down Expand Up @@ -282,45 +252,73 @@ inline bool RosTopicSubNode<T>::createSubscriber(const std::string& topic_name)
signal_connection_ = sub_instance_->broadcaster.connect(
[this](const std::shared_ptr<T> msg) { last_msg_ = msg; });

topic_name_ = topic_name;
return true;
}
template <class T>
inline NodeStatus RosTopicSubNode<T>::checkStatus(const NodeStatus& status) const
{
if(!isStatusActive(status))
{
throw std::logic_error("RosTopicSubNode: the callback must return SUCCESS, FAILURE, "
"or RUNNING");
}
return status;
};

template <class T>
inline NodeStatus RosTopicSubNode<T>::tick()
inline NodeStatus RosTopicSubNode<T>::onStart()
{
// First, check if the subscriber_ is valid and that the name of the
// topic_name in the port didn't change.
// otherwise, create a new subscriber
std::string topic_name;
getInput("topic_name", topic_name);

if(!topic_name.empty() && topic_name != "__default__placeholder__" &&
topic_name != topic_name_)
// If the topic name was provided through an input port, is a valid topic name, and is different from the stored topic name, update the stored topic name to the new string.
if(const auto topic_name = getInput<std::string>("topic_name");
topic_name.has_value() && !topic_name->empty() &&
topic_name.value() != "__default__placeholder__" && topic_name != topic_name_)
{
sub_instance_.reset();
topic_name_ = topic_name.value();
}

if(!sub_instance_)
createSubscriber(topic_name_);

// Check to see if the subscriber has received a message since we initially created it.
sub_instance_->callback_group_executor.spin_some();

// If no message was received, return RUNNING
if(last_msg_ == nullptr)
{
createSubscriber(topic_name);
return NodeStatus::RUNNING;
}

auto CheckStatus = [](NodeStatus status) {
if(!isStatusCompleted(status))
{
throw std::logic_error("RosTopicSubNode: the callback must return"
"either SUCCESS or FAILURE");
}
return status;
};
auto status = checkStatus(onTick(last_msg_));
if(!latchLastMessage())
{
last_msg_.reset();
}
return status;
}

template <class T>
inline NodeStatus RosTopicSubNode<T>::onRunning()
{
// Spin the subscriber to process any new message that has been received since the last tick
sub_instance_->callback_group_executor.spin_some();
auto status = CheckStatus(onTick(last_msg_));

// If no message was received, return RUNNING
if(last_msg_ == nullptr)
{
return NodeStatus::RUNNING;
}

// TODO(schornakj): handle timeout here
Copy link
Author

@schornakj schornakj Sep 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it useful to allow timing out while waiting for a message to be received? We could get it from RosNodeParams::server_timeout like we do in the action and service client nodes. Alternatively, it's possible to use behavior tree logic to sleep for some duration in parallel and trigger a failure if the sleep finishes before a message is received, so even without a timeout there's a way to break out of deadlock if needed.


auto status = checkStatus(onTick(last_msg_));

if(!latchLastMessage())
{
last_msg_.reset();
}
return status;
}

template <class T>
inline void RosTopicSubNode<T>::onHalted()
{}
} // namespace BT
15 changes: 11 additions & 4 deletions behaviortree_ros2/test/test_bt_topic_sub_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,11 @@ class SubNode : public RosTopicSubNode<Empty>
private:
NodeStatus onTick(const std::shared_ptr<Empty>& last_msg) override
{
// GIVEN if any message is received on the topic
if(last_msg != nullptr)
if(last_msg == nullptr)
{
return NodeStatus::SUCCESS;
return NodeStatus::RUNNING;
}
return NodeStatus::FAILURE;
return NodeStatus::SUCCESS;
}
};

Expand Down Expand Up @@ -100,6 +99,8 @@ TEST_F(TestBtTopicSubNode, TopicAsParam)
// GIVEN we create publisher with default QoS settings after creating the BT node
createPublisher(rclcpp::QoS(kHistoryDepth));

EXPECT_THAT(bt_node.executeTick(), testing::Eq(NodeStatus::RUNNING));

// GIVEN the publisher has published a message
publisher_->publish(std_msgs::build<Empty>());

Expand All @@ -123,6 +124,8 @@ TEST_F(TestBtTopicSubNode, TopicFromStaticStringInPort)
// GIVEN we create publisher with default QoS settings after creating the BT node
createPublisher(rclcpp::QoS(kHistoryDepth));

EXPECT_THAT(bt_node.executeTick(), testing::Eq(NodeStatus::RUNNING));

// GIVEN the publisher has published a message
publisher_->publish(std_msgs::build<Empty>());

Expand All @@ -146,6 +149,8 @@ TEST_F(TestBtTopicSubNode, TopicFromBlackboard)
// GIVEN we create publisher with default QoS settings after creating the BT node
createPublisher(rclcpp::QoS(kHistoryDepth));

EXPECT_THAT(bt_node.executeTick(), testing::Eq(NodeStatus::RUNNING));

// GIVEN the publisher has published a message
publisher_->publish(std_msgs::build<Empty>());

Expand All @@ -169,6 +174,8 @@ TEST_F(TestBtTopicSubNode, TopicAsParamQoSBestEffort)
// GIVEN we create publisher with BestEffort reliability QoS settings after creating the BT node
createPublisher(rclcpp::QoS(kHistoryDepth).best_effort());

EXPECT_THAT(bt_node.executeTick(), testing::Eq(NodeStatus::RUNNING));

// GIVEN the publisher has published a message
publisher_->publish(std_msgs::build<Empty>());

Expand Down