Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 51 additions & 14 deletions rosbag2_transport/src/rosbag2_transport/locked_priority_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,34 @@
#include <optional>
#include <queue>
#include <vector>
#include <utility>

#include "rcpputils/thread_safety_annotations.hpp"
#include "rcpputils/unique_lock.hpp"

/// \brief `std::priority_queue` wrapper with locks.
/// \tparam T the element type
/// \tparam Container the underlying container type
/// \tparam Compare the comparator
/// \brief `std::priority_queue` wrapper with locks and stable sorting.
/// \details This class wraps a `std::priority_queue` and provides locking for thread-safe
/// access. It uses std::vector as underlying container for the `std::priority_queue` and
/// adds a strictly increasing insertion sequence number to each element to
/// ensure a stable sort when two elements are equivalent according to the comparator.
/// This is useful when multiple elements has the same priorities in the queue, and we want to
/// ensure that elements with the same priority are popped in the order they were pushed.
/// For example, this is useful when multiple messages have the same timestamp, and we need to
/// preserve the original messages order.
/// \note The insertion sequence number is a builtin feature of this class, and users should not
/// provide it themselves.
/// \tparam T The element type
/// \see std::priority_queue
template<
typename T,
typename Container = std::vector<T>,
typename Compare = std::less<typename Container::value_type>
>
template<typename T>
class LockedPriorityQueue
{
public:
using Comparator = std::function<bool(const T &, const T &)>;

/// \brief Constructor.
/// \param compare the comparator object
explicit LockedPriorityQueue(const Compare & compare)
: queue_(compare)
explicit LockedPriorityQueue(const Comparator & compare)
: queue_(StableComparator{compare})
{}

LockedPriorityQueue() = delete;
Expand All @@ -54,7 +61,7 @@ class LockedPriorityQueue
void push(const T & element)
{
rcpputils::unique_lock<std::mutex> lk(queue_mutex_);
queue_.push(element);
queue_.emplace(element, ++insert_sequence_number_);
}

/// \brief Remove the top element.
Expand Down Expand Up @@ -87,6 +94,7 @@ class LockedPriorityQueue
while (!queue_.empty()) {
queue_.pop();
}
insert_sequence_number_ = 0;
}

/// \brief Try to take the top element from the queue.
Expand All @@ -97,14 +105,43 @@ class LockedPriorityQueue
if (queue_.empty()) {
return std::nullopt;
}
T e = queue_.top();
T e = queue_.top().first;
queue_.pop();
return e;
}

private:
using Container = std::vector<std::pair<T, size_t>>;

/// \brief Internal wrapper comparator around the user-provided comparator.
/// \details This comparator uses the user-provided comparator to compare the `T` values.
/// If the `T` values are equivalent according to the user comparator, it uses
/// the insertion sequence number to break ties, ensuring that earlier inserted elements
/// are considered "less than" later inserted elements.
struct StableComparator
{
Comparator user_comp;
bool operator()(const std::pair<T, size_t> & l, const std::pair<T, size_t> & r) const
{
const auto & [l_t_value, l_insertion_seq_num] = l;
const auto & [r_t_value, r_insertion_seq_num] = r;
if (user_comp(l_t_value, r_t_value)) {
return true;
}
if (user_comp(r_t_value, l_t_value)) {
return false;
}
// If values are equal according to user's comparator, use sequence numbers for stability
// Tie-breaker: earlier insertion comes first
return l_insertion_seq_num > r_insertion_seq_num;
}
};

mutable std::mutex queue_mutex_;
std::priority_queue<T, Container, Compare> queue_ RCPPUTILS_TSA_GUARDED_BY(queue_mutex_);
std::priority_queue<typename Container::value_type, Container, StableComparator> queue_
RCPPUTILS_TSA_GUARDED_BY(queue_mutex_);

size_t insert_sequence_number_{0} RCPPUTILS_TSA_GUARDED_BY(queue_mutex_);
};

#endif // ROSBAG2_TRANSPORT__LOCKED_PRIORITY_QUEUE_HPP_
15 changes: 5 additions & 10 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,14 +357,9 @@ class PlayerImpl
Player * owner_;
rosbag2_transport::PlayOptions play_options_;
rcutils_time_point_value_t play_until_timestamp_ = -1;
using BagMessageComparator = std::function<
bool(
const rosbag2_storage::SerializedBagMessageSharedPtr &,
const rosbag2_storage::SerializedBagMessageSharedPtr &)>;
LockedPriorityQueue<
rosbag2_storage::SerializedBagMessageSharedPtr,
std::vector<rosbag2_storage::SerializedBagMessageSharedPtr>,
BagMessageComparator> message_queue_;
LockedPriorityQueue<rosbag2_storage::SerializedBagMessageSharedPtr> message_queue_;
using BagMessageComparator =
LockedPriorityQueue<rosbag2_storage::SerializedBagMessageSharedPtr>::Comparator;
mutable std::future<void> storage_loading_future_;
std::atomic_bool load_storage_content_{true};
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides_;
Expand Down Expand Up @@ -421,7 +416,7 @@ class PlayerImpl
const rosbag2_storage::SerializedBagMessageSharedPtr & l,
const rosbag2_storage::SerializedBagMessageSharedPtr & r) const
{
return l->recv_timestamp > r->recv_timestamp;
return l->recv_timestamp > r->recv_timestamp; // Smaller timestamp comes first
}
} bag_message_chronological_recv_timestamp_comparator;

Expand All @@ -432,7 +427,7 @@ class PlayerImpl
const rosbag2_storage::SerializedBagMessageSharedPtr & l,
const rosbag2_storage::SerializedBagMessageSharedPtr & r) const
{
return l->send_timestamp > r->send_timestamp;
return l->send_timestamp > r->send_timestamp; // Smaller timestamp comes first
}
} bag_message_chronological_send_timestamp_comparator;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ class Rosbag2TransportTestFixture : public Test
return bag_msg;
}

template<typename MessageT>
std::shared_ptr<MessageT> deserialize_test_message(
const std::shared_ptr<rosbag2_storage::SerializedBagMessage> bag_msg)
{
return memory_management_.deserialize_message<MessageT>(bag_msg->serialized_data);
}

static std::string format_message_order(
const TestParamInfo<rosbag2_transport::MessageOrder> & info)
{
Expand Down
158 changes: 158 additions & 0 deletions rosbag2_transport/test/rosbag2_transport/test_play.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,164 @@ TEST_P(RosBag2PlayTestFixtureMessageOrder, recorded_msgs_are_played_for_all_topi
EXPECT_EQ(total_messages, num_played_messages);
}

TEST_P(RosBag2PlayTestFixtureMessageOrder,
order_of_msgs_with_the_same_timestamp_in_one_bag_respected_during_replay)
{
const auto msg = get_messages_basic_types()[0];

auto topic_types = std::vector<rosbag2_storage::TopicMetadata>{
{1u, "topic1", "test_msgs/msg/BasicTypes", "", {}, ""},
{2u, "topic2", "test_msgs/msg/BasicTypes", "", {}, ""},
};

// Make sure messages are in increasing order by timestamp.
// However, make messages have the same timestamps for some messages.
// Expectation is that messages will be replayed in the same order as they were recorded
// if they have the same recv_timestamp or send_timestamp
std::vector<std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>>> messages_list{};
msg->int32_value = 0;
messages_list.emplace_back(std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>>{
(msg->int32_value++, serialize_test_message("topic1", 1, 1, msg)),
(msg->int32_value++, serialize_test_message("topic2", 1, 1, msg)),
(msg->int32_value++, serialize_test_message("topic1", 5, 4, msg)),
(msg->int32_value++, serialize_test_message("topic2", 5, 4, msg)),
(msg->int32_value++, serialize_test_message("topic1", 9, 4, msg)),
(msg->int32_value++, serialize_test_message("topic2", 9, 14, msg))
});

std::vector<rosbag2_transport::Player::reader_storage_options_pair_t> bags{};
std::size_t total_messages = 0u;
for (const auto & curr_bag_messages : messages_list) {
auto prepared_mock_reader = std::make_unique<MockSequentialReader>();
total_messages += curr_bag_messages.size();
prepared_mock_reader->prepare(curr_bag_messages, topic_types);
bags.emplace_back(
std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader)), storage_options_);
}
ASSERT_GT(total_messages, 0u);
const rosbag2_transport::MessageOrder message_order = GetParam();
play_options_.message_order = message_order;
play_options_.read_ahead_queue_size = total_messages - 3;
auto player = std::make_shared<rosbag2_transport::Player>(std::move(bags), play_options_);
std::size_t num_played_messages = 0u;
rcutils_time_point_value_t last_timestamp = 0;
const auto get_timestamp =
[message_order](std::shared_ptr<rosbag2_storage::SerializedBagMessage> msg) {
switch (message_order) {
case rosbag2_transport::MessageOrder::RECEIVED_TIMESTAMP:
return msg->recv_timestamp;
case rosbag2_transport::MessageOrder::SENT_TIMESTAMP:
return msg->send_timestamp;
default:
throw std::runtime_error("unknown rosbag2_transport::MessageOrder value");
}
};
const auto callback = [&](std::shared_ptr<rosbag2_storage::SerializedBagMessage> playing_msg) {
// Make sure messages are played in order
num_played_messages++;
const auto timestamp = get_timestamp(playing_msg);
using MessageT = typename decltype(msg)::element_type;
const auto deserialized_msg = deserialize_test_message<MessageT>(playing_msg);
// The int32_value was set in an increasing order when creating the messages
// std::cout << "deserialized_value = " << deserialized_msg->int32_value << ", timestamp = "
// << RCUTILS_NS_TO_MS(timestamp) << ", topic_name = `" << playing_msg->topic_name << "`\n";
EXPECT_EQ(deserialized_msg->int32_value, num_played_messages) << ", timestamp = " <<
RCUTILS_NS_TO_MS(timestamp) << ", topic_name = `" << playing_msg->topic_name << "`\n";
EXPECT_LE(last_timestamp, timestamp);
last_timestamp = timestamp;
};
player->add_on_play_message_pre_callback(callback);
player->play();
ASSERT_TRUE(player->wait_for_playback_to_start(10s));
ASSERT_TRUE(player->wait_for_playback_to_finish(10s));
EXPECT_EQ(total_messages, num_played_messages);
}

TEST_P(RosBag2PlayTestFixtureMessageOrder,
order_of_msgs_with_the_same_timestamp_respected_in_multibag_replay)
{
const auto msg = get_messages_basic_types()[0];

auto topic_types = std::vector<rosbag2_storage::TopicMetadata>{
{1u, "topic1", "test_msgs/msg/BasicTypes", "", {}, ""},
{2u, "topic2", "test_msgs/msg/BasicTypes", "", {}, ""},
{1u, "topic3", "test_msgs/msg/BasicTypes", "", {}, ""},
{2u, "topic4", "test_msgs/msg/BasicTypes", "", {}, ""},
};

// Make sure each reader's/bag's messages are in increasing order by timestamp.
// However, make messages from different bags have the same recv_timestamp and send_timestamp
// Expectation is that if appeared two messages with the same timestamps from the different bags,
// the one that was added first to the read_ahead_queue
// (i.e. from the reader/bag with the lower index) will be played first
std::vector<std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>>> messages_list{};
msg->int32_value = 1;
messages_list.emplace_back(std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>>{
serialize_test_message("topic1", 1, 1, msg),
(msg->int32_value += 2, serialize_test_message("topic3", 3, 2, msg)),
(msg->int32_value += 2, serialize_test_message("topic1", 5, 4, msg)),
(msg->int32_value += 2, serialize_test_message("topic3", 7, 7, msg)),
(msg->int32_value += 2, serialize_test_message("topic1", 9, 8, msg)),
(msg->int32_value += 2, serialize_test_message("topic3", 11, 14, msg))
});
msg->int32_value = 2;
messages_list.emplace_back(std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>>{
serialize_test_message("topic2", 1, 1, msg),
(msg->int32_value += 2, serialize_test_message("topic4", 3, 2, msg)),
// (msg->int32_value += 2, serialize_test_message("topic1", 5, 4, msg)),
(msg->int32_value += 2, serialize_test_message("topic2", 5, 4, msg)),
(msg->int32_value += 2, serialize_test_message("topic4", 7, 7, msg)),
(msg->int32_value += 2, serialize_test_message("topic2", 9, 9, msg)),
(msg->int32_value += 2, serialize_test_message("topic4", 11, 14, msg))
});
std::vector<rosbag2_transport::Player::reader_storage_options_pair_t> bags{};
std::size_t total_messages = 0u;
for (const auto & curr_bag_messages : messages_list) {
auto prepared_mock_reader = std::make_unique<MockSequentialReader>();
total_messages += curr_bag_messages.size();
prepared_mock_reader->prepare(curr_bag_messages, topic_types);
bags.emplace_back(
std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader)), storage_options_);
}
ASSERT_GT(total_messages, 0u);
const rosbag2_transport::MessageOrder message_order = GetParam();
play_options_.message_order = message_order;
play_options_.read_ahead_queue_size = total_messages - 3;
auto player = std::make_shared<rosbag2_transport::Player>(std::move(bags), play_options_);
std::size_t num_played_messages = 0u;
rcutils_time_point_value_t last_timestamp = 0;
const auto get_timestamp =
[message_order](std::shared_ptr<rosbag2_storage::SerializedBagMessage> msg) {
switch (message_order) {
case rosbag2_transport::MessageOrder::RECEIVED_TIMESTAMP:
return msg->recv_timestamp;
case rosbag2_transport::MessageOrder::SENT_TIMESTAMP:
return msg->send_timestamp;
default:
throw std::runtime_error("unknown rosbag2_transport::MessageOrder value");
}
};
const auto callback = [&](std::shared_ptr<rosbag2_storage::SerializedBagMessage> playing_msg) {
// Make sure messages are played in order
num_played_messages++;
const auto timestamp = get_timestamp(playing_msg);
using MessageT = typename decltype(msg)::element_type;
const auto deserialized_msg = deserialize_test_message<MessageT>(playing_msg);
// The int32_value was set in an increasing order when creating the messages
// std::cout << "deserialized_value = " << deserialized_msg->int32_value << ", timestamp = "
// << RCUTILS_NS_TO_MS(timestamp) << ", topic_name = `" << playing_msg->topic_name << "`\n";
EXPECT_EQ(deserialized_msg->int32_value, num_played_messages) << ", timestamp = " <<
RCUTILS_NS_TO_MS(timestamp) << ", topic_name = `" << playing_msg->topic_name << "`\n";
EXPECT_LE(last_timestamp, timestamp);
last_timestamp = timestamp;
};
player->add_on_play_message_pre_callback(callback);
player->play();
ASSERT_TRUE(player->wait_for_playback_to_start(10s));
ASSERT_TRUE(player->wait_for_playback_to_finish(10s));
EXPECT_EQ(total_messages, num_played_messages);
}

TEST_F(RosBag2PlayTestFixture, high_freq_topics_does_not_starve_in_multibag_playback) {
static constexpr const char * high_freq_topic1_name = "HighFreqTopic1";
static constexpr const char * low_freq_topic1_name = "LowFreqTopic1";
Expand Down
Loading