Skip to content

Commit

Permalink
Redesign tests in test_play_for.cpp (#17)
Browse files Browse the repository at this point in the history
* Redesigned tests to be more deterministic and running faster
* Fixed bug in `play_for()` flow when replaying in loop or multiple
times from the same player instance.

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>
  • Loading branch information
MichaelOrlov authored and gbiggs committed Mar 30, 2022
1 parent 4e01e98 commit 46edcdd
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,13 @@ class SubscriptionManager
return matched;
}

std::future<void> spin_subscriptions(int maximum_time_spinning_sec = 10)
std::future<void> spin_subscriptions()
{
return async(
std::launch::async, [this, maximum_time_spinning_sec]() {
std::launch::async, [this]() {
rclcpp::executors::SingleThreadedExecutor exec;
auto start = std::chrono::high_resolution_clock::now();
while (rclcpp::ok() &&
continue_spinning(expected_topics_with_size_, start, maximum_time_spinning_sec))
{
while (rclcpp::ok() && continue_spinning(expected_topics_with_size_, start)) {
exec.spin_node_some(subscriber_node_);
}
});
Expand All @@ -134,11 +132,10 @@ class SubscriptionManager
private:
bool continue_spinning(
const std::unordered_map<std::string, size_t> & expected_topics_with_sizes,
std::chrono::time_point<std::chrono::high_resolution_clock> start_time,
int maximum_time_spinning_sec = 10)
std::chrono::time_point<std::chrono::high_resolution_clock> start_time)
{
auto current = std::chrono::high_resolution_clock::now();
if (current - start_time > std::chrono::seconds(maximum_time_spinning_sec)) {
if (current - start_time > std::chrono::seconds(10)) {
return false;
}

Expand Down
1 change: 1 addition & 0 deletions rosbag2_transport/include/rosbag2_transport/player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ class Player : public rclcpp::Node
rosbag2_transport::PlayOptions play_options_;
moodycamel::ReaderWriterQueue<rosbag2_storage::SerializedBagMessageSharedPtr> message_queue_;
mutable std::future<void> storage_loading_future_;
std::atomic_bool load_storage_content_{true};
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides_;
std::unique_ptr<rosbag2_cpp::PlayerClock> clock_;
std::shared_ptr<rclcpp::TimerBase> clock_publish_timer_;
Expand Down
11 changes: 10 additions & 1 deletion rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,14 @@ bool Player::play()
reader_->seek(starting_time_);
clock_->jump(starting_time_);
}
load_storage_content_ = true;
storage_loading_future_ = std::async(std::launch::async, [this]() {load_storage_content();});
wait_for_filled_queue();
play_messages_from_queue(play_until_time);

load_storage_content_ = false;
if (storage_loading_future_.valid()) {storage_loading_future_.get();}
while (message_queue_.pop()) {} // cleanup queue
{
std::lock_guard<std::mutex> lk(ready_to_play_from_queue_mutex_);
is_ready_to_play_from_queue_ = false;
Expand All @@ -236,6 +241,9 @@ bool Player::play()
} while (rclcpp::ok() && play_options_.loop);
} catch (std::runtime_error & e) {
RCLCPP_ERROR(get_logger(), "Failed to play: %s", e.what());
load_storage_content_ = false;
if (storage_loading_future_.valid()) {storage_loading_future_.get();}
while (message_queue_.pop()) {} // cleanup queue
}
std::lock_guard<std::mutex> lk(ready_to_play_from_queue_mutex_);
is_ready_to_play_from_queue_ = false;
Expand Down Expand Up @@ -389,6 +397,7 @@ void Player::seek(rcutils_time_point_value_t time_point)
// Restart queuing thread if it has finished running (previously reached end of bag),
// otherwise, queueing should continue automatically after releasing mutex
if (is_storage_completely_loaded() && rclcpp::ok()) {
load_storage_content_ = true;
storage_loading_future_ =
std::async(std::launch::async, [this]() {load_storage_content();});
}
Expand All @@ -411,7 +420,7 @@ void Player::load_storage_content()
static_cast<size_t>(play_options_.read_ahead_queue_size * read_ahead_lower_bound_percentage_);
auto queue_upper_boundary = play_options_.read_ahead_queue_size;

while (rclcpp::ok()) {
while (rclcpp::ok() && load_storage_content_) {
TSAUniqueLock lk(reader_mutex_);
if (!reader_->has_next()) {break;}

Expand Down
Loading

0 comments on commit 46edcdd

Please sign in to comment.