Skip to content

Commit 951d175

Browse files
Bugfix: Player can't play with read_ahead_queue_size equal 1 (#2174)
* Bugfix: Player can't play with read_ahead_queue_size equal 1 Signed-off-by: Michael Orlov <morlovmr@gmail.com> * Add missing deserialize_test_message() from #2172 Signed-off-by: Michael Orlov <michael.orlov@apex.ai> * Address review comments Co-Authored by: Christophe Bedard <bedard.christophe@gmail.com> Co-authored-by: Christophe Bedard <bedard.christophe@gmail.com> Signed-off-by: Michael Orlov <morlovmr@gmail.com> --------- Signed-off-by: Michael Orlov <morlovmr@gmail.com> Signed-off-by: Michael Orlov <michael.orlov@apex.ai> Co-authored-by: Christophe Bedard <bedard.christophe@gmail.com>
1 parent b08269e commit 951d175

File tree

3 files changed

+82
-1
lines changed

3 files changed

+82
-1
lines changed

rosbag2_transport/src/rosbag2_transport/player.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,10 @@ PlayerImpl::PlayerImpl(
463463
keyboard_handler_(std::move(keyboard_handler)),
464464
player_service_client_manager_(std::make_shared<PlayerServiceClientManager>())
465465
{
466+
if (play_options_.read_ahead_queue_size < 1) {
467+
throw std::invalid_argument("read_ahead_queue_size must be at least 1");
468+
}
469+
466470
for (auto & topic : play_options_.topics_to_filter) {
467471
topic = rclcpp::expand_topic_or_service_name(
468472
topic, owner_->get_name(),
@@ -1141,7 +1145,7 @@ void PlayerImpl::load_storage_content()
11411145
while (load_storage_content_ && !stop_playback_ && readers_->has_next()) {
11421146
// The message queue size may get smaller after this, but that's OK
11431147
const size_t message_queue_size = message_queue_.size();
1144-
if (message_queue_size < queue_lower_boundary) {
1148+
if (message_queue_size <= queue_lower_boundary) {
11451149
enqueue_up_to_boundary(queue_upper_boundary, message_queue_size);
11461150
} else {
11471151
std::this_thread::sleep_for(std::chrono::milliseconds(1));

rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,13 @@ class Rosbag2TransportTestFixture : public Test
7878
return bag_msg;
7979
}
8080

81+
template<typename MessageT>
82+
std::shared_ptr<MessageT> deserialize_test_message(
83+
const std::shared_ptr<rosbag2_storage::SerializedBagMessage> bag_msg)
84+
{
85+
return memory_management_.deserialize_message<MessageT>(bag_msg->serialized_data);
86+
}
87+
8188
static std::string format_message_order(
8289
const TestParamInfo<rosbag2_transport::MessageOrder> & info)
8390
{

rosbag2_transport/test/rosbag2_transport/test_play.cpp

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,76 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics)
403403
ElementsAre(40.0f, 2.0f, 0.0f)))));
404404
}
405405

406+
TEST_F(RosBag2PlayTestFixture, can_play_messages_with_queue_size_equal_one)
407+
{
408+
const auto msg = get_messages_basic_types()[0];
409+
auto topic_types = std::vector<rosbag2_storage::TopicMetadata>{
410+
{1u, "topic1", "test_msgs/msg/BasicTypes", "", {}, ""},
411+
{2u, "topic2", "test_msgs/msg/BasicTypes", "", {}, ""},
412+
};
413+
414+
std::vector<std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>>> messages_list{};
415+
msg->int32_value = 0;
416+
messages_list.emplace_back(std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>>{
417+
(msg->int32_value++, serialize_test_message("topic1", 1, msg)),
418+
(msg->int32_value++, serialize_test_message("topic2", 2, msg)),
419+
(msg->int32_value++, serialize_test_message("topic1", 5, msg)),
420+
(msg->int32_value++, serialize_test_message("topic2", 6, msg)),
421+
(msg->int32_value++, serialize_test_message("topic1", 9, msg)),
422+
(msg->int32_value++, serialize_test_message("topic2", 10, msg))
423+
});
424+
425+
std::vector<rosbag2_transport::Player::reader_storage_options_pair_t> bags{};
426+
std::size_t total_messages = 0u;
427+
for (const auto & curr_bag_messages : messages_list) {
428+
auto prepared_mock_reader = std::make_unique<MockSequentialReader>();
429+
total_messages += curr_bag_messages.size();
430+
prepared_mock_reader->prepare(curr_bag_messages, topic_types);
431+
bags.emplace_back(
432+
std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader)), storage_options_);
433+
}
434+
ASSERT_GT(total_messages, 0u);
435+
// Note: For the sake of the test, the read_ahead_queue_size is set to 1
436+
// to test the corner case of having the smallest possible queue size.
437+
// This means that only one message will be prefetched from the readers at a time.
438+
// This is to make sure that the player can handle this case correctly.
439+
// In a real scenario, a larger queue size is recommended to improve performance.
440+
play_options_.read_ahead_queue_size = 1;
441+
const rosbag2_transport::MessageOrder message_order = play_options_.message_order;
442+
443+
auto player = std::make_shared<rosbag2_transport::Player>(std::move(bags), play_options_);
444+
std::size_t num_played_messages = 0u;
445+
rcutils_time_point_value_t last_timestamp = 0;
446+
const auto get_timestamp =
447+
[message_order](std::shared_ptr<rosbag2_storage::SerializedBagMessage> msg) {
448+
switch (message_order) {
449+
case rosbag2_transport::MessageOrder::RECEIVED_TIMESTAMP:
450+
return msg->recv_timestamp;
451+
case rosbag2_transport::MessageOrder::SENT_TIMESTAMP:
452+
return msg->send_timestamp;
453+
default:
454+
throw std::runtime_error("unknown rosbag2_transport::MessageOrder value");
455+
}
456+
};
457+
const auto callback = [&](std::shared_ptr<rosbag2_storage::SerializedBagMessage> playing_msg) {
458+
// Make sure messages are played in order
459+
num_played_messages++;
460+
const auto timestamp = get_timestamp(playing_msg);
461+
using MessageT = typename decltype(msg)::element_type;
462+
const auto deserialized_msg = deserialize_test_message<MessageT>(playing_msg);
463+
// The int32_value was set in an increasing order when creating the messages
464+
EXPECT_EQ(deserialized_msg->int32_value, num_played_messages) << ", timestamp = " <<
465+
RCUTILS_NS_TO_MS(timestamp) << ", topic_name = `" << playing_msg->topic_name << "`\n";
466+
EXPECT_LE(last_timestamp, timestamp);
467+
last_timestamp = timestamp;
468+
};
469+
player->add_on_play_message_pre_callback(callback);
470+
player->play();
471+
ASSERT_TRUE(player->wait_for_playback_to_start(10s));
472+
ASSERT_TRUE(player->wait_for_playback_to_finish(10s));
473+
EXPECT_EQ(total_messages, num_played_messages);
474+
}
475+
406476
TEST_F(RosBag2PlayTestFixture, can_play_when_one_bag_has_fewer_messages_than_other_bags)
407477
{
408478
auto msg = get_messages_basic_types()[0];

0 commit comments

Comments
 (0)