Skip to content

Allow GenericPublisher / GenericSubscription to take a QoS profile (re-do #337) #340

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions rosbag2_transport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ ament_target_dependencies(${PROJECT_NAME}
rosbag2_compression
rosbag2_cpp
shared_queues_vendor
yaml_cpp_vendor
)

include(cmake/configure_python.cmake)
Expand Down Expand Up @@ -126,22 +127,25 @@ if(BUILD_TESTING)
endif()

ament_add_gmock(test_rosbag2_node
test/rosbag2_transport/test_rosbag2_node.cpp
src/rosbag2_transport/generic_publisher.cpp
src/rosbag2_transport/generic_subscription.cpp
src/rosbag2_transport/qos.cpp
src/rosbag2_transport/rosbag2_node.cpp
test/rosbag2_transport/test_rosbag2_node.cpp
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
if(TARGET test_rosbag2_node)
target_include_directories(test_rosbag2_node
PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/src/rosbag2_transport>
$<INSTALL_INTERFACE:include>)
ament_target_dependencies(test_rosbag2_node
ament_index_cpp
rosbag2_cpp
rclcpp
rosbag2_cpp
rosbag2_test_common
test_msgs
rosbag2_test_common)
yaml_cpp_vendor)
endif()

ament_add_gmock(test_formatter
Expand Down Expand Up @@ -179,15 +183,18 @@ if(BUILD_TESTING)
endif()

ament_add_gmock(test_qos
src/rosbag2_transport/qos.cpp
test/rosbag2_transport/test_qos.cpp
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}
${SKIP_TEST})
if(TARGET test_qos)
target_link_libraries(test_qos rosbag2_transport)
target_include_directories(test_qos
PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/src/rosbag2_transport>)
ament_target_dependencies(test_qos rosbag2_test_common)
ament_target_dependencies(test_qos
rosbag2_test_common
yaml_cpp_vendor)
endif()

endif()
Expand Down
17 changes: 14 additions & 3 deletions rosbag2_transport/src/rosbag2_transport/generic_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,25 @@
#include <memory>
#include <string>

namespace
{
rcl_publisher_options_t rosbag2_get_publisher_options(const rclcpp::QoS & qos)
{
auto options = rcl_publisher_get_default_options();
options.qos = qos.get_rmw_qos_profile();
return options;
}
} // unnamed namespace

namespace rosbag2_transport
{

GenericPublisher::GenericPublisher(
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic,
const rosidl_message_type_support_t & type_support)
: rclcpp::PublisherBase(node_base, topic, type_support, rcl_publisher_get_default_options())
const rosidl_message_type_support_t & type_support,
const std::string & topic_name,
const rclcpp::QoS & qos)
: rclcpp::PublisherBase(node_base, topic_name, type_support, rosbag2_get_publisher_options(qos))
{}

void GenericPublisher::publish(std::shared_ptr<rmw_serialized_message_t> message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ class GenericPublisher : public rclcpp::PublisherBase
public:
GenericPublisher(
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic,
const rosidl_message_type_support_t & type_support);
const rosidl_message_type_support_t & type_support,
const std::string & topic_name,
const rclcpp::QoS & qos);

virtual ~GenericPublisher() = default;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,30 @@

#include "rosbag2_transport/logging.hpp"

namespace
{
rcl_subscription_options_t rosbag2_get_subscription_options(const rclcpp::QoS & qos)
{
auto options = rcl_subscription_get_default_options();
options.qos = qos.get_rmw_qos_profile();
return options;
}
} // unnamed namespace

namespace rosbag2_transport
{

GenericSubscription::GenericSubscription(
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const rosidl_message_type_support_t & ts,
const std::string & topic_name,
const rclcpp::QoS & qos,
std::function<void(std::shared_ptr<rmw_serialized_message_t>)> callback)
: SubscriptionBase(
node_base,
ts,
topic_name,
rcl_subscription_get_default_options(),
rosbag2_get_subscription_options(qos),
true),
default_allocator_(rcutils_get_default_allocator()),
callback_(callback)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class GenericSubscription : public rclcpp::SubscriptionBase
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const rosidl_message_type_support_t & ts,
const std::string & topic_name,
const rclcpp::QoS & qos,
std::function<void(std::shared_ptr<rmw_serialized_message_t>)> callback);

// Same as create_serialized_message() as the subscription is to serialized_messages only
Expand Down
4 changes: 3 additions & 1 deletion rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

#include "rosbag2_transport/logging.hpp"

#include "qos.hpp"
#include "rosbag2_node.hpp"
#include "replayable_message.hpp"

Expand Down Expand Up @@ -158,7 +159,8 @@ void Player::prepare_publishers()
for (const auto & topic : topics) {
publishers_.insert(
std::make_pair(
topic.name, rosbag2_transport_->create_generic_publisher(topic.name, topic.type)));
topic.name, rosbag2_transport_->create_generic_publisher(
topic.name, topic.type, Rosbag2QoS{})));
}
}

Expand Down
7 changes: 1 addition & 6 deletions rosbag2_transport/src/rosbag2_transport/qos.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,14 @@
# pragma warning(pop)
#endif

#include "rosbag2_transport/visibility_control.hpp"

namespace rosbag2_transport
{
/// Simple wrapper around rclcpp::QoS to provide a default constructor for YAML deserialization.
class Rosbag2QoS : public rclcpp::QoS
{
public:
Rosbag2QoS()
: rclcpp::QoS(0) {} // 0 history depth is always overwritten on deserializing
: rclcpp::QoS(rmw_qos_profile_default.depth) {}
explicit Rosbag2QoS(const rclcpp::QoS & value)
: rclcpp::QoS(value) {}
};
Expand All @@ -57,10 +55,7 @@ struct convert<rmw_time_t>
template<>
struct convert<rosbag2_transport::Rosbag2QoS>
{
ROSBAG2_TRANSPORT_PUBLIC
static Node encode(const rosbag2_transport::Rosbag2QoS & qos);

ROSBAG2_TRANSPORT_PUBLIC
static bool decode(const Node & node, rosbag2_transport::Rosbag2QoS & qos);
};
} // namespace YAML
Expand Down
1 change: 1 addition & 0 deletions rosbag2_transport/src/rosbag2_transport/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ Recorder::create_subscription(
auto subscription = node_->create_generic_subscription(
topic_name,
topic_type,
Rosbag2QoS{},
[this, topic_name](std::shared_ptr<rmw_serialized_message_t> message) {
auto bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
bag_message->serialized_data = message;
Expand Down
8 changes: 5 additions & 3 deletions rosbag2_transport/src/rosbag2_transport/rosbag2_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,26 +38,28 @@ Rosbag2Node::Rosbag2Node(const std::string & node_name)
{}

std::shared_ptr<GenericPublisher> Rosbag2Node::create_generic_publisher(
const std::string & topic, const std::string & type)
const std::string & topic, const std::string & type, const rclcpp::QoS & qos)
{
auto type_support = rosbag2_cpp::get_typesupport(type, "rosidl_typesupport_cpp");
return std::make_shared<GenericPublisher>(get_node_base_interface().get(), topic, *type_support);
return std::make_shared<GenericPublisher>(
get_node_base_interface().get(), *type_support, topic, qos);
}

std::shared_ptr<GenericSubscription> Rosbag2Node::create_generic_subscription(
const std::string & topic,
const std::string & type,
const rclcpp::QoS & qos,
std::function<void(std::shared_ptr<rmw_serialized_message_t>)> callback)
{
auto type_support = rosbag2_cpp::get_typesupport(type, "rosidl_typesupport_cpp");

auto subscription = std::shared_ptr<GenericSubscription>();

try {
subscription = std::make_shared<GenericSubscription>(
get_node_base_interface().get(),
*type_support,
topic,
qos,
callback);

get_node_topics_interface()->add_subscription(subscription, nullptr);
Expand Down
3 changes: 2 additions & 1 deletion rosbag2_transport/src/rosbag2_transport/rosbag2_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@ class Rosbag2Node : public rclcpp::Node

std::shared_ptr<GenericPublisher>
create_generic_publisher(
const std::string & topic, const std::string & type);
const std::string & topic, const std::string & type, const rclcpp::QoS & qos);

std::shared_ptr<GenericSubscription>
create_generic_subscription(
const std::string & topic,
const std::string & type,
const rclcpp::QoS & qos,
std::function<void(std::shared_ptr<rmw_serialized_message_t>)> callback);

std::unordered_map<std::string, std::string>
Expand Down
62 changes: 59 additions & 3 deletions rosbag2_transport/test/rosbag2_transport/test_rosbag2_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
#include "test_msgs/message_fixtures.hpp"
#include "test_msgs/msg/basic_types.hpp"

#include "../../src/rosbag2_transport/rosbag2_node.hpp"
#include "qos.hpp"
#include "rosbag2_node.hpp"

using namespace ::testing; // NOLINT
using namespace rosbag2_test_common; // NOLINT
Expand Down Expand Up @@ -64,7 +65,7 @@ class RosBag2NodeFixture : public Test
std::vector<std::string> messages;
size_t counter = 0;
auto subscription = node_->create_generic_subscription(
topic_name, type,
topic_name, type, rosbag2_transport::Rosbag2QoS{},
[this, &counter, &messages](std::shared_ptr<rmw_serialized_message_t> message) {
auto string_message =
memory_management_.deserialize_message<test_msgs::msg::Strings>(message);
Expand All @@ -91,6 +92,20 @@ class RosBag2NodeFixture : public Test
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}

template<typename Condition, typename Duration>
bool wait_for(const Condition & condition, const Duration & timeout)
{
using clock = std::chrono::system_clock;
auto start = clock::now();
while (!condition()) {
if ((clock::now() - start) > timeout) {
return false;
}
rclcpp::spin_some(node_);
}
return true;
}

MemoryManagement memory_management_;
std::shared_ptr<rosbag2_transport::Rosbag2Node> node_;
rclcpp::Node::SharedPtr publisher_node_;
Expand All @@ -105,7 +120,8 @@ TEST_F(RosBag2NodeFixture, publisher_and_subscriber_work)
std::string topic_name = "string_topic";
std::string type = "test_msgs/Strings";

auto publisher = node_->create_generic_publisher(topic_name, type);
auto publisher = node_->create_generic_publisher(
topic_name, type, rosbag2_transport::Rosbag2QoS{});

auto subscriber_future_ = std::async(
std::launch::async, [this, topic_name, type] {
Expand All @@ -125,6 +141,46 @@ TEST_F(RosBag2NodeFixture, publisher_and_subscriber_work)
EXPECT_THAT(subscribed_messages[0], StrEq("Hello World"));
}

TEST_F(RosBag2NodeFixture, generic_subscription_uses_qos)
{
// If the GenericSubscription does not use the provided QoS profile,
// its request will be incompatible with the Publisher's offer and no messages will be passed.
using namespace std::chrono_literals;
std::string topic_name = "string_topic";
std::string topic_type = "test_msgs/Strings";
rclcpp::QoS qos = rclcpp::SensorDataQoS();

auto publisher = node_->create_publisher<test_msgs::msg::Strings>(topic_name, qos);
auto subscription = node_->create_generic_subscription(
topic_name, topic_type, qos,
[](std::shared_ptr<rmw_serialized_message_t>/* message */) {});
auto connected = [publisher, subscription]() -> bool {
return publisher->get_subscription_count() && subscription->get_publisher_count();
};
// It normally takes < 20ms, 5s chosen as "a very long time"
ASSERT_TRUE(wait_for(connected, 5s));
}

TEST_F(RosBag2NodeFixture, generic_publisher_uses_qos)
{
// If the GenericPublisher does not use the provided QoS profile,
// its offer will be incompatible with the Subscription's request and no messages will be passed.
using namespace std::chrono_literals;
std::string topic_name = "string_topic";
std::string topic_type = "test_msgs/Strings";
rclcpp::QoS qos = rosbag2_transport::Rosbag2QoS().transient_local();

auto publisher = node_->create_generic_publisher(topic_name, topic_type, qos);
auto subscription = node_->create_subscription<test_msgs::msg::Strings>(
topic_name, qos,
[](std::shared_ptr<test_msgs::msg::Strings>/* message */) {});
auto connected = [publisher, subscription]() -> bool {
return publisher->get_subscription_count() && subscription->get_publisher_count();
};
// It normally takes < 20ms, 5s chosen as "a very long time"
ASSERT_TRUE(wait_for(connected, 5s));
}

TEST_F(RosBag2NodeFixture, get_topics_with_types_returns_empty_if_topic_does_not_exist) {
create_publisher("string_topic");

Expand Down