Skip to content

Commit

Permalink
Add play-for functionality (#960)
Browse files Browse the repository at this point in the history
* Add play-for functionality

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Add play-for to the CLI

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Solves most of the comments from https://github.com/ros2/rosbag2/pull/960/files (#14)

* Add play-for functionality

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Add play-for to the CLI

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Adds playback_duration to PlayOptions.

* Changes from PlayFor to Play srv message and changes start_offset and playback_duration.

* Restores play_for tests.

* Removes extra SubscriptionManager methods.

* Solves comment about extra sent message.

* Reorders code and comment.

* Removes the SKIP_TEST flag.

Co-authored-by: Geoffrey Biggs <gbiggs@killbots.net>
Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Second round of comments from #960 upstream (#15)

* Removes duration parameter. A leftover after switching to playback_duration.

* Fixes comment.

* Solves format in rosbag2_py -> _transport.cpp

* Applies style suggestions.

* Changes play() to return a boolean indicating whether the request could be fulfilled.

* Removes extra unnecessary code.

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Updates test execution time for #960 (#16)

* Adresses reviewer's comments.

* Improve test time by adding an optional argument to SubscriptionManager::spin_subscriptions()

- Reduces test_play_for execution time from 50s to 6s approximately.

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Redesign tests in test_play_for.cpp (#17)

* Redesigned tests to be more deterministic and running faster
* Fixed bug in `play_for()` flow when replaying in loop or multiple
times from the same player instance.

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Remove unnecessary source file from test binary

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Correct errors introduced by rebase

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Correct play_next behaviour

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

Co-authored-by: Agustin Alba Chicar <ag.albachicar@ekumenlabs.com>
Co-authored-by: Michael Orlov <morlovmr@gmail.com>
  • Loading branch information
3 people authored Jun 2, 2022
1 parent d97e291 commit ccafe85
Show file tree
Hide file tree
Showing 9 changed files with 409 additions and 6 deletions.
9 changes: 9 additions & 0 deletions ros2bag/ros2bag/verb/play.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ def add_arguments(self, parser, cli_name): # noqa: D102
parser.add_argument(
'-d', '--delay', type=positive_float, default=0.0,
help='Sleep duration before play (each loop), in seconds. Negative durations invalid.')
parser.add_argument(
'--playback_duration', type=float, default=-1.0,
help='Playback duration, in seconds. Negative durations mark an infinite playback. '
'Default is -1.0.')
parser.add_argument(
'--disable-keyboard-controls', action='store_true',
help='disables keyboard controls for playback')
Expand All @@ -111,6 +115,10 @@ def add_arguments(self, parser, cli_name): # noqa: D102
'By default, if loaned message can be used, messages are published as loaned '
'message. It can help to reduce the number of data copies, so there is a greater '
'benefit for sending big data.')
parser.add_argument(
'-f', '--duration', type=float, default=None,
help='Play for SEC seconds. Default is None, meaning that playback will continue '
'until the end of the bag. Valid range > 0.0')

def main(self, *, args): # noqa: D102
qos_profile_overrides = {} # Specify a valid default
Expand Down Expand Up @@ -146,6 +154,7 @@ def main(self, *, args): # noqa: D102
play_options.topic_remapping_options = topic_remapping
play_options.clock_publish_frequency = args.clock
play_options.delay = args.delay
play_options.playback_duration = args.playback_duration
play_options.disable_keyboard_controls = args.disable_keyboard_controls
play_options.start_paused = args.start_paused
play_options.start_offset = args.start_offset
Expand Down
1 change: 1 addition & 0 deletions rosbag2_interfaces/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ rosidl_generate_interfaces(${PROJECT_NAME}
"srv/GetRate.srv"
"srv/IsPaused.srv"
"srv/Pause.srv"
"srv/Play.srv"
"srv/PlayNext.srv"
"srv/Resume.srv"
"srv/Seek.srv"
Expand Down
7 changes: 7 additions & 0 deletions rosbag2_interfaces/srv/Play.srv
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# See rosbag2_transport::PlayOptions::start_offset
builtin_interfaces/Time start_offset
# See rosbag2_transport::PlayOptions::playback_duration
builtin_interfaces/Duration playback_duration
---
# returns false when another playback execution is running, otherwise true
bool success
17 changes: 16 additions & 1 deletion rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,17 @@ struct OptionsWrapper : public T
static_cast<rcl_duration_value_t>(RCUTILS_S_TO_NS(delay)));
}

double getPlaybackDuration() const
{
return RCUTILS_NS_TO_S(static_cast<double>(this->playback_duration.nanoseconds()));
}

void setPlaybackDuration(double playback_duration)
{
this->playback_duration = rclcpp::Duration::from_nanoseconds(
static_cast<rcl_duration_value_t>(RCUTILS_S_TO_NS(playback_duration)));
}

double getDelay() const
{
return RCUTILS_NS_TO_S(static_cast<double>(this->delay.nanoseconds()));
Expand Down Expand Up @@ -241,6 +252,10 @@ PYBIND11_MODULE(_transport, m) {
"delay",
&PlayOptions::getDelay,
&PlayOptions::setDelay)
.def_property(
"playback_duration",
&PlayOptions::getPlaybackDuration,
&PlayOptions::setPlaybackDuration)
.def_readwrite("disable_keyboard_controls", &PlayOptions::disable_keyboard_controls)
.def_readwrite("start_paused", &PlayOptions::start_paused)
.def_property(
Expand Down Expand Up @@ -278,7 +293,7 @@ PYBIND11_MODULE(_transport, m) {

py::class_<rosbag2_py::Player>(m, "Player")
.def(py::init())
.def("play", &rosbag2_py::Player::play)
.def("play", &rosbag2_py::Player::play, py::arg("storage_options"), py::arg("play_options"))
;

py::class_<rosbag2_py::Recorder>(m, "Recorder")
Expand Down
6 changes: 6 additions & 0 deletions rosbag2_transport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ function(create_tests_for_rmw_implementation)
AMENT_DEPS test_msgs rosbag2_test_common
${SKIP_TEST})

rosbag2_transport_add_gmock(test_play_for
test/rosbag2_transport/test_play_for.cpp
INCLUDE_DIRS $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/src/rosbag2_transport>
LINK_LIBS rosbag2_transport
AMENT_DEPS test_msgs rosbag2_test_common)

rosbag2_transport_add_gmock(test_play_loop
test/rosbag2_transport/test_play_loop.cpp
LINK_LIBS rosbag2_transport
Expand Down
4 changes: 4 additions & 0 deletions rosbag2_transport/include/rosbag2_transport/play_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ struct PlayOptions
// Sleep before play. Negative durations invalid. Will delay at the beginning of each loop.
rclcpp::Duration delay = rclcpp::Duration(0, 0);

// Determines the maximum duration of the playback. Negative durations will make the playback to
// not stop. Default configuration makes the player to not stop execution.
rclcpp::Duration playback_duration = rclcpp::Duration(-1, 0);

// Start paused.
bool start_paused = false;

Expand Down
8 changes: 7 additions & 1 deletion rosbag2_transport/include/rosbag2_transport/player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <functional>
#include <future>
#include <memory>
#include <optional>
#include <queue>
#include <string>
#include <unordered_map>
Expand All @@ -37,6 +38,7 @@
#include "rosbag2_interfaces/srv/get_rate.hpp"
#include "rosbag2_interfaces/srv/is_paused.hpp"
#include "rosbag2_interfaces/srv/pause.hpp"
#include "rosbag2_interfaces/srv/play.hpp"
#include "rosbag2_interfaces/srv/play_next.hpp"
#include "rosbag2_interfaces/srv/burst.hpp"
#include "rosbag2_interfaces/srv/resume.hpp"
Expand Down Expand Up @@ -94,7 +96,7 @@ class Player : public rclcpp::Node
virtual ~Player();

ROSBAG2_TRANSPORT_PUBLIC
void play();
bool play();

// Playback control interface
/// Pause the flow of time for playback.
Expand Down Expand Up @@ -218,14 +220,17 @@ class Player : public rclcpp::Node

rosbag2_storage::StorageOptions storage_options_;
rosbag2_transport::PlayOptions play_options_;
rcutils_time_point_value_t play_until_time_ = -1;
moodycamel::ReaderWriterQueue<rosbag2_storage::SerializedBagMessageSharedPtr> message_queue_;
mutable std::future<void> storage_loading_future_;
std::atomic_bool load_storage_content_{true};
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides_;
std::unique_ptr<rosbag2_cpp::PlayerClock> clock_;
std::shared_ptr<rclcpp::TimerBase> clock_publish_timer_;
std::mutex skip_message_in_main_play_loop_mutex_;
bool skip_message_in_main_play_loop_ RCPPUTILS_TSA_GUARDED_BY(
skip_message_in_main_play_loop_mutex_) = false;
std::atomic_bool is_in_playback_{false};

rcutils_time_point_value_t starting_time_;

Expand All @@ -236,6 +241,7 @@ class Player : public rclcpp::Node
rclcpp::Service<rosbag2_interfaces::srv::IsPaused>::SharedPtr srv_is_paused_;
rclcpp::Service<rosbag2_interfaces::srv::GetRate>::SharedPtr srv_get_rate_;
rclcpp::Service<rosbag2_interfaces::srv::SetRate>::SharedPtr srv_set_rate_;
rclcpp::Service<rosbag2_interfaces::srv::Play>::SharedPtr srv_play_;
rclcpp::Service<rosbag2_interfaces::srv::PlayNext>::SharedPtr srv_play_next_;
rclcpp::Service<rosbag2_interfaces::srv::Burst>::SharedPtr srv_burst_;
rclcpp::Service<rosbag2_interfaces::srv::Seek>::SharedPtr srv_seek_;
Expand Down
47 changes: 43 additions & 4 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ Player::Player(
set_rate(play_options_.rate);
topic_qos_profile_overrides_ = play_options_.topic_qos_profile_overrides;
prepare_publishers();

if (play_options_.playback_duration >= rclcpp::Duration(0, 0)) {
play_until_time_ = starting_time_ + play_options_.playback_duration.nanoseconds();
}
}
create_control_services();
add_keyboard_callbacks();
Expand Down Expand Up @@ -191,8 +195,13 @@ bool Player::is_storage_completely_loaded() const
return !storage_loading_future_.valid();
}

void Player::play()
bool Player::play()
{
if (is_in_playback_.exchange(true)) {
RCLCPP_WARN_STREAM(get_logger(), "Trying to play() while in playback, dismissing request.");
return false;
}

rclcpp::Duration delay(0, 0);
if (play_options_.delay >= rclcpp::Duration(0, 0)) {
delay = play_options_.delay;
Expand All @@ -202,21 +211,28 @@ void Player::play()
"Invalid delay value: " << play_options_.delay.nanoseconds() << ". Delay is disabled.");
}

RCLCPP_INFO_STREAM(get_logger(), "Playback duration value: " << play_until_time_);

try {
do {
if (delay > rclcpp::Duration(0, 0)) {
RCLCPP_INFO_STREAM(get_logger(), "Sleep " << delay.nanoseconds() << " ns");
std::chrono::nanoseconds duration(delay.nanoseconds());
std::this_thread::sleep_for(duration);
std::chrono::nanoseconds delay_duration(delay.nanoseconds());
std::this_thread::sleep_for(delay_duration);
}
{
std::lock_guard<std::mutex> lk(reader_mutex_);
reader_->seek(starting_time_);
clock_->jump(starting_time_);
}
load_storage_content_ = true;
storage_loading_future_ = std::async(std::launch::async, [this]() {load_storage_content();});
wait_for_filled_queue();
play_messages_from_queue();

load_storage_content_ = false;
if (storage_loading_future_.valid()) {storage_loading_future_.get();}
while (message_queue_.pop()) {} // cleanup queue
{
std::lock_guard<std::mutex> lk(ready_to_play_from_queue_mutex_);
is_ready_to_play_from_queue_ = false;
Expand All @@ -225,6 +241,9 @@ void Player::play()
} while (rclcpp::ok() && play_options_.loop);
} catch (std::runtime_error & e) {
RCLCPP_ERROR(get_logger(), "Failed to play: %s", e.what());
load_storage_content_ = false;
if (storage_loading_future_.valid()) {storage_loading_future_.get();}
while (message_queue_.pop()) {} // cleanup queue
}
std::lock_guard<std::mutex> lk(ready_to_play_from_queue_mutex_);
is_ready_to_play_from_queue_ = false;
Expand Down Expand Up @@ -254,6 +273,9 @@ void Player::play()
}
}
}

is_in_playback_ = false;
return true;
}

void Player::pause()
Expand Down Expand Up @@ -344,6 +366,9 @@ bool Player::play_next()

bool next_message_published = false;
while (message_ptr != nullptr && !next_message_published) {
if (play_until_time_ >= starting_time_ && message_ptr->time_stamp > play_until_time_) {
break;
}
{
next_message_published = publish_message(message_ptr);
clock_->jump(message_ptr->time_stamp);
Expand Down Expand Up @@ -395,6 +420,7 @@ void Player::seek(rcutils_time_point_value_t time_point)
// Restart queuing thread if it has finished running (previously reached end of bag),
// otherwise, queueing should continue automatically after releasing mutex
if (is_storage_completely_loaded() && rclcpp::ok()) {
load_storage_content_ = true;
storage_loading_future_ =
std::async(std::launch::async, [this]() {load_storage_content();});
}
Expand All @@ -417,7 +443,7 @@ void Player::load_storage_content()
static_cast<size_t>(play_options_.read_ahead_queue_size * read_ahead_lower_bound_percentage_);
auto queue_upper_boundary = play_options_.read_ahead_queue_size;

while (rclcpp::ok()) {
while (rclcpp::ok() && load_storage_content_) {
TSAUniqueLock lk(reader_mutex_);
if (!reader_->has_next()) {break;}

Expand Down Expand Up @@ -456,6 +482,9 @@ void Player::play_messages_from_queue()
ready_to_play_from_queue_cv_.notify_all();
}
while (message_ptr != nullptr && rclcpp::ok()) {
if (play_until_time_ >= starting_time_ && message_ptr->time_stamp > play_until_time_) {
break;
}
// Do not move on until sleep_until returns true
// It will always sleep, so this is not a tight busy loop on pause
while (rclcpp::ok() && !clock_->sleep_until(message_ptr->time_stamp)) {
Expand Down Expand Up @@ -678,6 +707,16 @@ void Player::create_control_services()
{
response->success = set_rate(request->rate);
});
srv_play_ = create_service<rosbag2_interfaces::srv::Play>(
"~/play",
[this](
rosbag2_interfaces::srv::Play::Request::ConstSharedPtr request,
rosbag2_interfaces::srv::Play::Response::SharedPtr response)
{
play_options_.start_offset = rclcpp::Time(request->start_offset).nanoseconds();
play_options_.playback_duration = rclcpp::Duration(request->playback_duration);
response->success = play();
});
srv_play_next_ = create_service<rosbag2_interfaces::srv::PlayNext>(
"~/play_next",
[this](
Expand Down
Loading

0 comments on commit ccafe85

Please sign in to comment.