diff --git a/rosbag2_transport/include/rosbag2_transport/player.hpp b/rosbag2_transport/include/rosbag2_transport/player.hpp index 97884e65f8..59633cb14a 100644 --- a/rosbag2_transport/include/rosbag2_transport/player.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player.hpp @@ -62,6 +62,9 @@ class Reader; namespace rosbag2_transport { + +class PlayerImpl; + class Player : public rclcpp::Node { public: @@ -200,118 +203,33 @@ class Player : public rclcpp::Node void delete_on_play_message_callback(const callback_handle_t & handle); protected: - struct play_msg_callback_data - { - callback_handle_t handle; - play_msg_callback_t callback; - }; - - std::mutex on_play_msg_callbacks_mutex_; - std::forward_list on_play_msg_pre_callbacks_; - std::forward_list on_play_msg_post_callbacks_; - - class PlayerPublisher final - { -public: - explicit PlayerPublisher( - std::shared_ptr pub, - bool disable_loan_message) - : publisher_(std::move(pub)) - { - using std::placeholders::_1; - if (disable_loan_message || !publisher_->can_loan_messages()) { - publish_func_ = std::bind(&rclcpp::GenericPublisher::publish, publisher_, _1); - } else { - publish_func_ = std::bind(&rclcpp::GenericPublisher::publish_as_loaned_msg, publisher_, _1); - } - } - - ~PlayerPublisher() {} - - void publish(const rclcpp::SerializedMessage & message) - { - publish_func_(message); - } - - std::shared_ptr generic_publisher() - { - return publisher_; - } + /// \brief Getter for publishers corresponding to each topic + /// \return Hashtable representing topic to publisher map excluding inner clock_publisher + ROSBAG2_TRANSPORT_PUBLIC + std::unordered_map> get_publishers(); -private: - std::shared_ptr publisher_; - std::function publish_func_; - }; - bool is_ready_to_play_from_queue_{false}; - std::mutex ready_to_play_from_queue_mutex_; - std::condition_variable ready_to_play_from_queue_cv_; - rclcpp::Publisher::SharedPtr clock_publisher_; - std::unordered_map> publishers_; + /// \brief Getter for inner clock_publisher + /// \return Shared pointer to the inner clock_publisher + ROSBAG2_TRANSPORT_PUBLIC + rclcpp::Publisher::SharedPtr get_clock_publisher(); + + /// \brief Blocks and wait on condition variable until first message will be taken from read + /// queue + ROSBAG2_TRANSPORT_PUBLIC + void wait_for_playback_to_start(); + + /// \brief Getter for the number of registered on_play_msg_pre_callbacks + /// \return Number of registered on_play_msg_pre_callbacks + ROSBAG2_TRANSPORT_PUBLIC + size_t get_number_of_registered_on_play_msg_pre_callbacks(); + + /// \brief Getter for the number of registered on_play_msg_post_callbacks + /// \return Number of registered on_play_msg_post_callbacks + ROSBAG2_TRANSPORT_PUBLIC + size_t get_number_of_registered_on_play_msg_post_callbacks(); private: - rosbag2_storage::SerializedBagMessageSharedPtr peek_next_message_from_queue(); - void load_storage_content(); - bool is_storage_completely_loaded() const; - void enqueue_up_to_boundary(size_t boundary) RCPPUTILS_TSA_REQUIRES(reader_mutex_); - void wait_for_filled_queue() const; - void play_messages_from_queue(); - void prepare_publishers(); - bool publish_message(rosbag2_storage::SerializedBagMessageSharedPtr message); - static callback_handle_t get_new_on_play_msg_callback_handle(); - void add_key_callback( - KeyboardHandler::KeyCode key, - const std::function & cb, - const std::string & op_name); - void add_keyboard_callbacks(); - void create_control_services(); - void configure_play_until_timestamp(); - bool shall_stop_at_timestamp(const rcutils_time_point_value_t & msg_timestamp) const; - - static constexpr double read_ahead_lower_bound_percentage_ = 0.9; - static const std::chrono::milliseconds queue_read_wait_period_; - std::atomic_bool cancel_wait_for_next_message_{false}; - std::atomic_bool stop_playback_{false}; - - std::mutex reader_mutex_; - std::unique_ptr reader_ RCPPUTILS_TSA_GUARDED_BY(reader_mutex_); - - void publish_clock_update(); - void publish_clock_update(const rclcpp::Time & time); - - rosbag2_storage::StorageOptions storage_options_; - rosbag2_transport::PlayOptions play_options_; - rcutils_time_point_value_t play_until_timestamp_ = -1; - moodycamel::ReaderWriterQueue message_queue_; - mutable std::future storage_loading_future_; - std::atomic_bool load_storage_content_{true}; - std::unordered_map topic_qos_profile_overrides_; - std::unique_ptr clock_; - std::shared_ptr clock_publish_timer_; - std::mutex skip_message_in_main_play_loop_mutex_; - bool skip_message_in_main_play_loop_ RCPPUTILS_TSA_GUARDED_BY( - skip_message_in_main_play_loop_mutex_) = false; - std::atomic_bool is_in_playback_{false}; - - rcutils_time_point_value_t starting_time_; - - // control services - rclcpp::Service::SharedPtr srv_pause_; - rclcpp::Service::SharedPtr srv_resume_; - rclcpp::Service::SharedPtr srv_toggle_paused_; - rclcpp::Service::SharedPtr srv_is_paused_; - rclcpp::Service::SharedPtr srv_get_rate_; - rclcpp::Service::SharedPtr srv_set_rate_; - rclcpp::Service::SharedPtr srv_play_; - rclcpp::Service::SharedPtr srv_play_next_; - rclcpp::Service::SharedPtr srv_burst_; - rclcpp::Service::SharedPtr srv_seek_; - rclcpp::Service::SharedPtr srv_stop_; - - rclcpp::Publisher::SharedPtr split_event_pub_; - - // defaults - std::shared_ptr keyboard_handler_; - std::vector keyboard_callbacks_; + std::unique_ptr pimpl_; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index f767cab6ab..98c767e24e 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -89,56 +89,242 @@ namespace rosbag2_transport { constexpr Player::callback_handle_t Player::invalid_callback_handle; -Player::Player(const std::string & node_name, const rclcpp::NodeOptions & node_options) -: rclcpp::Node(node_name, node_options) +class PlayerImpl { - // TODO(karsten1987): Use this constructor later with parameter parsing. - // The reader, storage_options as well as play_options can be loaded via parameter. - // That way, the player can be used as a simple component in a component manager. - throw rclcpp::exceptions::UnimplementedError(); -} +public: + using callback_handle_t = Player::callback_handle_t; + using play_msg_callback_t = Player::play_msg_callback_t; + + PlayerImpl( + Player * owner, + std::unique_ptr reader, + std::shared_ptr keyboard_handler, + const rosbag2_storage::StorageOptions & storage_options, + const rosbag2_transport::PlayOptions & play_options); + + ~PlayerImpl(); + + bool play(); + + /// \brief Unpause if in pause mode, stop playback and exit from play. + void stop(); + + // Playback control interface + /// Pause the flow of time for playback. + virtual void pause(); + + /// Start the flow of time for playback. + virtual void resume(); + + /// Pause if time running, resume if paused. + void toggle_paused(); + + /// Return whether the playback is currently paused. + bool is_paused() const; + + /// Return current playback rate. + double get_rate() const; + + /// \brief Set the playback rate. + /// \return false if an invalid value was provided (<= 0). + virtual bool set_rate(double); + + /// \brief Playing next message from queue when in pause. + /// \details This is blocking call and it will wait until next available message will be + /// published or rclcpp context shut down. + /// \note If internal player queue is starving and storage has not been completely loaded, + /// this method will wait until new element will be pushed to the queue. + /// \return true if player in pause mode and successfully played next message, otherwise false. + virtual bool play_next(); + + /// \brief Burst the next \p num_messages messages from the queue when paused. + /// \param num_messages The number of messages to burst from the queue. Specifying zero means no + /// limit (i.e. burst the entire bag). + /// \details This call will play the next \p num_messages from the queue in burst mode. The + /// timing of the messages is ignored. + /// \note If internal player queue is starving and storage has not been completely loaded, + /// this method will wait until new element will be pushed to the queue. + /// \return The number of messages that was played. + virtual size_t burst(const size_t num_messages); + + /// \brief Advance player to the message with closest timestamp >= time_point. + /// \details This is blocking call and it will wait until current message will be published + /// and message queue will be refilled. + /// If time_point is before the beginning of the bag, then playback time will be set to the + /// beginning of the bag. + /// If time_point is after the end of the bag, playback time will be set to the end of the bag, + /// which will then end playback, or if loop is enabled then will start playing at the beginning + /// of the next loop. + /// \param time_point Time point in ROS playback timeline. + void seek(rcutils_time_point_value_t time_point); + + /// \brief Adding callable object as handler for pre-callback on play message. + /// \param callback Callable which will be called before next message will be published. + /// \note In case of registering multiple callbacks later-registered callbacks will be called + /// first. + /// \return Returns newly created callback handle if callback was successfully added, + /// otherwise returns invalid_callback_handle. + callback_handle_t add_on_play_message_pre_callback(const play_msg_callback_t & callback); + + /// \brief Adding callable object as handler for post-callback on play message. + /// \param callback Callable which will be called after next message will be published. + /// \note In case of registering multiple callbacks later-registered callbacks will be called + /// first. + /// \return Returns newly created callback handle if callback was successfully added, + /// otherwise returns invalid_callback_handle. + callback_handle_t add_on_play_message_post_callback(const play_msg_callback_t & callback); + + /// \brief Delete pre or post on play message callback from internal player lists. + /// \param handle Callback's handle returned from #add_on_play_message_pre_callback or + /// #add_on_play_message_post_callback + void delete_on_play_message_callback(const callback_handle_t & handle); + + /// \brief Getter for publishers corresponding to each topic + /// \return Hashtable representing topic to publisher map excluding inner clock_publisher + std::unordered_map> get_publishers(); + + /// \brief Getter for inner clock_publisher + /// \return Shared pointer to the inner clock_publisher + rclcpp::Publisher::SharedPtr get_clock_publisher(); + + /// \brief Blocks and wait on condition variable until first message will be taken from read + /// queue + void wait_for_playback_to_start(); + + /// \brief Getter for the number of registered on_play_msg_pre_callbacks + /// \return Number of registered on_play_msg_pre_callbacks + size_t get_number_of_registered_on_play_msg_pre_callbacks(); + + /// \brief Getter for the number of registered on_play_msg_post_callbacks + /// \return Number of registered on_play_msg_post_callbacks + size_t get_number_of_registered_on_play_msg_post_callbacks(); + +protected: + struct play_msg_callback_data + { + callback_handle_t handle; + play_msg_callback_t callback; + }; -Player::Player( - const rosbag2_storage::StorageOptions & storage_options, - const rosbag2_transport::PlayOptions & play_options, - const std::string & node_name, - const rclcpp::NodeOptions & node_options) -: Player(std::make_unique(), - storage_options, play_options, - node_name, node_options) -{} + std::mutex on_play_msg_callbacks_mutex_; + std::forward_list on_play_msg_pre_callbacks_; + std::forward_list on_play_msg_post_callbacks_; -Player::Player( - std::unique_ptr reader, - const rosbag2_storage::StorageOptions & storage_options, - const rosbag2_transport::PlayOptions & play_options, - const std::string & node_name, - const rclcpp::NodeOptions & node_options) -: Player(std::move(reader), - // only call KeyboardHandler when using default keyboard handler implementation -#ifndef _WIN32 - std::make_shared(false), -#else - // We don't have signal handler option in constructor for windows version - std::shared_ptr(new KeyboardHandler()), -#endif - storage_options, play_options, - node_name, node_options) -{} + class PlayerPublisher final + { +public: + explicit PlayerPublisher( + std::shared_ptr pub, + bool disable_loan_message) + : publisher_(std::move(pub)) + { + using std::placeholders::_1; + if (disable_loan_message || !publisher_->can_loan_messages()) { + publish_func_ = std::bind(&rclcpp::GenericPublisher::publish, publisher_, _1); + } else { + publish_func_ = std::bind(&rclcpp::GenericPublisher::publish_as_loaned_msg, publisher_, _1); + } + } -Player::Player( + ~PlayerPublisher() = default; + + void publish(const rclcpp::SerializedMessage & message) + { + publish_func_(message); + } + + std::shared_ptr generic_publisher() + { + return publisher_; + } + +private: + std::shared_ptr publisher_; + std::function publish_func_; + }; + bool is_ready_to_play_from_queue_{false}; + std::mutex ready_to_play_from_queue_mutex_; + std::condition_variable ready_to_play_from_queue_cv_; + rclcpp::Publisher::SharedPtr clock_publisher_; + std::unordered_map> publishers_; + +private: + rosbag2_storage::SerializedBagMessageSharedPtr peek_next_message_from_queue(); + void load_storage_content(); + bool is_storage_completely_loaded() const; + void enqueue_up_to_boundary(size_t boundary) RCPPUTILS_TSA_REQUIRES(reader_mutex_); + void wait_for_filled_queue() const; + void play_messages_from_queue(); + void prepare_publishers(); + bool publish_message(rosbag2_storage::SerializedBagMessageSharedPtr message); + static callback_handle_t get_new_on_play_msg_callback_handle(); + void add_key_callback( + KeyboardHandler::KeyCode key, + const std::function & cb, + const std::string & op_name); + void add_keyboard_callbacks(); + void create_control_services(); + void configure_play_until_timestamp(); + bool shall_stop_at_timestamp(const rcutils_time_point_value_t & msg_timestamp) const; + + static constexpr double read_ahead_lower_bound_percentage_ = 0.9; + static const std::chrono::milliseconds queue_read_wait_period_; + std::atomic_bool cancel_wait_for_next_message_{false}; + std::atomic_bool stop_playback_{false}; + + std::mutex reader_mutex_; + std::unique_ptr reader_ RCPPUTILS_TSA_GUARDED_BY(reader_mutex_); + + void publish_clock_update(); + void publish_clock_update(const rclcpp::Time & time); + + Player * owner_; + rosbag2_storage::StorageOptions storage_options_; + rosbag2_transport::PlayOptions play_options_; + rcutils_time_point_value_t play_until_timestamp_ = -1; + moodycamel::ReaderWriterQueue message_queue_; + mutable std::future storage_loading_future_; + std::atomic_bool load_storage_content_{true}; + std::unordered_map topic_qos_profile_overrides_; + std::unique_ptr clock_; + std::shared_ptr clock_publish_timer_; + std::mutex skip_message_in_main_play_loop_mutex_; + bool skip_message_in_main_play_loop_ RCPPUTILS_TSA_GUARDED_BY( + skip_message_in_main_play_loop_mutex_) = false; + std::atomic_bool is_in_playback_{false}; + + rcutils_time_point_value_t starting_time_; + + // control services + rclcpp::Service::SharedPtr srv_pause_; + rclcpp::Service::SharedPtr srv_resume_; + rclcpp::Service::SharedPtr srv_toggle_paused_; + rclcpp::Service::SharedPtr srv_is_paused_; + rclcpp::Service::SharedPtr srv_get_rate_; + rclcpp::Service::SharedPtr srv_set_rate_; + rclcpp::Service::SharedPtr srv_play_; + rclcpp::Service::SharedPtr srv_play_next_; + rclcpp::Service::SharedPtr srv_burst_; + rclcpp::Service::SharedPtr srv_seek_; + rclcpp::Service::SharedPtr srv_stop_; + + rclcpp::Publisher::SharedPtr split_event_pub_; + + // defaults + std::shared_ptr keyboard_handler_; + std::vector keyboard_callbacks_; +}; + +PlayerImpl::PlayerImpl( + Player * owner, std::unique_ptr reader, std::shared_ptr keyboard_handler, const rosbag2_storage::StorageOptions & storage_options, - const rosbag2_transport::PlayOptions & play_options, - const std::string & node_name, - const rclcpp::NodeOptions & node_options) -: rclcpp::Node( - node_name, - rclcpp::NodeOptions(node_options).arguments(play_options.topic_remapping_options)), + const rosbag2_transport::PlayOptions & play_options) +: owner_(owner), storage_options_(storage_options), play_options_(play_options), - keyboard_handler_(keyboard_handler) + keyboard_handler_(std::move(keyboard_handler)) { { std::lock_guard lk(reader_mutex_); @@ -152,7 +338,7 @@ Player::Player( // then add the offset to the starting time obtained from reader metadata if (play_options_.start_offset < 0) { RCLCPP_WARN_STREAM( - get_logger(), + owner_->get_logger(), "Invalid start offset value: " << RCUTILS_NS_TO_S(static_cast(play_options_.start_offset)) << ". Negative start offset ignored."); @@ -171,7 +357,7 @@ Player::Player( add_keyboard_callbacks(); } -Player::~Player() +PlayerImpl::~PlayerImpl() { // Force to stop playback to avoid hangout in case of unexpected exception or when smart // pointer to the player object goes out of scope @@ -189,9 +375,9 @@ Player::~Player() } const std::chrono::milliseconds -Player::queue_read_wait_period_ = std::chrono::milliseconds(100); +PlayerImpl::queue_read_wait_period_ = std::chrono::milliseconds(100); -bool Player::is_storage_completely_loaded() const +bool PlayerImpl::is_storage_completely_loaded() const { if (storage_loading_future_.valid() && storage_loading_future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready) @@ -201,10 +387,12 @@ bool Player::is_storage_completely_loaded() const return !storage_loading_future_.valid(); } -bool Player::play() +bool PlayerImpl::play() { if (is_in_playback_.exchange(true)) { - RCLCPP_WARN_STREAM(get_logger(), "Trying to play() while in playback, dismissing request."); + RCLCPP_WARN_STREAM( + owner_->get_logger(), + "Trying to play() while in playback, dismissing request."); return false; } @@ -215,16 +403,16 @@ bool Player::play() delay = play_options_.delay; } else { RCLCPP_WARN_STREAM( - get_logger(), + owner_->get_logger(), "Invalid delay value: " << play_options_.delay.nanoseconds() << ". Delay is disabled."); } - RCLCPP_INFO_STREAM(get_logger(), "Playback until timestamp: " << play_until_timestamp_); + RCLCPP_INFO_STREAM(owner_->get_logger(), "Playback until timestamp: " << play_until_timestamp_); try { do { if (delay > rclcpp::Duration(0, 0)) { - RCLCPP_INFO_STREAM(get_logger(), "Sleep " << delay.nanoseconds() << " ns"); + RCLCPP_INFO_STREAM(owner_->get_logger(), "Sleep " << delay.nanoseconds() << " ns"); std::chrono::nanoseconds delay_duration(delay.nanoseconds()); std::this_thread::sleep_for(delay_duration); } @@ -248,7 +436,7 @@ bool Player::play() } } while (rclcpp::ok() && !stop_playback_ && play_options_.loop); } catch (std::runtime_error & e) { - RCLCPP_ERROR(get_logger(), "Failed to play: %s", e.what()); + RCLCPP_ERROR(owner_->get_logger(), "Failed to play: %s", e.what()); load_storage_content_ = false; if (storage_loading_future_.valid()) {storage_loading_future_.get();} while (message_queue_.pop()) {} // cleanup queue @@ -267,13 +455,13 @@ bool Player::play() try { if (!pub.second->generic_publisher()->wait_for_all_acked(timeout)) { RCLCPP_ERROR( - get_logger(), + owner_->get_logger(), "Timed out while waiting for all published messages to be acknowledged for topic %s", pub.first.c_str()); } } catch (std::exception & e) { RCLCPP_ERROR( - get_logger(), + owner_->get_logger(), "Exception occurred while waiting for all published messages to be acknowledged for " "topic %s : %s", pub.first.c_str(), @@ -286,12 +474,12 @@ bool Player::play() return true; } -void Player::stop() +void PlayerImpl::stop() { if (!is_in_playback_) { return; } - RCLCPP_INFO_STREAM(get_logger(), "Stopping playback."); + RCLCPP_INFO_STREAM(owner_->get_logger(), "Stopping playback."); stop_playback_ = true; // Temporary stop playback in play_messages_from_queue() and block play_next() and seek() or // wait until those operations will be finished with stop_playback_ = true; @@ -310,53 +498,54 @@ void Player::stop() // Player::play() after finishing play_messages_from_queue(); } -void Player::pause() +void PlayerImpl::pause() { clock_->pause(); - RCLCPP_INFO_STREAM(get_logger(), "Pausing play."); + RCLCPP_INFO_STREAM(owner_->get_logger(), "Pausing play."); } -void Player::resume() +void PlayerImpl::resume() { clock_->resume(); - RCLCPP_INFO_STREAM(get_logger(), "Resuming play."); + RCLCPP_INFO_STREAM(owner_->get_logger(), "Resuming play."); } -void Player::toggle_paused() +void PlayerImpl::toggle_paused() { - is_paused() ? resume() : pause(); + // Note: Use upper level public API from owner class to facilitate unit tests + owner_->is_paused() ? owner_->resume() : owner_->pause(); } -bool Player::is_paused() const +bool PlayerImpl::is_paused() const { return clock_->is_paused(); } -double Player::get_rate() const +double PlayerImpl::get_rate() const { return clock_->get_rate(); } -bool Player::set_rate(double rate) +bool PlayerImpl::set_rate(double rate) { bool ok = clock_->set_rate(rate); if (ok) { - RCLCPP_INFO_STREAM(get_logger(), "Set rate to " << rate); + RCLCPP_INFO_STREAM(owner_->get_logger(), "Set rate to " << rate); } else { - RCLCPP_WARN_STREAM(get_logger(), "Failed to set rate to invalid value " << rate); + RCLCPP_WARN_STREAM(owner_->get_logger(), "Failed to set rate to invalid value " << rate); } return ok; } -rosbag2_storage::SerializedBagMessageSharedPtr Player::peek_next_message_from_queue() +rosbag2_storage::SerializedBagMessageSharedPtr PlayerImpl::peek_next_message_from_queue() { rosbag2_storage::SerializedBagMessageSharedPtr * message_ptr_ptr = message_queue_.peek(); while (!stop_playback_ && message_ptr_ptr == nullptr && !is_storage_completely_loaded() && rclcpp::ok()) { RCLCPP_WARN_THROTTLE( - get_logger(), - *get_clock(), + owner_->get_logger(), + *owner_->get_clock(), 1000, "Message queue starved. Messages will be delayed. Consider " "increasing the --read-ahead-queue-size option."); @@ -377,21 +566,21 @@ rosbag2_storage::SerializedBagMessageSharedPtr Player::peek_next_message_from_qu return nullptr; } -bool Player::play_next() +bool PlayerImpl::play_next() { if (!clock_->is_paused()) { - RCLCPP_WARN_STREAM(get_logger(), "Called play next, but not in paused state."); + RCLCPP_WARN_STREAM(owner_->get_logger(), "Called play next, but not in paused state."); return false; } - RCLCPP_INFO_STREAM(get_logger(), "Playing next message."); + RCLCPP_INFO_STREAM(owner_->get_logger(), "Playing next message."); // Temporary take over playback from play_messages_from_queue() std::lock_guard main_play_loop_lk(skip_message_in_main_play_loop_mutex_); // Check one more time that we are in pause mode after waiting on mutex. Someone could call // resume() or stop() from another thread while we were waiting on mutex. if (!clock_->is_paused()) { - RCLCPP_WARN_STREAM(get_logger(), "Called play next, but not in paused state."); + RCLCPP_WARN_STREAM(owner_->get_logger(), "Called play next, but not in paused state."); return false; } skip_message_in_main_play_loop_ = true; @@ -417,10 +606,10 @@ bool Player::play_next() return next_message_published; } -size_t Player::burst(const size_t num_messages) +size_t PlayerImpl::burst(const size_t num_messages) { if (!clock_->is_paused()) { - RCLCPP_WARN_STREAM(get_logger(), "Burst can only be used when in the paused state."); + RCLCPP_WARN_STREAM(owner_->get_logger(), "Burst can only be used when in the paused state."); return 0; } @@ -437,7 +626,7 @@ size_t Player::burst(const size_t num_messages) return messages_played; } -void Player::seek(rcutils_time_point_value_t time_point) +void PlayerImpl::seek(rcutils_time_point_value_t time_point) { // Temporary stop playback in play_messages_from_queue() and block play_next() std::lock_guard main_play_loop_lk(skip_message_in_main_play_loop_mutex_); @@ -470,11 +659,11 @@ void Player::seek(rcutils_time_point_value_t time_point) } } -Player::callback_handle_t Player::add_on_play_message_pre_callback( +Player::callback_handle_t PlayerImpl::add_on_play_message_pre_callback( const play_msg_callback_t & callback) { if (callback == nullptr) { - return invalid_callback_handle; + return Player::invalid_callback_handle; } std::lock_guard lk(on_play_msg_callbacks_mutex_); callback_handle_t new_handle = get_new_on_play_msg_callback_handle(); @@ -482,11 +671,11 @@ Player::callback_handle_t Player::add_on_play_message_pre_callback( return new_handle; } -Player::callback_handle_t Player::add_on_play_message_post_callback( +Player::callback_handle_t PlayerImpl::add_on_play_message_post_callback( const play_msg_callback_t & callback) { if (callback == nullptr) { - return invalid_callback_handle; + return Player::invalid_callback_handle; } std::lock_guard lk(on_play_msg_callbacks_mutex_); callback_handle_t new_handle = get_new_on_play_msg_callback_handle(); @@ -494,7 +683,7 @@ Player::callback_handle_t Player::add_on_play_message_post_callback( return new_handle; } -void Player::delete_on_play_message_callback(const callback_handle_t & handle) +void PlayerImpl::delete_on_play_message_callback(const callback_handle_t & handle) { std::lock_guard lk(on_play_msg_callbacks_mutex_); on_play_msg_pre_callbacks_.remove_if( @@ -507,13 +696,56 @@ void Player::delete_on_play_message_callback(const callback_handle_t & handle) }); } -Player::callback_handle_t Player::get_new_on_play_msg_callback_handle() +std::unordered_map> PlayerImpl::get_publishers() +{ + std::unordered_map> topic_to_publisher_map; + for (const auto & [topic, publisher] : publishers_) { + topic_to_publisher_map[topic] = publisher->generic_publisher(); + } + return topic_to_publisher_map; +} + +rclcpp::Publisher::SharedPtr PlayerImpl::get_clock_publisher() +{ + return clock_publisher_; +} + +void PlayerImpl::wait_for_playback_to_start() +{ + std::unique_lock lk(ready_to_play_from_queue_mutex_); + ready_to_play_from_queue_cv_.wait(lk, [this] {return is_ready_to_play_from_queue_;}); +} + +size_t PlayerImpl::get_number_of_registered_on_play_msg_pre_callbacks() +{ + size_t callback_counter = 0; + std::lock_guard lk(on_play_msg_callbacks_mutex_); + for (auto & pre_callback_data : on_play_msg_pre_callbacks_) { + (void)pre_callback_data; + callback_counter++; + } + return callback_counter; +} + +size_t PlayerImpl::get_number_of_registered_on_play_msg_post_callbacks() +{ + size_t callback_counter = 0; + std::lock_guard lk(on_play_msg_callbacks_mutex_); + for (auto & post_callback_data : on_play_msg_post_callbacks_) { + (void)post_callback_data; + callback_counter++; + } + return callback_counter; +} + +Player::callback_handle_t PlayerImpl::get_new_on_play_msg_callback_handle() { static std::atomic handle_count{0}; return handle_count.fetch_add(1, std::memory_order_relaxed) + 1; } -void Player::wait_for_filled_queue() const +void PlayerImpl::wait_for_filled_queue() const { while ( message_queue_.size_approx() < play_options_.read_ahead_queue_size && @@ -523,7 +755,7 @@ void Player::wait_for_filled_queue() const } } -void Player::load_storage_content() +void PlayerImpl::load_storage_content() { auto queue_lower_boundary = static_cast(play_options_.read_ahead_queue_size * read_ahead_lower_bound_percentage_); @@ -542,7 +774,7 @@ void Player::load_storage_content() } } -void Player::enqueue_up_to_boundary(size_t boundary) +void PlayerImpl::enqueue_up_to_boundary(size_t boundary) { rosbag2_storage::SerializedBagMessageSharedPtr message; for (size_t i = message_queue_.size_approx(); i < boundary; i++) { @@ -554,7 +786,7 @@ void Player::enqueue_up_to_boundary(size_t boundary) } } -void Player::play_messages_from_queue() +void PlayerImpl::play_messages_from_queue() { // Note: We need to use message_queue_.peek() instead of message_queue_.try_dequeue(message) // to support play_next() API logic. @@ -597,7 +829,7 @@ void Player::play_messages_from_queue() } } -void Player::prepare_publishers() +void PlayerImpl::prepare_publishers() { rosbag2_storage::StorageFilter storage_filter; storage_filter.topics = play_options_.topics_to_filter; @@ -609,7 +841,7 @@ void Player::prepare_publishers() if (play_options_.clock_publish_frequency > 0.f || play_options_.clock_publish_on_topic_publish) { // NOTE: PlayerClock does not own this publisher because rosbag2_cpp // should not own transport-based functionality - clock_publisher_ = create_publisher( + clock_publisher_ = owner_->create_publisher( "/clock", rclcpp::ClockQoS()); } @@ -617,7 +849,7 @@ void Player::prepare_publishers() const auto publish_period = std::chrono::nanoseconds( static_cast(RCUTILS_S_TO_NS(1) / play_options_.clock_publish_frequency)); - clock_publish_timer_ = create_wall_timer( + clock_publish_timer_ = owner_->create_wall_timer( publish_period, [this]() { publish_clock_update(); }); @@ -657,12 +889,12 @@ void Player::prepare_publishers() auto topic_qos = publisher_qos_for_topic( topic, topic_qos_profile_overrides_, - get_logger()); + owner_->get_logger()); try { std::shared_ptr pub = - create_generic_publisher(topic.name, topic.type, topic_qos); - std::shared_ptr player_pub = - std::make_shared( + owner_->create_generic_publisher(topic.name, topic.type, topic_qos); + std::shared_ptr player_pub = + std::make_shared( std::move(pub), play_options_.disable_loan_message); publishers_.insert(std::make_pair(topic.name, player_pub)); if (play_options_.wait_acked_timeout >= 0 && @@ -674,7 +906,7 @@ void Player::prepare_publishers() // using a warning log seems better than adding a new option // to ignore some unknown message type library RCLCPP_WARN( - get_logger(), + owner_->get_logger(), "Ignoring a topic '%s', reason: %s.", topic.name.c_str(), e.what()); } } @@ -686,13 +918,13 @@ void Player::prepare_publishers() topic_without_support_acked.end()); RCLCPP_WARN( - get_logger(), + owner_->get_logger(), "--wait-for-all-acked is invalid for the below topics since reliability of QOS is " "BestEffort.\n%s", topic_without_support_acked.c_str()); } // Create a publisher and callback for when encountering a split in the input - split_event_pub_ = create_publisher( + split_event_pub_ = owner_->create_publisher( "events/read_split", 1); rosbag2_cpp::bag_events::ReaderEventCallbacks callbacks; @@ -706,7 +938,7 @@ void Player::prepare_publishers() reader_->add_event_callbacks(callbacks); } -bool Player::publish_message(rosbag2_storage::SerializedBagMessageSharedPtr message) +bool PlayerImpl::publish_message(rosbag2_storage::SerializedBagMessageSharedPtr message) { bool message_published = false; auto publisher_iter = publishers_.find(message->topic_name); @@ -725,7 +957,7 @@ bool Player::publish_message(rosbag2_storage::SerializedBagMessageSharedPtr mess message_published = true; } catch (const std::exception & e) { RCLCPP_ERROR_STREAM( - get_logger(), "Failed to publish message on '" << message->topic_name << + owner_->get_logger(), "Failed to publish message on '" << message->topic_name << "' topic. \nError: " << e.what()); } @@ -740,7 +972,7 @@ bool Player::publish_message(rosbag2_storage::SerializedBagMessageSharedPtr mess return message_published; } -void Player::add_key_callback( +void PlayerImpl::add_key_callback( KeyboardHandler::KeyCode key, const std::function & cb, const std::string & op_name) @@ -748,7 +980,7 @@ void Player::add_key_callback( std::string key_str = enum_key_code_to_str(key); if (key == KeyboardHandler::KeyCode::UNKNOWN) { RCLCPP_ERROR_STREAM( - get_logger(), + owner_->get_logger(), "Invalid key binding " << key_str << " for " << op_name); throw std::invalid_argument("Invalid key binding."); } @@ -759,91 +991,93 @@ void Player::add_key_callback( key)); // show instructions RCLCPP_INFO_STREAM( - get_logger(), + owner_->get_logger(), "Press " << key_str << " for " << op_name); } -void Player::add_keyboard_callbacks() +void PlayerImpl::add_keyboard_callbacks() { // skip if disabled if (play_options_.disable_keyboard_controls) { return; } - RCLCPP_INFO_STREAM(get_logger(), "Adding keyboard callbacks."); - // check keybindings + RCLCPP_INFO_STREAM(owner_->get_logger(), "Adding keyboard callbacks."); + // Add keybindings + // Note: Use upper level public API from owner class for callbacks to facilitate unit tests add_key_callback( play_options_.pause_resume_toggle_key, - [this]() {toggle_paused();}, + [this]() {owner_->toggle_paused();}, "Pause/Resume" ); add_key_callback( play_options_.play_next_key, - [this]() {play_next();}, + [this]() {owner_->play_next();}, "Play Next Message" ); add_key_callback( play_options_.increase_rate_key, - [this]() {set_rate(get_rate() * 1.1);}, + [this]() {owner_->set_rate(get_rate() * 1.1);}, "Increase Rate 10%" ); add_key_callback( play_options_.decrease_rate_key, - [this]() {set_rate(get_rate() * 0.9);}, + [this]() {owner_->set_rate(get_rate() * 0.9);}, "Decrease Rate 10%" ); } -void Player::create_control_services() +void PlayerImpl::create_control_services() { - srv_pause_ = create_service( + // Note: Use upper level public API from owner class for callbacks to facilitate unit tests + srv_pause_ = owner_->create_service( "~/pause", [this]( rosbag2_interfaces::srv::Pause::Request::ConstSharedPtr, rosbag2_interfaces::srv::Pause::Response::SharedPtr) { - pause(); + owner_->pause(); }); - srv_resume_ = create_service( + srv_resume_ = owner_->create_service( "~/resume", [this]( rosbag2_interfaces::srv::Resume::Request::ConstSharedPtr, rosbag2_interfaces::srv::Resume::Response::SharedPtr) { - resume(); + owner_->resume(); }); - srv_toggle_paused_ = create_service( + srv_toggle_paused_ = owner_->create_service( "~/toggle_paused", [this]( rosbag2_interfaces::srv::TogglePaused::Request::ConstSharedPtr, rosbag2_interfaces::srv::TogglePaused::Response::SharedPtr) { - toggle_paused(); + owner_->toggle_paused(); }); - srv_is_paused_ = create_service( + srv_is_paused_ = owner_->create_service( "~/is_paused", [this]( rosbag2_interfaces::srv::IsPaused::Request::ConstSharedPtr, rosbag2_interfaces::srv::IsPaused::Response::SharedPtr response) { - response->paused = is_paused(); + response->paused = owner_->is_paused(); }); - srv_get_rate_ = create_service( + srv_get_rate_ = owner_->create_service( "~/get_rate", [this]( rosbag2_interfaces::srv::GetRate::Request::ConstSharedPtr, rosbag2_interfaces::srv::GetRate::Response::SharedPtr response) { - response->rate = get_rate(); + response->rate = owner_->get_rate(); }); - srv_set_rate_ = create_service( + srv_set_rate_ = owner_->create_service( "~/set_rate", [this]( rosbag2_interfaces::srv::SetRate::Request::ConstSharedPtr request, rosbag2_interfaces::srv::SetRate::Response::SharedPtr response) { - response->success = set_rate(request->rate); + response->success = owner_->set_rate(request->rate); }); - srv_play_ = create_service( + srv_play_ = owner_->create_service( "~/play", [this]( rosbag2_interfaces::srv::Play::Request::ConstSharedPtr request, @@ -854,44 +1088,44 @@ void Player::create_control_services() play_options_.playback_until_timestamp = rclcpp::Time(request->playback_until_timestamp).nanoseconds(); configure_play_until_timestamp(); - response->success = play(); + response->success = owner_->play(); }); - srv_play_next_ = create_service( + srv_play_next_ = owner_->create_service( "~/play_next", [this]( rosbag2_interfaces::srv::PlayNext::Request::ConstSharedPtr, rosbag2_interfaces::srv::PlayNext::Response::SharedPtr response) { - response->success = play_next(); + response->success = owner_->play_next(); }); - srv_burst_ = create_service( + srv_burst_ = owner_->create_service( "~/burst", [this]( rosbag2_interfaces::srv::Burst::Request::ConstSharedPtr request, rosbag2_interfaces::srv::Burst::Response::SharedPtr response) { - response->actually_burst = burst(request->num_messages); + response->actually_burst = owner_->burst(request->num_messages); }); - srv_seek_ = create_service( + srv_seek_ = owner_->create_service( "~/seek", [this]( rosbag2_interfaces::srv::Seek::Request::ConstSharedPtr request, rosbag2_interfaces::srv::Seek::Response::SharedPtr response) { - seek(rclcpp::Time(request->time).nanoseconds()); + owner_->seek(rclcpp::Time(request->time).nanoseconds()); response->success = true; }); - srv_stop_ = create_service( + srv_stop_ = owner_->create_service( "~/stop", [this]( rosbag2_interfaces::srv::Stop::Request::ConstSharedPtr, rosbag2_interfaces::srv::Stop::Response::SharedPtr) { - stop(); + owner_->stop(); }); } -void Player::configure_play_until_timestamp() +void PlayerImpl::configure_play_until_timestamp() { if (play_options_.playback_duration >= rclcpp::Duration(0, 0) || play_options_.playback_until_timestamp >= rcutils_time_point_value_t{0}) @@ -907,7 +1141,8 @@ void Player::configure_play_until_timestamp() } } -inline bool Player::shall_stop_at_timestamp(const rcutils_time_point_value_t & msg_timestamp) const +inline bool PlayerImpl::shall_stop_at_timestamp( + const rcutils_time_point_value_t & msg_timestamp) const { if ((play_until_timestamp_ > -1 && msg_timestamp > play_until_timestamp_) || play_until_timestamp_ == 0) @@ -918,12 +1153,12 @@ inline bool Player::shall_stop_at_timestamp(const rcutils_time_point_value_t & m } } -void Player::publish_clock_update() +void PlayerImpl::publish_clock_update() { publish_clock_update(rclcpp::Time(clock_->now())); } -void Player::publish_clock_update(const rclcpp::Time & time) +void PlayerImpl::publish_clock_update(const rclcpp::Time & time) { if (clock_publisher_->can_loan_messages()) { auto loaned_timestamp{clock_publisher_->borrow_loaned_message()}; @@ -936,4 +1171,157 @@ void Player::publish_clock_update(const rclcpp::Time & time) } } +/////////////////////////////// +// Player public interface + +Player::Player(const std::string & node_name, const rclcpp::NodeOptions & node_options) +: rclcpp::Node(node_name, node_options) +{ + // TODO(karsten1987): Use this constructor later with parameter parsing. + // The reader, storage_options as well as play_options can be loaded via parameter. + // That way, the player can be used as a simple component in a component manager. + throw rclcpp::exceptions::UnimplementedError(); +} + +Player::Player( + const rosbag2_storage::StorageOptions & storage_options, + const rosbag2_transport::PlayOptions & play_options, + const std::string & node_name, + const rclcpp::NodeOptions & node_options) +: Player(std::make_unique(), + storage_options, play_options, node_name, node_options) +{} + +Player::Player( + std::unique_ptr reader, + const rosbag2_storage::StorageOptions & storage_options, + const rosbag2_transport::PlayOptions & play_options, + const std::string & node_name, + const rclcpp::NodeOptions & node_options) +: Player(std::move(reader), + // only call KeyboardHandler when using default keyboard handler implementation +#ifndef _WIN32 + std::make_shared(false), +#else + // We don't have signal handler option in constructor for windows version + std::shared_ptr(new KeyboardHandler()), +#endif + storage_options, play_options, node_name, node_options) +{} + +Player::Player( + std::unique_ptr reader, + std::shared_ptr keyboard_handler, + const rosbag2_storage::StorageOptions & storage_options, + const rosbag2_transport::PlayOptions & play_options, + const std::string & node_name, + const rclcpp::NodeOptions & node_options) +: rclcpp::Node( + node_name, + rclcpp::NodeOptions(node_options).arguments(play_options.topic_remapping_options)), + pimpl_(std::make_unique( + this, std::move(reader), keyboard_handler, + storage_options, play_options)) +{} + +Player::~Player() = default; + + +bool Player::play() +{ + return pimpl_->play(); +} + +void Player::stop() +{ + pimpl_->stop(); +} + +void Player::pause() +{ + pimpl_->pause(); +} + +void Player::resume() +{ + pimpl_->resume(); +} + +void Player::toggle_paused() +{ + pimpl_->toggle_paused(); +} + +bool Player::is_paused() const +{ + return pimpl_->is_paused(); +} + +double Player::get_rate() const +{ + return pimpl_->get_rate(); +} + +bool Player::set_rate(double rate) +{ + return pimpl_->set_rate(rate); +} + +bool Player::play_next() +{ + return pimpl_->play_next(); +} + +size_t Player::burst(const size_t num_messages) +{ + return pimpl_->burst(num_messages); +} + +void Player::seek(rcutils_time_point_value_t time_point) +{ + pimpl_->seek(time_point); +} + +Player::callback_handle_t Player::add_on_play_message_pre_callback( + const play_msg_callback_t & callback) +{ + return pimpl_->add_on_play_message_pre_callback(callback); +} + +Player::callback_handle_t Player::add_on_play_message_post_callback( + const play_msg_callback_t & callback) +{ + return pimpl_->add_on_play_message_post_callback(callback); +} + +void Player::delete_on_play_message_callback(const Player::callback_handle_t & handle) +{ + pimpl_->delete_on_play_message_callback(handle); +} + +std::unordered_map> Player::get_publishers() +{ + return pimpl_->get_publishers(); +} + +rclcpp::Publisher::SharedPtr Player::get_clock_publisher() +{ + return pimpl_->get_clock_publisher(); +} + +void Player::wait_for_playback_to_start() +{ + pimpl_->wait_for_playback_to_start(); +} + +size_t Player::get_number_of_registered_on_play_msg_pre_callbacks() +{ + return pimpl_->get_number_of_registered_on_play_msg_pre_callbacks(); +} + +size_t Player::get_number_of_registered_on_play_msg_post_callbacks() +{ + return pimpl_->get_number_of_registered_on_play_msg_post_callbacks(); +} + } // namespace rosbag2_transport diff --git a/rosbag2_transport/test/rosbag2_transport/mock_player.hpp b/rosbag2_transport/test/rosbag2_transport/mock_player.hpp index 144bb6f522..7a0143a1c9 100644 --- a/rosbag2_transport/test/rosbag2_transport/mock_player.hpp +++ b/rosbag2_transport/test/rosbag2_transport/mock_player.hpp @@ -36,43 +36,28 @@ class MockPlayer : public rosbag2_transport::Player std::vector get_list_of_publishers() { std::vector pub_list; - for (const auto & publisher : publishers_) { + for (const auto & publisher : get_publishers()) { pub_list.push_back( static_cast( - publisher.second->generic_publisher().get())); + publisher.second.get())); } - if (clock_publisher_) { - pub_list.push_back(clock_publisher_.get()); + auto clock_pub = get_clock_publisher(); + if (clock_pub) { + pub_list.push_back(clock_pub.get()); } return pub_list; } - void wait_for_playback_to_start() - { - std::unique_lock lk(ready_to_play_from_queue_mutex_); - ready_to_play_from_queue_cv_.wait(lk, [this] {return is_ready_to_play_from_queue_;}); - } + using rosbag2_transport::Player::wait_for_playback_to_start; size_t get_number_of_registered_pre_callbacks() { - size_t callback_counter = 0; - std::lock_guard lk(on_play_msg_callbacks_mutex_); - for (auto & pre_callback_data : on_play_msg_pre_callbacks_) { - (void)pre_callback_data; - callback_counter++; - } - return callback_counter; + return get_number_of_registered_on_play_msg_pre_callbacks(); } size_t get_number_of_registered_post_callbacks() { - size_t callback_counter = 0; - std::lock_guard lk(on_play_msg_callbacks_mutex_); - for (auto & post_callback_data : on_play_msg_post_callbacks_) { - (void)post_callback_data; - callback_counter++; - } - return callback_counter; + return get_number_of_registered_on_play_msg_post_callbacks(); } };