From 46edcdde6855278face03b03f41262e3e3fb7b74 Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Sun, 27 Mar 2022 17:38:52 -0700 Subject: [PATCH] Redesign tests in test_play_for.cpp (#17) * Redesigned tests to be more deterministic and running faster * Fixed bug in `play_for()` flow when replaying in loop or multiple times from the same player instance. Signed-off-by: Michael Orlov Signed-off-by: Geoffrey Biggs --- .../subscription_manager.hpp | 13 +- .../include/rosbag2_transport/player.hpp | 1 + .../src/rosbag2_transport/player.cpp | 11 +- .../test/rosbag2_transport/test_play_for.cpp | 237 +++++++++--------- 4 files changed, 136 insertions(+), 126 deletions(-) diff --git a/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp b/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp index 9fe9bc3a88..f37fa75477 100644 --- a/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp +++ b/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp @@ -117,15 +117,13 @@ class SubscriptionManager return matched; } - std::future spin_subscriptions(int maximum_time_spinning_sec = 10) + std::future spin_subscriptions() { return async( - std::launch::async, [this, maximum_time_spinning_sec]() { + std::launch::async, [this]() { rclcpp::executors::SingleThreadedExecutor exec; auto start = std::chrono::high_resolution_clock::now(); - while (rclcpp::ok() && - continue_spinning(expected_topics_with_size_, start, maximum_time_spinning_sec)) - { + while (rclcpp::ok() && continue_spinning(expected_topics_with_size_, start)) { exec.spin_node_some(subscriber_node_); } }); @@ -134,11 +132,10 @@ class SubscriptionManager private: bool continue_spinning( const std::unordered_map & expected_topics_with_sizes, - std::chrono::time_point start_time, - int maximum_time_spinning_sec = 10) + std::chrono::time_point start_time) { auto current = std::chrono::high_resolution_clock::now(); - if (current - start_time > std::chrono::seconds(maximum_time_spinning_sec)) { + if (current - start_time > std::chrono::seconds(10)) { return false; } diff --git a/rosbag2_transport/include/rosbag2_transport/player.hpp b/rosbag2_transport/include/rosbag2_transport/player.hpp index b593ba7cc7..dfded3ca64 100644 --- a/rosbag2_transport/include/rosbag2_transport/player.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player.hpp @@ -178,6 +178,7 @@ class Player : public rclcpp::Node rosbag2_transport::PlayOptions play_options_; moodycamel::ReaderWriterQueue message_queue_; mutable std::future storage_loading_future_; + std::atomic_bool load_storage_content_{true}; std::unordered_map topic_qos_profile_overrides_; std::unique_ptr clock_; std::shared_ptr clock_publish_timer_; diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index c11ec61604..dcce1c9581 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -225,9 +225,14 @@ bool Player::play() reader_->seek(starting_time_); clock_->jump(starting_time_); } + load_storage_content_ = true; storage_loading_future_ = std::async(std::launch::async, [this]() {load_storage_content();}); wait_for_filled_queue(); play_messages_from_queue(play_until_time); + + load_storage_content_ = false; + if (storage_loading_future_.valid()) {storage_loading_future_.get();} + while (message_queue_.pop()) {} // cleanup queue { std::lock_guard lk(ready_to_play_from_queue_mutex_); is_ready_to_play_from_queue_ = false; @@ -236,6 +241,9 @@ bool Player::play() } while (rclcpp::ok() && play_options_.loop); } catch (std::runtime_error & e) { RCLCPP_ERROR(get_logger(), "Failed to play: %s", e.what()); + load_storage_content_ = false; + if (storage_loading_future_.valid()) {storage_loading_future_.get();} + while (message_queue_.pop()) {} // cleanup queue } std::lock_guard lk(ready_to_play_from_queue_mutex_); is_ready_to_play_from_queue_ = false; @@ -389,6 +397,7 @@ void Player::seek(rcutils_time_point_value_t time_point) // Restart queuing thread if it has finished running (previously reached end of bag), // otherwise, queueing should continue automatically after releasing mutex if (is_storage_completely_loaded() && rclcpp::ok()) { + load_storage_content_ = true; storage_loading_future_ = std::async(std::launch::async, [this]() {load_storage_content();}); } @@ -411,7 +420,7 @@ void Player::load_storage_content() static_cast(play_options_.read_ahead_queue_size * read_ahead_lower_bound_percentage_); auto queue_upper_boundary = play_options_.read_ahead_queue_size; - while (rclcpp::ok()) { + while (rclcpp::ok() && load_storage_content_) { TSAUniqueLock lk(reader_mutex_); if (!reader_->has_next()) {break;} diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp index 75c151754d..66c0346afd 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp @@ -53,8 +53,6 @@ constexpr bool kBool1Value{false}; constexpr bool kBool2Value{true}; constexpr bool kBool3Value{false}; -constexpr int kMaximumTimeSpinningSec{1}; - #define EVAL_REPLAYED_PRIMITIVES(replayed_primitives) \ EXPECT_THAT( \ replayed_primitives, \ @@ -81,15 +79,15 @@ constexpr int kMaximumTimeSpinningSec{1}; class RosBag2PlayForTestFixture : public RosBag2PlayTestFixture { public: - static constexpr const char * kTopic1Name{"topic1"}; - static constexpr const char * kTopic2Name{"topic2"}; - static constexpr const char * kTopic1{"/topic1"}; - static constexpr const char * kTopic2{"/topic2"}; + static constexpr const char * kTopic1Name_{"topic1"}; + static constexpr const char * kTopic2Name_{"topic2"}; + static constexpr const char * kTopic1_{"/topic1"}; + static constexpr const char * kTopic2_{"/topic2"}; std::vector get_topic_types() { - return {{kTopic1Name, "test_msgs/BasicTypes", "", ""}, - {kTopic2Name, "test_msgs/Arrays", "", ""}}; + return {{kTopic1Name_, "test_msgs/BasicTypes", "", ""}, + {kTopic2Name_, "test_msgs/Arrays", "", ""}}; } std::vector> @@ -106,17 +104,20 @@ class RosBag2PlayForTestFixture : public RosBag2PlayTestFixture // around without any knowledge about message chronology. It just picks // the next one Make sure to keep the list in order or sort it! std::vector> messages = - {serialize_test_message(kTopic1Name, 100, primitive_message1), - serialize_test_message(kTopic2Name, 120, complex_message1), - serialize_test_message(kTopic1Name, 200, primitive_message1), - serialize_test_message(kTopic2Name, 220, complex_message1), - serialize_test_message(kTopic1Name, 300, primitive_message1), - serialize_test_message(kTopic2Name, 320, complex_message1)}; + {serialize_test_message(kTopic1Name_, 100, primitive_message1), + serialize_test_message(kTopic2Name_, 120, complex_message1), + serialize_test_message(kTopic1Name_, 200, primitive_message1), + serialize_test_message(kTopic2Name_, 220, complex_message1), + serialize_test_message(kTopic1Name_, 300, primitive_message1), + serialize_test_message(kTopic2Name_, 320, complex_message1)}; // @} return messages; } - void InitPlayerWithPlaybackDurationAndPlay(int64_t milliseconds) + void InitPlayerWithPlaybackDurationAndPlay( + int64_t milliseconds, + size_t expected_number_of_messages_on_topic1 = 3, + size_t expected_number_of_messages_on_topic2 = 3) { auto topic_types = get_topic_types(); auto messages = get_serialized_messages(); @@ -125,20 +126,18 @@ class RosBag2PlayForTestFixture : public RosBag2PlayTestFixture prepared_mock_reader->prepare(messages, topic_types); auto reader = std::make_unique(std::move(prepared_mock_reader)); - sub_->add_subscription(kTopic1, 3); - sub_->add_subscription(kTopic2, 3); + sub_->add_subscription( + kTopic1_, expected_number_of_messages_on_topic1); + sub_->add_subscription(kTopic2_, expected_number_of_messages_on_topic2); play_options_.playback_duration = rclcpp::Duration(std::chrono::milliseconds(milliseconds)); player_ = std::make_shared(std::move(reader), storage_options_, play_options_); // Wait for discovery to match publishers with subscribers - ASSERT_TRUE( - sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), std::chrono::seconds(5))); - - auto await_received_messages = sub_->spin_subscriptions(kMaximumTimeSpinningSec); + ASSERT_TRUE(sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), 5s)); + auto await_received_messages = sub_->spin_subscriptions(); ASSERT_TRUE(player_->play()); - await_received_messages.get(); } @@ -151,12 +150,12 @@ TEST_F(RosBag2PlayForTestFixture, play_for_all_are_played_due_to_duration) InitPlayerWithPlaybackDurationAndPlay(350); auto replayed_test_primitives = sub_->get_received_messages( - kTopic1); + kTopic1_); EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(3u))); EVAL_REPLAYED_PRIMITIVES(replayed_test_primitives); auto replayed_test_arrays = sub_->get_received_messages( - kTopic2); + kTopic2_); EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(3u))); EVAL_REPLAYED_BOOL_ARRAY_PRIMITIVES(replayed_test_arrays); EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_test_arrays); @@ -164,82 +163,110 @@ TEST_F(RosBag2PlayForTestFixture, play_for_all_are_played_due_to_duration) TEST_F(RosBag2PlayForTestFixture, play_for_none_are_played_due_to_duration) { - InitPlayerWithPlaybackDurationAndPlay(50); + auto primitive_message1 = get_messages_basic_types()[0]; + auto primitive_message2 = get_messages_basic_types()[0]; + primitive_message1->int32_value = 1; + primitive_message2->int32_value = 2; - auto replayed_test_primitives = sub_->get_received_messages( - kTopic1); - EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(0u))); + auto topic_types = std::vector{ + {kTopic1Name_, "test_msgs/BasicTypes", "", ""}}; - auto replayed_test_arrays = sub_->get_received_messages( - kTopic2); - EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(0u))); -} + std::vector> messages = + { + serialize_test_message(kTopic1Name_, 50, primitive_message1), + serialize_test_message(kTopic1Name_, 100, primitive_message2), + }; -TEST_F(RosBag2PlayForTestFixture, play_for_less_than_the_total_duration) -{ - InitPlayerWithPlaybackDurationAndPlay(270); + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); - auto replayed_test_primitives = sub_->get_received_messages( - kTopic1); - EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(2u))); - EVAL_REPLAYED_PRIMITIVES(replayed_test_primitives); + // Expect to receive messages only from play_next in second round + sub_->add_subscription(kTopic1_, messages.size()); + play_options_.playback_duration = rclcpp::Duration(std::chrono::milliseconds(49)); - auto replayed_test_arrays = sub_->get_received_messages( - kTopic2); - EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(2u))); - EVAL_REPLAYED_BOOL_ARRAY_PRIMITIVES(replayed_test_arrays); - EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_test_arrays); + std::shared_ptr player_ = std::make_shared( + std::move(reader), storage_options_, play_options_); + + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE(sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), 5s)); + + auto await_received_messages = sub_->spin_subscriptions(); + ASSERT_TRUE(player_->play()); + + // Playing one more time with play_next to save time and count messages + player_->pause(); + auto player_future = std::async(std::launch::async, [&player_]() -> void {player_->play();}); + + EXPECT_TRUE(player_->play_next()); + EXPECT_TRUE(player_->play_next()); + EXPECT_FALSE(player_->play_next()); + player_->resume(); + player_future.get(); + await_received_messages.get(); + auto replayed_topic1 = sub_->get_received_messages(kTopic1_); + EXPECT_THAT(replayed_topic1, SizeIs(2)); + EXPECT_EQ(replayed_topic1[0]->int32_value, 1); + EXPECT_EQ(replayed_topic1[1]->int32_value, 2); } -class RosBag2PlayForFilteredTopicTestFixture : public RosBag2PlayForTestFixture +TEST_F(RosBag2PlayForTestFixture, play_for_less_than_the_total_duration) { -public: - void SetUp() override + auto primitive_message1 = get_messages_basic_types()[0]; + auto primitive_message2 = get_messages_basic_types()[0]; + primitive_message1->int32_value = 1; + primitive_message2->int32_value = 2; + + auto topic_types = std::vector{ + {kTopic1Name_, "test_msgs/BasicTypes", "", ""}}; + + std::vector> messages = { - // Filter allows /topic2, blocks /topic1 - play_options_.topics_to_filter = {"topic2"}; - } -}; + serialize_test_message(kTopic1Name_, 10, primitive_message1), + serialize_test_message(kTopic1Name_, 50, primitive_message2), + }; -TEST_F( - RosBag2PlayForFilteredTopicTestFixture, - play_for_full_duration_recorded_messages_with_filtered_topics) -{ - InitPlayerWithPlaybackDurationAndPlay(400); + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); - auto replayed_test_primitives = - sub_->get_received_messages("/topic1"); - // No messages are allowed to have arrived - EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(0u))); + // Expect to receive 1 message from play() and 2 messages from play_next in second round + sub_->add_subscription(kTopic1_, messages.size() + 1); + play_options_.playback_duration = rclcpp::Duration(std::chrono::milliseconds(49)); - auto replayed_test_arrays = sub_->get_received_messages("/topic2"); - // All messages should have arrived. - EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(3u))); - EVAL_REPLAYED_BOOL_ARRAY_PRIMITIVES(replayed_test_arrays); - EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_test_arrays); -} + std::shared_ptr player_ = std::make_shared( + std::move(reader), storage_options_, play_options_); -TEST_F( - RosBag2PlayForFilteredTopicTestFixture, - play_for_short_duration_recorded_messages_with_filtered_topics) -{ - InitPlayerWithPlaybackDurationAndPlay(50); + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE(sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), 5s)); - auto replayed_test_primitives = - sub_->get_received_messages("/topic1"); - // No messages are allowed to have arrived - EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(0u))); + auto await_received_messages = sub_->spin_subscriptions(); + ASSERT_TRUE(player_->play()); - auto replayed_test_arrays = sub_->get_received_messages("/topic2"); - // All messages should have arrived. - EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(0u))); + // Playing one more time with play_next to save time and count messages + player_->pause(); + auto player_future = std::async(std::launch::async, [&player_]() -> void {player_->play();}); + + ASSERT_TRUE(player_->play_next()); + ASSERT_TRUE(player_->play_next()); + ASSERT_FALSE(player_->play_next()); + player_->resume(); + player_future.get(); + await_received_messages.get(); + auto replayed_topic1 = sub_->get_received_messages(kTopic1_); + EXPECT_THAT(replayed_topic1, SizeIs(3)); + EXPECT_EQ(replayed_topic1[0]->int32_value, 1); + EXPECT_EQ(replayed_topic1[1]->int32_value, 1); + EXPECT_EQ(replayed_topic1[2]->int32_value, 2); } TEST_F( - RosBag2PlayForFilteredTopicTestFixture, + RosBag2PlayForTestFixture, play_for_intermediate_duration_recorded_messages_with_filtered_topics) { - InitPlayerWithPlaybackDurationAndPlay(270); + // Filter allows /topic2, blocks /topic1 + play_options_.topics_to_filter = {"topic2"}; + InitPlayerWithPlaybackDurationAndPlay(270, 0, 2); auto replayed_test_primitives = sub_->get_received_messages("/topic1"); @@ -253,58 +280,34 @@ TEST_F( EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_test_arrays); } -class RosBag2PlayForInterruptedTestFixture : public RosBag2PlayTestFixture +TEST_F(RosBag2PlayForTestFixture, play_should_return_false_when_interrupted) { -public: - static constexpr const char * kTopic1Name{"topic1"}; - static constexpr const char * kTopic1{"/topic1"}; + auto topic_types = std::vector{ + {kTopic1Name_, "test_msgs/BasicTypes", "", ""}}; - std::vector get_topic_types() + auto primitive_message = get_messages_basic_types()[0]; + primitive_message->int32_value = kIntValue; + std::vector> messages = { - return {{kTopic1Name, "test_msgs/BasicTypes", "", ""}}; - } - - std::vector> - get_serialized_messages() - { - auto primitive_message = get_messages_basic_types()[0]; - primitive_message->int32_value = kIntValue; - - // @{ Ordering matters. The mock reader implementation moves messages - // around without any knowledge about message chronology. It just picks - // the next one Make sure to keep the list in order or sort it! - std::vector> messages = - {serialize_test_message(kTopic1Name, 50, primitive_message), - serialize_test_message(kTopic1Name, 100, primitive_message), - serialize_test_message(kTopic1Name, 150, primitive_message)}; - // @} - return messages; - } -}; - -TEST_F( - RosBag2PlayForInterruptedTestFixture, - play_should_return_false_when_interrupted) -{ - auto topic_types = get_topic_types(); - auto messages = get_serialized_messages(); + serialize_test_message(kTopic1Name_, 50, primitive_message), + serialize_test_message(kTopic1Name_, 100, primitive_message), + }; auto prepared_mock_reader = std::make_unique(); prepared_mock_reader->prepare(messages, topic_types); auto reader = std::make_unique(std::move(prepared_mock_reader)); // Let the player only reproduce one message. - sub_->add_subscription(kTopic1, 1); + sub_->add_subscription(kTopic1_, 1); play_options_.playback_duration = rclcpp::Duration(std::chrono::milliseconds(75)); std::shared_ptr player_ = std::make_shared( std::move(reader), storage_options_, play_options_); // Wait for discovery to match publishers with subscribers - ASSERT_TRUE( - sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), std::chrono::seconds(5))); + ASSERT_TRUE(sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), 5s)); - auto await_received_messages = sub_->spin_subscriptions(kMaximumTimeSpinningSec); + auto await_received_messages = sub_->spin_subscriptions(); player_->pause(); auto player_future = std::async(std::launch::async, [player_]() {return player_->play();}); player_->wait_for_playback_to_start(); @@ -314,6 +317,6 @@ TEST_F( player_->resume(); player_future.get(); await_received_messages.get(); - auto replayed_topic1 = sub_->get_received_messages(kTopic1); + auto replayed_topic1 = sub_->get_received_messages(kTopic1_); EXPECT_THAT(replayed_topic1, SizeIs(1)); }