Skip to content

Intelligently subscribe to topics according to their QoS profiles #355

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Apr 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions rosbag2_test_common/include/rosbag2_test_common/wait_for.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_TEST_COMMON__WAIT_FOR_HPP_
#define ROSBAG2_TEST_COMMON__WAIT_FOR_HPP_

#include <chrono>

#include "rclcpp/rclcpp.hpp"

namespace rosbag2_test_common
{
template<typename Timeout, typename Node, typename Condition>
bool spin_and_wait_for(Timeout timeout, const Node & node, Condition condition)
{
using clock = std::chrono::system_clock;
auto start = clock::now();
while (!condition()) {
if ((clock::now() - start) > timeout) {
return false;
}
rclcpp::spin_some(node);
}
return true;
}
} // namespace rosbag2_test_common

#endif // ROSBAG2_TEST_COMMON__WAIT_FOR_HPP_
133 changes: 91 additions & 42 deletions rosbag2_transport/src/rosbag2_transport/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,25 +44,6 @@
# pragma warning(pop)
#endif

namespace
{
// TODO(emersonknapp) re-enable subscription_qos_for_topic once the cyclone situation is resolved
#ifdef ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION
bool all_qos_same(const std::vector<rclcpp::TopicEndpointInfo> & values)
{
auto adjacent_different_elements_it = std::adjacent_find(
values.begin(),
values.end(),
[](const rclcpp::TopicEndpointInfo & left, const rclcpp::TopicEndpointInfo & right) -> bool {
return left.qos_profile() != right.qos_profile();
}
);
// No adjacent elements were different, so all are the same.
return adjacent_different_elements_it == values.end();
}
#endif // ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION
} // unnamed namespace

namespace rosbag2_transport
{
Recorder::Recorder(std::shared_ptr<rosbag2_cpp::Writer> writer, std::shared_ptr<Rosbag2Node> node)
Expand Down Expand Up @@ -106,11 +87,9 @@ void Recorder::topics_discovery(
while (rclcpp::ok()) {
auto topics_to_subscribe =
get_requested_or_available_topics(requested_topics, include_hidden_topics);
#ifdef ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION
for (const auto & topic_and_type : topics_to_subscribe) {
warn_if_new_qos_for_subscribed_topic(topic_and_type.first);
}
#endif // ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION
auto missing_topics = get_missing_topics(topics_to_subscribe);
subscribe_topics(missing_topics);

Expand Down Expand Up @@ -217,26 +196,82 @@ std::string Recorder::serialized_offered_qos_profiles_for_topic(const std::strin
return YAML::Dump(offered_qos_profiles);
}

rclcpp::QoS Recorder::subscription_qos_for_topic(const std::string & topic_name)
rclcpp::QoS Recorder::subscription_qos_for_topic(const std::string & topic_name) const
{
rclcpp::QoS subscription_qos{rclcpp::KeepAll()};
if (topic_qos_profile_overrides_.count(topic_name)) {
subscription_qos = topic_qos_profile_overrides_.at(topic_name);
ROSBAG2_TRANSPORT_LOG_INFO_STREAM("Overriding subscription profile for " << topic_name);
return topic_qos_profile_overrides_.at(topic_name);
} else {
return adapt_qos_to_publishers(topic_name);
}
// TODO(emersonknapp) re-enable subscription_qos_for_topic once the cyclone situation is resolved
#ifdef ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION
return adapt_qos_to_publishers(topic_name);
}

rclcpp::QoS Recorder::adapt_qos_to_publishers(const std::string & topic_name) const
{
auto endpoints = node_->get_publishers_info_by_topic(topic_name);
if (!endpoints.empty() && all_qos_same(endpoints)) {
return Rosbag2QoS(endpoints[0].qos_profile()).default_history();
size_t num_endpoints = endpoints.size();
size_t reliability_reliable_endpoints_count = 0;
size_t durability_transient_local_endpoints_count = 0;
for (const auto & endpoint : endpoints) {
const auto & profile = endpoint.qos_profile().get_rmw_qos_profile();
if (profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) {
reliability_reliable_endpoints_count++;
}
if (profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) {
durability_transient_local_endpoints_count++;
}
}

// We set policies in order as defined in rmw_qos_profile_t
Rosbag2QoS request_qos;
// Policy: history, depth
// History does not affect compatibility
request_qos.default_history();

// Policy: reliability
if (reliability_reliable_endpoints_count == num_endpoints) {
request_qos.reliable();
} else {
if (reliability_reliable_endpoints_count > 0) {
ROSBAG2_TRANSPORT_LOG_WARN_STREAM(
"Some, but not all, publishers on topic \"" << topic_name << "\" "
"are offering RMW_QOS_POLICY_RELIABILITY_RELIABLE. "
"Falling back to RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT "
"as it will connect to all publishers. "
"Some messages from Reliable publishers could be dropped.");
}
request_qos.best_effort();
}

// Policy: durability
// If all publishers offer transient_local, we can request it and receive latched messages
if (durability_transient_local_endpoints_count == num_endpoints) {
request_qos.transient_local();
} else {
if (durability_transient_local_endpoints_count > 0) {
ROSBAG2_TRANSPORT_LOG_WARN_STREAM(
"Some, but not all, publishers on topic \"" << topic_name << "\" "
"are offering RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL. "
"Falling back to RMW_QOS_POLICY_DURABILITY_VOLATILE "
"as it will connect to all publishers. "
"Previously-published latched messages will not be retrieved.");
}
request_qos.durability_volatile();
}
ROSBAG2_TRANSPORT_LOG_WARN_STREAM(
"Topic " << topic_name << " has publishers offering different QoS settings. "
"Cannot determine what QoS to request, falling back to default QoS profile."
);
topics_warned_about_incompatibility_.insert(topic_name);
#endif // ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION
return subscription_qos;
// Policy: deadline
// Deadline does not affect delivery of messages,
// and we do not record Deadline"Missed events.
// We can always use unspecified deadline, which will be compatible with all publishers.

// Policy: lifespan
// Lifespan does not affect compatibiliy

// Policy: liveliness, liveliness_lease_duration
// Liveliness does not affect delivery of messages,
// and we do not record LivelinessChanged events.
// We can always use unspecified liveliness, which will be compatible with all publishers.
return request_qos;
}

void Recorder::warn_if_new_qos_for_subscribed_topic(const std::string & topic_name)
Expand All @@ -250,17 +285,31 @@ void Recorder::warn_if_new_qos_for_subscribed_topic(const std::string & topic_na
// Already warned about this topic
return;
}
const auto & used_qos = existing_subscription->second->qos_profile();
const auto & used_profile = existing_subscription->second->qos_profile().get_rmw_qos_profile();
auto publishers_info = node_->get_publishers_info_by_topic(topic_name);
for (const auto & info : publishers_info) {
if (info.qos_profile() != used_qos) {
auto new_profile = info.qos_profile().get_rmw_qos_profile();
bool incompatible_reliability =
new_profile.reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT &&
used_profile.reliability != RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT;
bool incompatible_durability =
new_profile.durability == RMW_QOS_POLICY_DURABILITY_VOLATILE &&
used_profile.durability != RMW_QOS_POLICY_DURABILITY_VOLATILE;

if (incompatible_reliability) {
ROSBAG2_TRANSPORT_LOG_WARN_STREAM(
"A new publisher for subscribed topic " << topic_name << " was found that is offering "
"a (possibly) incompatible QoS profile. Not changing subscription QoS. "
"Messages from this publisher may not be recorded."
);
"A new publisher for subscribed topic " << topic_name << " "
"was found offering RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT, "
"but rosbag already subscribed requesting RMW_QOS_POLICY_RELIABILITY_RELIABLE. "
"Messages from this new publisher will not be recorded.");
topics_warned_about_incompatibility_.insert(topic_name);
} else if (incompatible_durability) {
ROSBAG2_TRANSPORT_LOG_WARN_STREAM(
"A new publisher for susbcribed topic " << topic_name << " "
"was found offering RMW_QOS_POLICY_DURABILITY_VOLATILE, "
"but rosbag2 already subscribed requesting RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL. "
"Messages from this new publisher will not be recorded.");
topics_warned_about_incompatibility_.insert(topic_name);
return;
}
}
}
Expand Down
13 changes: 9 additions & 4 deletions rosbag2_transport/src/rosbag2_transport/recorder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,19 @@ class Recorder
*
* Profiles are prioritized by:
* 1. The override specified in the record_options, if one exists for the topic.
* 2. The publisher's offered QoS profile if all current publishers are offering the exact same
* compatibility profile.
* 3. The default Rosbag2QoS profile, if the above conditions are not met.
* 2. Adapted subscription via `adapt_qos_to_publishers`
*
* \param topic_name The full name of the topic, with namespace (ex. /arm/joint_status).
* \return The QoS profile to be used for subscribing.
*/
rclcpp::QoS subscription_qos_for_topic(const std::string & topic_name);
rclcpp::QoS subscription_qos_for_topic(const std::string & topic_name) const;
/**
* Try to subscribe using publishers' offered QoS policies.
*
* Fall back to sensible defaults when we can't adapt robustly,
* erring in favor of creating compatible connections.
*/
rclcpp::QoS adapt_qos_to_publishers(const std::string & topic_name) const;

// Serialize all currently offered QoS profiles for a topic into a YAML list.
std::string serialized_offered_qos_profiles_for_topic(const std::string & topic_name);
Expand Down
15 changes: 7 additions & 8 deletions rosbag2_transport/test/rosbag2_transport/test_qos.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,22 +60,21 @@ TEST(TestQoS, supports_version_4)
YAML::Node loaded_node = YAML::Load(serialized_profiles);
auto deserialized_profiles = loaded_node.as<std::vector<rosbag2_transport::Rosbag2QoS>>();
ASSERT_EQ(deserialized_profiles.size(), 1u);
rosbag2_transport::Rosbag2QoS actual_qos = deserialized_profiles[0];
auto actual_qos = deserialized_profiles[0].get_rmw_qos_profile();

rmw_time_t zerotime{0, 0};
// Explicitly set up the same QoS profile in case defaults change
rclcpp::QoS expected_qos(10);
expected_qos
.keep_last(10)
auto expected_qos = rosbag2_transport::Rosbag2QoS{}
.default_history()
.reliable()
.durability_volatile()
.deadline(zerotime)
.lifespan(zerotime)
.liveliness(RMW_QOS_POLICY_LIVELINESS_SYSTEM_DEFAULT)
.liveliness_lease_duration(zerotime)
.avoid_ros_namespace_conventions(false);
// Any values not present in the YAML should take the default value in both profiles
EXPECT_EQ(actual_qos, expected_qos);
.liveliness_lease_duration(zerotime).get_rmw_qos_profile();

EXPECT_EQ(actual_qos.reliability, expected_qos.reliability);
EXPECT_EQ(actual_qos.durability, expected_qos.durability);
}

TEST(TestQoS, detect_new_qos_fields)
Expand Down
Loading