From bc822f134ef6dfc90628111ca5445fd5243471c4 Mon Sep 17 00:00:00 2001 From: Martin Idel Date: Thu, 20 Sep 2018 03:41:08 +0200 Subject: [PATCH] Add correct timing behaviour for rosbag play (#32) * GH-69 Read storage content in a separate thread For now the publishing starts only after the reading is completly done. This should change aufter GH-68 is done and a thread-safe queue can be used instead of std::queue. * GH-71 Add integration test for timing behavior * GH-68 Introduce vendor package for shared queue - Download and install headers from moodycamel readerwriterqueue - Download and install headers from moodycamel concurrentqueue - Use readerwriterqueue in code to load and publish concurrently * GH-71 Retain time difference of messages when playing a bag file - The main (play) thread sleeps until the time for publishing the message is reached. - Using std::chrono time_point and duration for type-safe time arithmetic instead of rcutils time types. * GH-71 Improve stability of read test - Subscribers need to maintain a longer history if the messages are not consumed fast enough. * GH-71 Fix Classloader instance lifetime The Classloader instance needs to outlive all objects created by it. * GH-71 Extract playing code into a class of its own Reason: record and play have almost no common code but do the exact opposite with the storage and rclcpp. * GH-70 Do not link explicitly against std_msgs - only required in tests - this decreases the amount of packages needed for a clean build without tests * GH-70 Fix error message of storage * GH-70 Fix pluginlib/storage issue for recording * GH-71 Cleanup: variable naming * GH-70 Load storage continuously instead of as fast as possible - Only load if queue contains less than 1000 messages - Wait a millisecond before loading again once the queue is long enough * GH-70 Add options struct to allow specification of queue size * GH-72 Wait for messages to fill up * GH-74 Rename integration tests to play/record tests * GH-74 Use test_msgs in integration tests - gets rid of string_msgs dependency * GH-70 Rename is_not_ready to is_pending, use bulk reading to queue * GH-70 Harmonize storage_loading_future variable * GH-88 Read messages in order of their timestamps - Currently, we write sequentially in order of arrival time so reading in id order is fine - This may change at a later time and should not change the reading behaviour, i.e. we need to read in order of timestamps * Fix compiler error on Mac * GH-8 Fix: use correct ros message type in test * GH-8 Cleanup: minor code style fixes * GH-8 Refactor future usage in player Make the future a class member of player to avoid having to hand it into several functions which is difficult with a move-only type. * GH-8 Cleanup: remove verbose logging for every stored message * GH-8 Refactor rosbag2 interface Add an explicit overload for record without a topic_names argument to record all topics. * fix: call vector.reserve instead of default initalization * fix record demo --- rosbag2/CMakeLists.txt | 56 ++++--- rosbag2/include/rosbag2/rosbag2.hpp | 27 ++-- .../include/rosbag2/rosbag2_play_options.hpp | 30 ++++ rosbag2/package.xml | 7 +- rosbag2/src/rosbag2/demo_play.cpp | 7 +- rosbag2/src/rosbag2/demo_record.cpp | 6 +- rosbag2/src/rosbag2/player.cpp | 138 ++++++++++++++++++ rosbag2/src/rosbag2/player.hpp | 64 ++++++++ rosbag2/src/rosbag2/replayable_message.hpp | 34 +++++ rosbag2/src/rosbag2/rosbag2.cpp | 78 +++++----- rosbag2/src/rosbag2/typesupport_helpers.hpp | 2 +- ....cpp => rosbag2_play_integration_test.cpp} | 22 ++- .../rosbag2_play_timing_integration_test.cpp | 63 ++++++++ .../rosbag2_record_all_integration_test.cpp | 65 +++++++++ ...=> rosbag2_record_integration_fixture.hpp} | 16 +- .../rosbag2_record_integration_test.cpp | 65 +++++++++ .../test/rosbag2/rosbag2_rosbag_node_test.cpp | 34 ++--- rosbag2/test/rosbag2/rosbag2_test_fixture.hpp | 27 +++- .../rosbag2_typesupport_helpers_test.cpp | 2 +- .../rosbag2_write_all_integration_test.cpp | 56 ------- .../rosbag2_write_integration_test.cpp | 56 ------- .../src/impl/storage_factory_impl.hpp | 12 +- .../sqlite/sqlite_storage.cpp | 4 +- .../sqlite_storage_integration_test.cpp | 18 +++ shared_queues_vendor/CMakeLists.txt | 43 ++++++ shared_queues_vendor/package.xml | 15 ++ 26 files changed, 701 insertions(+), 246 deletions(-) create mode 100644 rosbag2/include/rosbag2/rosbag2_play_options.hpp create mode 100644 rosbag2/src/rosbag2/player.cpp create mode 100644 rosbag2/src/rosbag2/player.hpp create mode 100644 rosbag2/src/rosbag2/replayable_message.hpp rename rosbag2/test/rosbag2/{rosbag2_read_integration_test.cpp => rosbag2_play_integration_test.cpp} (92%) create mode 100644 rosbag2/test/rosbag2/rosbag2_play_timing_integration_test.cpp create mode 100644 rosbag2/test/rosbag2/rosbag2_record_all_integration_test.cpp rename rosbag2/test/rosbag2/{rosbag2_write_integration_fixture.hpp => rosbag2_record_integration_fixture.hpp} (91%) create mode 100644 rosbag2/test/rosbag2/rosbag2_record_integration_test.cpp delete mode 100644 rosbag2/test/rosbag2/rosbag2_write_all_integration_test.cpp delete mode 100644 rosbag2/test/rosbag2/rosbag2_write_integration_test.cpp create mode 100644 shared_queues_vendor/CMakeLists.txt create mode 100644 shared_queues_vendor/package.xml diff --git a/rosbag2/CMakeLists.txt b/rosbag2/CMakeLists.txt index c9d86701de..7600030c35 100644 --- a/rosbag2/CMakeLists.txt +++ b/rosbag2/CMakeLists.txt @@ -24,19 +24,19 @@ find_package(rclcpp REQUIRED) find_package(rcutils REQUIRED) find_package(rosbag2_storage REQUIRED) find_package(rosidl_generator_cpp REQUIRED) -find_package(std_msgs REQUIRED) +find_package(shared_queues_vendor REQUIRED) add_library( librosbag2 SHARED src/rosbag2/rosbag2.cpp + src/rosbag2/player.cpp src/rosbag2/typesupport_helpers.cpp src/rosbag2/generic_publisher.cpp src/rosbag2/generic_subscription.cpp src/rosbag2/rosbag2_node.cpp) -ament_target_dependencies( - librosbag2 +ament_target_dependencies(librosbag2 ament_index_cpp Poco rcl @@ -44,8 +44,9 @@ ament_target_dependencies( rcutils rosbag2_storage rosidl_generator_c - std_msgs + shared_queues_vendor ) + target_include_directories(librosbag2 PUBLIC $ @@ -78,37 +79,48 @@ if(BUILD_TESTING) find_package(test_msgs REQUIRED) ament_lint_auto_find_test_dependencies() - ament_add_gmock(rosbag2_write_integration_test - test/rosbag2/rosbag2_write_integration_test.cpp + ament_add_gmock(rosbag2_record_integration_test + test/rosbag2/rosbag2_record_integration_test.cpp test/rosbag2/test_memory_management.cpp WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) - if(TARGET rosbag2_write_integration_test) - target_link_libraries(rosbag2_write_integration_test librosbag2 + if(TARGET rosbag2_record_integration_test) + target_link_libraries(rosbag2_record_integration_test librosbag2 + ${test_msgs_LIBRARIES} ${rosbag2_storage_default_plugins_LIBRARIES}) - target_include_directories(rosbag2_write_integration_test PRIVATE + target_include_directories(rosbag2_record_integration_test PRIVATE + ${test_msgs_INCLUDE_DIRS} ${rosbag2_storage_default_plugins_INCLUDE_DIRS}) endif() - ament_add_gmock(rosbag2_write_all_integration_test - test/rosbag2/rosbag2_write_all_integration_test.cpp + ament_add_gmock(rosbag2_record_all_integration_test + test/rosbag2/rosbag2_record_all_integration_test.cpp test/rosbag2/test_memory_management.cpp WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) - if(TARGET rosbag2_write_all_integration_test) - target_link_libraries(rosbag2_write_all_integration_test librosbag2 + if(TARGET rosbag2_record_all_integration_test) + target_link_libraries(rosbag2_record_all_integration_test librosbag2 + ${test_msgs_LIBRARIES} ${rosbag2_storage_default_plugins_LIBRARIES}) - target_include_directories(rosbag2_write_all_integration_test PRIVATE + target_include_directories(rosbag2_record_all_integration_test PRIVATE + ${test_msgs_INCLUDE_DIRS} ${rosbag2_storage_default_plugins_INCLUDE_DIRS}) endif() - ament_add_gmock(rosbag2_read_integration_test - test/rosbag2/rosbag2_read_integration_test.cpp + ament_add_gmock(rosbag2_play_integration_test + test/rosbag2/rosbag2_play_integration_test.cpp test/rosbag2/test_memory_management.cpp WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) - if(TARGET rosbag2_read_integration_test) - target_link_libraries(rosbag2_read_integration_test librosbag2) - ament_target_dependencies(rosbag2_read_integration_test - test_msgs - ) + if(TARGET rosbag2_play_integration_test) + target_link_libraries(rosbag2_play_integration_test librosbag2) + ament_target_dependencies(rosbag2_play_integration_test test_msgs) + endif() + + ament_add_gmock(rosbag2_play_timing_integration_test + test/rosbag2/rosbag2_play_timing_integration_test.cpp + test/rosbag2/test_memory_management.cpp + WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) + if(TARGET rosbag2_play_timing_integration_test) + target_link_libraries(rosbag2_play_timing_integration_test librosbag2) + ament_target_dependencies(rosbag2_play_timing_integration_test test_msgs) endif() ament_add_gmock(rosbag2_typesupport_helpers_test @@ -142,7 +154,7 @@ if(BUILD_TESTING) ament_index_cpp Poco rclcpp - std_msgs + test_msgs ) endif() endif() diff --git a/rosbag2/include/rosbag2/rosbag2.hpp b/rosbag2/include/rosbag2/rosbag2.hpp index 135aac09b9..317bceb25e 100644 --- a/rosbag2/include/rosbag2/rosbag2.hpp +++ b/rosbag2/include/rosbag2/rosbag2.hpp @@ -23,8 +23,10 @@ #include "rclcpp/rclcpp.hpp" +#include "rosbag2_storage/storage_factory.hpp" #include "rosbag2_storage/storage_interfaces/read_only_interface.hpp" #include "rosbag2_storage/storage_interfaces/read_write_interface.hpp" +#include "rosbag2/rosbag2_play_options.hpp" #include "rosbag2/visibility_control.hpp" namespace rosbag2 @@ -33,10 +35,14 @@ namespace rosbag2 class GenericPublisher; class GenericSubscription; class Rosbag2Node; +class Player; class Rosbag2 { public: + ROSBAG2_PUBLIC + Rosbag2(); + /** * Records topics to a bagfile. Subscription happens at startup time, hence the topics must * exist when "record" is called. @@ -44,32 +50,35 @@ class Rosbag2 * \param file_name Name of the bagfile to write * \param topic_names Vector of topics to subscribe to. Topics must exist at startup time. If * the vector is empty, all topics will be subscribed. - * \param after_write_action This function will be executed after each write to the database - * where the input parameter is the topic name of the topic written Currently needed for testing. - * Might be removed later. */ ROSBAG2_PUBLIC void record(const std::string & file_name, const std::vector & topic_names); + /** + * Records all available topics to a bagfile. Subscription happens at startup time, hence only + * topics available at startup time are recorded. + * + * \param file_name Name of the bagfile to write + */ + ROSBAG2_PUBLIC + void record(const std::string & file_name); + /** * Replay all topics in a bagfile. * * \param file_name Name of the bagfile to replay */ ROSBAG2_PUBLIC - void play(const std::string & file_name); + void play(const std::string & file_name, const Rosbag2PlayOptions & options); private: - void prepare_publishers( - std::shared_ptr node, - std::shared_ptr storage); - std::shared_ptr create_subscription( std::shared_ptr storage, - std::shared_ptr & node, const std::string & topic_name, const std::string & topic_type) const; + rosbag2_storage::StorageFactory factory_; + std::shared_ptr node_; std::vector> subscriptions_; std::map> publishers_; }; diff --git a/rosbag2/include/rosbag2/rosbag2_play_options.hpp b/rosbag2/include/rosbag2/rosbag2_play_options.hpp new file mode 100644 index 0000000000..346a77e809 --- /dev/null +++ b/rosbag2/include/rosbag2/rosbag2_play_options.hpp @@ -0,0 +1,30 @@ +// Copyright 2018, Bosch Software Innovations GmbH. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2__ROSBAG2_PLAY_OPTIONS_HPP_ +#define ROSBAG2__ROSBAG2_PLAY_OPTIONS_HPP_ + +#include + +namespace rosbag2 +{ +struct Rosbag2PlayOptions +{ +public: + size_t read_ahead_queue_size; +}; + +} // namespace rosbag2 + +#endif // ROSBAG2__ROSBAG2_PLAY_OPTIONS_HPP_ diff --git a/rosbag2/package.xml b/rosbag2/package.xml index c83864c057..0a87d5be67 100644 --- a/rosbag2/package.xml +++ b/rosbag2/package.xml @@ -10,24 +10,19 @@ ament_cmake ament_index_cpp - libpoco-dev poco_vendor rcl rclcpp rcutils rosbag2_storage rosidl_generator_cpp - std_msgs + shared_queues_vendor ament_cmake_gmock ament_lint_auto ament_lint_common ament_index_cpp - poco_vendor - rcl - rclcpp rosbag2_storage_default_plugins - std_msgs test_msgs diff --git a/rosbag2/src/rosbag2/demo_play.cpp b/rosbag2/src/rosbag2/demo_play.cpp index 52a3a13d1b..57c8797894 100644 --- a/rosbag2/src/rosbag2/demo_play.cpp +++ b/rosbag2/src/rosbag2/demo_play.cpp @@ -15,15 +15,18 @@ #include #include "rclcpp/rclcpp.hpp" - +#include "rosbag2/rosbag2_play_options.hpp" #include "rosbag2/rosbag2.hpp" int main(int argc, const char ** argv) { rclcpp::init(argc, argv); + auto options = rosbag2::Rosbag2PlayOptions(); + options.read_ahead_queue_size = 1000; + rosbag2::Rosbag2 rosbag2; - rosbag2.play("test.bag"); + rosbag2.play("test.bag", options); rclcpp::shutdown(); diff --git a/rosbag2/src/rosbag2/demo_record.cpp b/rosbag2/src/rosbag2/demo_record.cpp index ae37f5ebf9..f95529fcfb 100644 --- a/rosbag2/src/rosbag2/demo_record.cpp +++ b/rosbag2/src/rosbag2/demo_record.cpp @@ -42,7 +42,11 @@ int main(int argc, const char ** argv) rclcpp::init(argc, argv); rosbag2::Rosbag2 rosbag2; - rosbag2.record(filename, topics); + if (topics.empty()) { + rosbag2.record(filename); + } else { + rosbag2.record(filename, topics); + } rclcpp::shutdown(); diff --git a/rosbag2/src/rosbag2/player.cpp b/rosbag2/src/rosbag2/player.cpp new file mode 100644 index 0000000000..4c73ea1786 --- /dev/null +++ b/rosbag2/src/rosbag2/player.cpp @@ -0,0 +1,138 @@ +// Copyright 2018, Bosch Software Innovations GmbH. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "player.hpp" + +#include +#include +#include +#include +#include +#include +#include + +#include "rclcpp/rclcpp.hpp" +#include "rcl/graph.h" +#include "rcutils/time.h" + +#include "rosbag2_storage/serialized_bag_message.hpp" +#include "rosbag2_storage/storage_interfaces/read_only_interface.hpp" +#include "rosbag2_storage/storage_factory.hpp" +#include "rosbag2/logging.hpp" +#include "rosbag2_node.hpp" +#include "replayable_message.hpp" +#include "typesupport_helpers.hpp" + +namespace rosbag2 +{ + +Player::Player(std::shared_ptr storage) +: storage_(storage), node_(std::make_shared("rosbag2_node")) +{} + +bool Player::is_storage_completely_loaded() const +{ + if (storage_loading_future_.valid() && + storage_loading_future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready) + { + storage_loading_future_.get(); + } + return !storage_loading_future_.valid(); +} + +void Player::play(const Rosbag2PlayOptions & options) +{ + prepare_publishers(); + + storage_loading_future_ = std::async(std::launch::async, + [this, options]() {load_storage_content(options);}); + + wait_for_filled_queue(options); + + play_messages_from_queue(); +} + +void Player::wait_for_filled_queue(const Rosbag2PlayOptions & options) const +{ + while ( + message_queue_.size_approx() < options.read_ahead_queue_size && + !is_storage_completely_loaded()) + { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } +} + +void Player::load_storage_content(const Rosbag2PlayOptions & options) +{ + TimePoint time_first_message; + + ReplayableMessage message; + if (storage_->has_next()) { + message.message = storage_->read_next(); + message.time_since_start = std::chrono::nanoseconds(0); + time_first_message = TimePoint(std::chrono::nanoseconds(message.message->time_stamp)); + message_queue_.enqueue(message); + } + + auto queue_lower_boundary = + static_cast(options.read_ahead_queue_size * read_ahead_lower_bound_percentage_); + auto queue_upper_boundary = options.read_ahead_queue_size; + + while (storage_->has_next()) { + if (message_queue_.size_approx() < queue_lower_boundary) { + enqueue_up_to_boundary(time_first_message, queue_upper_boundary); + } else { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + } +} + +void Player::enqueue_up_to_boundary(const TimePoint & time_first_message, uint64_t boundary) +{ + ReplayableMessage message; + for (size_t i = message_queue_.size_approx(); i < boundary; i++) { + if (!storage_->has_next()) { + break; + } + message.message = storage_->read_next(); + message.time_since_start = + TimePoint(std::chrono::nanoseconds(message.message->time_stamp)) - time_first_message; + + message_queue_.enqueue(message); + } +} + +void Player::play_messages_from_queue() +{ + auto start_time = std::chrono::high_resolution_clock::now(); + + while (message_queue_.size_approx() != 0 || !is_storage_completely_loaded()) { + ReplayableMessage message; + if (message_queue_.try_dequeue(message)) { + std::this_thread::sleep_until(start_time + message.time_since_start); + publishers_[message.message->topic_name]->publish(message.message->serialized_data); + } + } +} + +void Player::prepare_publishers() +{ + auto all_topics_and_types = storage_->get_all_topics_and_types(); + for (const auto & element : all_topics_and_types) { + publishers_.insert(std::make_pair( + element.first, node_->create_generic_publisher(element.first, element.second))); + } +} + +} // namespace rosbag2 diff --git a/rosbag2/src/rosbag2/player.hpp b/rosbag2/src/rosbag2/player.hpp new file mode 100644 index 0000000000..c30f360748 --- /dev/null +++ b/rosbag2/src/rosbag2/player.hpp @@ -0,0 +1,64 @@ +// Copyright 2018, Bosch Software Innovations GmbH. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2__PLAYER_HPP_ +#define ROSBAG2__PLAYER_HPP_ + +#include +#include +#include +#include +#include + +#include "moodycamel/readerwriterqueue.h" +#include "replayable_message.hpp" +#include "rosbag2_storage/storage_interfaces/read_only_interface.hpp" +#include "rosbag2_storage/storage_interfaces/read_write_interface.hpp" +#include "rosbag2/rosbag2_play_options.hpp" + +using TimePoint = std::chrono::time_point; + +namespace rosbag2 +{ + +class GenericPublisher; +class Rosbag2Node; + +class Player +{ +public: + explicit Player(std::shared_ptr storage); + + void play(const Rosbag2PlayOptions & options); + +private: + void load_storage_content(const Rosbag2PlayOptions & options); + bool is_storage_completely_loaded() const; + void enqueue_up_to_boundary(const TimePoint & time_first_message, uint64_t boundary); + void wait_for_filled_queue(const Rosbag2PlayOptions & options) const; + void play_messages_from_queue(); + void prepare_publishers(); + + static constexpr double read_ahead_lower_bound_percentage_ = 0.9; + + std::shared_ptr storage_; + moodycamel::ReaderWriterQueue message_queue_; + mutable std::future storage_loading_future_; + std::shared_ptr node_; + std::map> publishers_; +}; + +} // namespace rosbag2 + +#endif // ROSBAG2__PLAYER_HPP_ diff --git a/rosbag2/src/rosbag2/replayable_message.hpp b/rosbag2/src/rosbag2/replayable_message.hpp new file mode 100644 index 0000000000..abb18c9ea9 --- /dev/null +++ b/rosbag2/src/rosbag2/replayable_message.hpp @@ -0,0 +1,34 @@ +// Copyright 2018 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2__REPLAYABLE_MESSAGE_HPP_ +#define ROSBAG2__REPLAYABLE_MESSAGE_HPP_ + +#include +#include + +#include "rosbag2_storage/serialized_bag_message.hpp" + +namespace rosbag2 +{ + +struct ReplayableMessage +{ + std::shared_ptr message; + std::chrono::nanoseconds time_since_start; +}; + +} // namespace rosbag2 + +#endif // ROSBAG2__REPLAYABLE_MESSAGE_HPP_ diff --git a/rosbag2/src/rosbag2/rosbag2.cpp b/rosbag2/src/rosbag2/rosbag2.cpp index c201c0e2b5..8f80a2d429 100644 --- a/rosbag2/src/rosbag2/rosbag2.cpp +++ b/rosbag2/src/rosbag2/rosbag2.cpp @@ -17,12 +17,13 @@ #include #include #include +#include #include #include #include "rcl/graph.h" #include "rclcpp/rclcpp.hpp" -#include "rcutils/logging_macros.h" +#include "rcutils/time.h" #include "rosbag2/logging.hpp" #include "rosbag2_storage/serialized_bag_message.hpp" @@ -30,32 +31,27 @@ #include "rosbag2_storage/storage_interfaces/read_only_interface.hpp" #include "rosbag2_storage/storage_interfaces/read_write_interface.hpp" -#include "std_msgs/msg/string.hpp" - #include "generic_subscription.hpp" #include "rosbag2_node.hpp" +#include "player.hpp" +#include "replayable_message.hpp" #include "typesupport_helpers.hpp" namespace rosbag2 { - -const char * ROS_PACKAGE_NAME = "rosbag2"; +Rosbag2::Rosbag2() +: node_(std::make_shared("rosbag2")) +{} void Rosbag2::record(const std::string & file_name, const std::vector & topic_names) { - rosbag2_storage::StorageFactory factory; - auto storage = factory.open_read_write(file_name, "sqlite3"); + auto storage = factory_.open_read_write(file_name, "sqlite3"); if (!storage) { throw std::runtime_error("No storage could be initialized. Abort"); } - auto node = std::make_shared("rosbag2"); - - auto topics_and_types = topic_names.empty() ? - node->get_all_topics_with_types() : - node->get_topics_with_types(topic_names); - + auto topics_and_types = node_->get_topics_with_types(topic_names); if (topics_and_types.empty()) { throw std::runtime_error("No topics found. Abort"); } @@ -65,7 +61,7 @@ void Rosbag2::record(const std::string & file_name, const std::vector subscription = create_subscription( - storage, node, topic_name, topic_type); + storage, topic_name, topic_type); if (subscription) { subscriptions_.push_back(subscription); @@ -80,18 +76,35 @@ void Rosbag2::record(const std::string & file_name, const std::vectorget_all_topics_with_types(); + std::vector topic_names; + topic_names.reserve(topics_and_types.size()); + for (const auto & topic_and_type : topics_and_types) { + topic_names.push_back(topic_and_type.first); + } + + if (topic_names.empty()) { + ROSBAG2_LOG_ERROR("no topics found to record"); + return; + } + + record(file_name, topic_names); +} + std::shared_ptr Rosbag2::create_subscription( std::shared_ptr storage, - std::shared_ptr & node, const std::string & topic_name, const std::string & topic_type) const { - auto subscription = node->create_generic_subscription( + auto subscription = node_->create_generic_subscription( topic_name, topic_type, [storage, topic_name](std::shared_ptr message) { @@ -111,34 +124,15 @@ Rosbag2::create_subscription( return subscription; } -void Rosbag2::play(const std::string & file_name) +void Rosbag2::play(const std::string & file_name, const Rosbag2PlayOptions & options) { - rosbag2_storage::StorageFactory factory; - auto storage = factory.open_read_only(file_name, "sqlite3"); - - if (storage) { - auto node = std::make_shared("rosbag2_node"); - prepare_publishers(node, storage); - - while (storage->has_next()) { - auto message = storage->read_next(); - // without the sleep_for() many messages are lost. - std::this_thread::sleep_for(std::chrono::milliseconds(50)); - publishers_[message->topic_name]->publish(message->serialized_data); - ROSBAG2_LOG_INFO("Published message"); - } + auto storage = factory_.open_read_only(file_name, "sqlite3"); + if (!storage) { + throw std::runtime_error("Could not open storage: " + file_name); } -} -void Rosbag2::prepare_publishers( - std::shared_ptr node, - std::shared_ptr storage) -{ - auto all_topics_and_types = storage->get_all_topics_and_types(); - for (const auto & element : all_topics_and_types) { - publishers_.insert(std::make_pair( - element.first, node->create_generic_publisher(element.first, element.second))); - } + Player player(storage); + player.play(options); } } // namespace rosbag2 diff --git a/rosbag2/src/rosbag2/typesupport_helpers.hpp b/rosbag2/src/rosbag2/typesupport_helpers.hpp index 4661b58edf..6edc8cb7c5 100644 --- a/rosbag2/src/rosbag2/typesupport_helpers.hpp +++ b/rosbag2/src/rosbag2/typesupport_helpers.hpp @@ -18,7 +18,7 @@ #include #include -#include "rosidl_generator_c/message_type_support_struct.h" +#include "rosidl_generator_cpp/message_type_support_decl.hpp" namespace rosbag2 { diff --git a/rosbag2/test/rosbag2/rosbag2_read_integration_test.cpp b/rosbag2/test/rosbag2/rosbag2_play_integration_test.cpp similarity index 92% rename from rosbag2/test/rosbag2/rosbag2_read_integration_test.cpp rename to rosbag2/test/rosbag2/rosbag2_play_integration_test.cpp index a9378cb325..075efafce1 100644 --- a/rosbag2/test/rosbag2/rosbag2_read_integration_test.cpp +++ b/rosbag2/test/rosbag2/rosbag2_play_integration_test.cpp @@ -54,11 +54,18 @@ class RosBag2IntegrationTestFixture : public Rosbag2TestFixture auto node = rclcpp::Node::make_shared("node_" + topic); auto messages = std::make_shared>(); auto messages_received = std::make_shared(0); + rmw_qos_profile_t qos_profile; + qos_profile.history = RMW_QOS_POLICY_HISTORY_KEEP_ALL; + qos_profile.depth = 3; + qos_profile.reliability = RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT; + qos_profile.durability = RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT; + qos_profile.avoid_ros_namespace_conventions = false; + auto subscription = node->create_subscription(topic, [messages, messages_received](typename T::ConstSharedPtr message) { messages->push_back(message); ++*messages_received; - }); + }, qos_profile); subscriptions_.push_back(subscription); return [messages, messages_received, node, expected_messages_number]() { @@ -69,17 +76,6 @@ class RosBag2IntegrationTestFixture : public Rosbag2TestFixture }; } - template - std::shared_ptr serialize_test_message( - const std::string & topic, std::shared_ptr message) - { - auto bag_msg = std::make_shared(); - bag_msg->serialized_data = memory_management_.serialize_message(message); - bag_msg->topic_name = topic; - - return bag_msg; - } - template auto launch_subscriber(size_t expected_messages_number, const std::string & topic) { @@ -139,7 +135,7 @@ TEST_F(RosBag2IntegrationTestFixture, recorded_messages_are_played_for_all_topic wait_for_subscribers(2); Rosbag2 rosbag2; - rosbag2.play(database_name_); + rosbag2.play(database_name_, options_); auto replayed_test_primitives = primitive_subscriber_future.get(); ASSERT_THAT(replayed_test_primitives, SizeIs(Ge(2u))); diff --git a/rosbag2/test/rosbag2/rosbag2_play_timing_integration_test.cpp b/rosbag2/test/rosbag2/rosbag2_play_timing_integration_test.cpp new file mode 100644 index 0000000000..94ecdd817b --- /dev/null +++ b/rosbag2/test/rosbag2/rosbag2_play_timing_integration_test.cpp @@ -0,0 +1,63 @@ +// Copyright 2018, Bosch Software Innovations GmbH. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include +#include +#include + +#include "rclcpp/rclcpp.hpp" +#include "rosbag2/rosbag2.hpp" +#include "rosbag2_test_fixture.hpp" +#include "test_msgs/msg/primitives.hpp" +#include "test_msgs/message_fixtures.hpp" + + +using namespace ::testing; // NOLINT +using namespace rosbag2; // NOLINT + +TEST_F(Rosbag2TestFixture, playing_respects_relative_timing_of_stored_messages) +{ + rclcpp::init(0, nullptr); + auto primitive_message1 = get_messages_primitives()[0]; + primitive_message1->string_value = "Hello World 1"; + + auto primitive_message2 = get_messages_primitives()[0]; + primitive_message2->string_value = "Hello World 2"; + + auto message_time_difference = std::chrono::seconds(1); + auto topics_and_types = std::map{{"topic1", "test_msgs/Primitives"}}; + std::vector> messages = + {serialize_test_message("topic1", primitive_message1), + serialize_test_message("topic1", primitive_message2)}; + + messages[0]->time_stamp = 100; + messages[1]->time_stamp = + messages[0]->time_stamp + std::chrono::nanoseconds(message_time_difference).count(); + write_messages(database_name_, messages, topics_and_types); + + // We can only assert indirectly that the relative times are respected when playing a bag. So + // we check that time elapsed during playing is at least the time difference between the two + // messages + auto start = std::chrono::steady_clock::now(); + Rosbag2 rosbag2; + rosbag2.play(database_name_, options_); + auto replay_time = std::chrono::steady_clock::now() - start; + + ASSERT_THAT(replay_time, Gt(message_time_difference)); + rclcpp::shutdown(); +} diff --git a/rosbag2/test/rosbag2/rosbag2_record_all_integration_test.cpp b/rosbag2/test/rosbag2/rosbag2_record_all_integration_test.cpp new file mode 100644 index 0000000000..b23af19fff --- /dev/null +++ b/rosbag2/test/rosbag2/rosbag2_record_all_integration_test.cpp @@ -0,0 +1,65 @@ +// Copyright 2018, Bosch Software Innovations GmbH. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include + +#include "rclcpp/rclcpp.hpp" +#include "rosbag2/rosbag2.hpp" +#include "rosbag2_record_integration_fixture.hpp" +#include "rosbag2_storage/serialized_bag_message.hpp" +#include "test_msgs/msg/primitives.hpp" +#include "test_msgs/msg/static_array_primitives.hpp" +#include "test_msgs/message_fixtures.hpp" +#include "test_memory_management.hpp" + +// TODO(Martin-Idel-SI): merge with other write and read tests once signal handling is sorted out +TEST_F(RosBag2RecordIntegrationTestFixture, published_messages_from_multiple_topics_are_recorded) +{ + auto array_message = get_messages_static_array_primitives()[0]; + array_message->string_values = {{"Complex Hello1", "Complex Hello2", "Complex Hello3"}}; + array_message->bool_values = {{true, false, true}}; + std::string array_topic = "/array_topic"; + auto serialized_array_bag_message = serialize_test_message(array_topic, array_message); + + auto string_message = get_messages_primitives()[0]; + string_message->string_value = "Hello World"; + std::string string_topic = "/string_topic"; + auto serialized_string_bag_message = serialize_test_message(string_topic, string_message); + + start_publishing(serialized_string_bag_message, string_topic, 2); + start_publishing( + serialized_array_bag_message, array_topic, 2); + + start_recording_all_topics(); + wait_for_publishers_to_stop(); + stop_recording(); + + auto recorded_messages = get_messages(database_name_); + + ASSERT_THAT(recorded_messages, SizeIs(4)); + auto string_messages = filter_messages( + recorded_messages, string_topic); + auto array_messages = filter_messages( + recorded_messages, array_topic); + ASSERT_THAT(string_messages, SizeIs(2)); + ASSERT_THAT(array_messages, SizeIs(2)); + EXPECT_THAT(string_messages[0]->string_value, Eq("Hello World")); + EXPECT_THAT(array_messages[0]->bool_values, ElementsAre(true, false, true)); + EXPECT_THAT(array_messages[0]->string_values, + ElementsAre("Complex Hello1", "Complex Hello2", "Complex Hello3")); +} diff --git a/rosbag2/test/rosbag2/rosbag2_write_integration_fixture.hpp b/rosbag2/test/rosbag2/rosbag2_record_integration_fixture.hpp similarity index 91% rename from rosbag2/test/rosbag2/rosbag2_write_integration_fixture.hpp rename to rosbag2/test/rosbag2/rosbag2_record_integration_fixture.hpp index 9bf022eb9a..637c262d99 100644 --- a/rosbag2/test/rosbag2/rosbag2_write_integration_fixture.hpp +++ b/rosbag2/test/rosbag2/rosbag2_record_integration_fixture.hpp @@ -34,13 +34,13 @@ using namespace ::testing; // NOLINT using namespace rosbag2; // NOLINT using namespace std::chrono_literals; // NOLINT -#ifndef ROSBAG2__ROSBAG2_WRITE_INTEGRATION_FIXTURE_HPP_ -#define ROSBAG2__ROSBAG2_WRITE_INTEGRATION_FIXTURE_HPP_ +#ifndef ROSBAG2__ROSBAG2_RECORD_INTEGRATION_FIXTURE_HPP_ +#define ROSBAG2__ROSBAG2_RECORD_INTEGRATION_FIXTURE_HPP_ -class RosBag2WriteIntegrationTestFixture : public Rosbag2TestFixture +class RosBag2RecordIntegrationTestFixture : public Rosbag2TestFixture { public: - RosBag2WriteIntegrationTestFixture() + RosBag2RecordIntegrationTestFixture() : Rosbag2TestFixture() { rosbag2_storage_plugins::SqliteWrapper @@ -66,7 +66,8 @@ class RosBag2WriteIntegrationTestFixture : public Rosbag2TestFixture future_ = std::async( std::launch::async, [this]() { rosbag2::Rosbag2 rosbag2; - rosbag2.record(database_name_, {}); + std::this_thread::sleep_for(2s); + rosbag2.record(database_name_); }); } @@ -83,13 +84,14 @@ class RosBag2WriteIntegrationTestFixture : public Rosbag2TestFixture } } + template void start_publishing( std::shared_ptr message, const std::string & topic_name, size_t number_expected_messages) { publisher_futures_.push_back(std::async( std::launch::async, [this, message, topic_name, number_expected_messages]() { - auto publisher = publisher_node_->create_publisher(topic_name); + auto publisher = publisher_node_->create_publisher(topic_name); rosbag2_storage_plugins::SqliteWrapper db(database_name_, rosbag2_storage::storage_interfaces::IOFlag::READ_ONLY); @@ -159,4 +161,4 @@ class RosBag2WriteIntegrationTestFixture : public Rosbag2TestFixture std::future future_; }; -#endif // ROSBAG2__ROSBAG2_WRITE_INTEGRATION_FIXTURE_HPP_ +#endif // ROSBAG2__ROSBAG2_RECORD_INTEGRATION_FIXTURE_HPP_ diff --git a/rosbag2/test/rosbag2/rosbag2_record_integration_test.cpp b/rosbag2/test/rosbag2/rosbag2_record_integration_test.cpp new file mode 100644 index 0000000000..6fe7770fb7 --- /dev/null +++ b/rosbag2/test/rosbag2/rosbag2_record_integration_test.cpp @@ -0,0 +1,65 @@ +// Copyright 2018, Bosch Software Innovations GmbH. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include + +#include "rclcpp/rclcpp.hpp" +#include "rosbag2/rosbag2.hpp" +#include "rosbag2_record_integration_fixture.hpp" +#include "rosbag2_storage/serialized_bag_message.hpp" +#include "test_memory_management.hpp" +#include "test_msgs/msg/primitives.hpp" +#include "test_msgs/msg/static_array_primitives.hpp" +#include "test_msgs/message_fixtures.hpp" + +// TODO(Martin-Idel-SI): merge with other write and read tests once signal handling is sorted out +TEST_F(RosBag2RecordIntegrationTestFixture, published_messages_from_multiple_topics_are_recorded) +{ + auto array_message = get_messages_static_array_primitives()[0]; + array_message->string_values = {{"Complex Hello1", "Complex Hello2", "Complex Hello3"}}; + array_message->bool_values = {{true, false, true}}; + std::string array_topic = "/array_topic"; + auto serialized_array_bag_message = serialize_test_message(array_topic, array_message); + + auto string_message = get_messages_primitives()[0]; + string_message->string_value = "Hello World"; + std::string string_topic = "/string_topic"; + auto serialized_string_bag_message = serialize_test_message(string_topic, string_message); + + start_publishing(serialized_string_bag_message, string_topic, 2); + start_publishing( + serialized_array_bag_message, array_topic, 2); + + start_recording({string_topic, array_topic}); + wait_for_publishers_to_stop(); + stop_recording(); + + auto recorded_messages = get_messages(database_name_); + + ASSERT_THAT(recorded_messages, SizeIs(4)); + auto string_messages = filter_messages( + recorded_messages, string_topic); + auto array_messages = filter_messages( + recorded_messages, array_topic); + ASSERT_THAT(string_messages, SizeIs(2)); + ASSERT_THAT(array_messages, SizeIs(2)); + EXPECT_THAT(string_messages[0]->string_value, Eq("Hello World")); + EXPECT_THAT(array_messages[0]->bool_values, ElementsAre(true, false, true)); + EXPECT_THAT(array_messages[0]->string_values, + ElementsAre("Complex Hello1", "Complex Hello2", "Complex Hello3")); +} diff --git a/rosbag2/test/rosbag2/rosbag2_rosbag_node_test.cpp b/rosbag2/test/rosbag2/rosbag2_rosbag_node_test.cpp index ab98f49b57..0bef371517 100644 --- a/rosbag2/test/rosbag2/rosbag2_rosbag_node_test.cpp +++ b/rosbag2/test/rosbag2/rosbag2_rosbag_node_test.cpp @@ -21,11 +21,11 @@ #include #include "rclcpp/rclcpp.hpp" -#include "std_msgs/msg/string.hpp" - +#include "test_memory_management.hpp" +#include "test_msgs/message_fixtures.hpp" +#include "test_msgs/msg/primitives.hpp" #include "../../src/rosbag2/rosbag2_node.hpp" #include "../../src/rosbag2/typesupport_helpers.hpp" -#include "test_memory_management.hpp" using namespace ::testing; // NOLINT @@ -50,7 +50,7 @@ class RosBag2NodeFixture : public Test void create_publisher(const std::string & topic) { - auto publisher = publisher_node_->create_publisher(topic); + auto publisher = publisher_node_->create_publisher(topic); publishers_.push_back(publisher); } @@ -61,9 +61,9 @@ class RosBag2NodeFixture : public Test size_t counter = 0; auto subscription = node_->create_generic_subscription(topic_name, type, [this, &counter, &messages](std::shared_ptr message) { - auto string_message = memory_management_ - .deserialize_message(message); - messages.push_back(string_message->data); + auto string_message = + memory_management_.deserialize_message(message); + messages.push_back(string_message->string_value); counter++; }); @@ -75,8 +75,8 @@ class RosBag2NodeFixture : public Test std::shared_ptr serialize_string_message(const std::string & message) { - auto string_message = std::make_shared(); - string_message->data = message; + auto string_message = std::make_shared(); + string_message->string_value = message; return memory_management_.serialize_message(string_message); } @@ -92,7 +92,7 @@ TEST_F(RosBag2NodeFixture, publisher_and_subscriber_work) // We currently publish more messages because they can get lost std::vector test_messages = {"Hello World", "Hello World"}; std::string topic_name = "string_topic"; - std::string type = "std_msgs/String"; + std::string type = "test_msgs/Primitives"; auto publisher = node_->create_generic_publisher(topic_name, type); @@ -127,7 +127,7 @@ TEST_F(RosBag2NodeFixture, auto topics_and_types = node_->get_topics_with_types({"string_topic"}); ASSERT_THAT(topics_and_types, SizeIs(1)); - EXPECT_THAT(topics_and_types.begin()->second, StrEq("std_msgs/String")); + EXPECT_THAT(topics_and_types.begin()->second, StrEq("test_msgs/Primitives")); } TEST_F(RosBag2NodeFixture, @@ -138,7 +138,7 @@ TEST_F(RosBag2NodeFixture, auto topics_and_types = node_->get_topics_with_types({"/string_topic"}); ASSERT_THAT(topics_and_types, SizeIs(1)); - EXPECT_THAT(topics_and_types.begin()->second, StrEq("std_msgs/String")); + EXPECT_THAT(topics_and_types.begin()->second, StrEq("test_msgs/Primitives")); } TEST_F(RosBag2NodeFixture, get_topics_with_types_returns_only_specified_topics) { @@ -153,8 +153,8 @@ TEST_F(RosBag2NodeFixture, get_topics_with_types_returns_only_specified_topics) auto topics_and_types = node_->get_topics_with_types({first_topic, second_topic}); ASSERT_THAT(topics_and_types, SizeIs(2)); - EXPECT_THAT(topics_and_types.find(first_topic)->second, StrEq("std_msgs/String")); - EXPECT_THAT(topics_and_types.find(second_topic)->second, StrEq("std_msgs/String")); + EXPECT_THAT(topics_and_types.find(first_topic)->second, StrEq("test_msgs/Primitives")); + EXPECT_THAT(topics_and_types.find(second_topic)->second, StrEq("test_msgs/Primitives")); } TEST_F(RosBag2NodeFixture, get_all_topics_with_types_returns_all_topics) @@ -170,9 +170,9 @@ TEST_F(RosBag2NodeFixture, get_all_topics_with_types_returns_all_topics) auto topics_and_types = node_->get_all_topics_with_types(); ASSERT_THAT(topics_and_types, SizeIs(5)); - EXPECT_THAT(topics_and_types.find(first_topic)->second, StrEq("std_msgs/String")); - EXPECT_THAT(topics_and_types.find(second_topic)->second, StrEq("std_msgs/String")); - EXPECT_THAT(topics_and_types.find(third_topic)->second, StrEq("std_msgs/String")); + EXPECT_THAT(topics_and_types.find(first_topic)->second, StrEq("test_msgs/Primitives")); + EXPECT_THAT(topics_and_types.find(second_topic)->second, StrEq("test_msgs/Primitives")); + EXPECT_THAT(topics_and_types.find(third_topic)->second, StrEq("test_msgs/Primitives")); // The latter two topics can usually be found on any node, so they should be subscribed here EXPECT_THAT(topics_and_types.find("/clock")->second, StrEq("rosgraph_msgs/Clock")); EXPECT_THAT( diff --git a/rosbag2/test/rosbag2/rosbag2_test_fixture.hpp b/rosbag2/test/rosbag2/rosbag2_test_fixture.hpp index a9c9d51964..5593ca015a 100644 --- a/rosbag2/test/rosbag2/rosbag2_test_fixture.hpp +++ b/rosbag2/test/rosbag2/rosbag2_test_fixture.hpp @@ -30,10 +30,10 @@ # include #endif +#include "rosbag2/rosbag2_play_options.hpp" #include "rosbag2_storage/storage_factory.hpp" #include "rosbag2_storage/storage_interfaces/read_only_interface.hpp" #include "rosbag2_storage/storage_interfaces/read_write_interface.hpp" - #include "test_memory_management.hpp" using namespace ::testing; // NOLINT @@ -50,6 +50,9 @@ class Rosbag2TestFixture : public Test #endif database_name_ = temporary_dir_path_ + system_separator + database_name_; std::cout << "Database name: " << database_name_ << std::endl; + + options_ = rosbag2::Rosbag2PlayOptions(); + options_.read_ahead_queue_size = 1000; } ~Rosbag2TestFixture() override @@ -113,23 +116,23 @@ class Rosbag2TestFixture : public Test void write_messages( const std::string & db_name, - std::vector> messages, - std::map topic_types) + const std::vector> & messages, + const std::map & topics_and_types) { rosbag2_storage::StorageFactory factory; auto storage = factory.open_read_write(db_name, "sqlite3"); - for (auto topic_and_type : topic_types) { + for (const auto & topic_and_type : topics_and_types) { storage->create_topic(topic_and_type.first, topic_and_type.second); } - for (auto msg : messages) { + for (const auto & msg : messages) { storage->write(msg); } } template std::shared_ptr serialize_message( - std::string topic, typename MessageT::_data_type message) + const std::string & topic, typename MessageT::_data_type message) { auto msg = std::make_shared(); msg->data = message; @@ -140,9 +143,21 @@ class Rosbag2TestFixture : public Test return bag_msg; } + template + std::shared_ptr serialize_test_message( + const std::string & topic, std::shared_ptr message) + { + auto bag_msg = std::make_shared(); + bag_msg->serialized_data = memory_management_.serialize_message(message); + bag_msg->topic_name = topic; + + return bag_msg; + } + std::string database_name_; static std::string temporary_dir_path_; test_helpers::TestMemoryManagement memory_management_; + rosbag2::Rosbag2PlayOptions options_; }; std::string Rosbag2TestFixture::temporary_dir_path_ = ""; // NOLINT diff --git a/rosbag2/test/rosbag2/rosbag2_typesupport_helpers_test.cpp b/rosbag2/test/rosbag2/rosbag2_typesupport_helpers_test.cpp index 1e1668031b..6474ef50ae 100644 --- a/rosbag2/test/rosbag2/rosbag2_typesupport_helpers_test.cpp +++ b/rosbag2/test/rosbag2/rosbag2_typesupport_helpers_test.cpp @@ -53,7 +53,7 @@ TEST(TypesupportHelpersTest, throws_exception_if_library_cannot_be_found) { } TEST(TypesupportHelpersTest, returns_c_type_info_for_valid_library) { - auto string_typesupport = rosbag2::get_typesupport("std_msgs/String"); + auto string_typesupport = rosbag2::get_typesupport("test_msgs/Primitives"); EXPECT_THAT(std::string(string_typesupport->typesupport_identifier), ContainsRegex("rosidl_typesupport")); diff --git a/rosbag2/test/rosbag2/rosbag2_write_all_integration_test.cpp b/rosbag2/test/rosbag2/rosbag2_write_all_integration_test.cpp deleted file mode 100644 index 6d2f91a42d..0000000000 --- a/rosbag2/test/rosbag2/rosbag2_write_all_integration_test.cpp +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright 2018, Bosch Software Innovations GmbH. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include - -#include -#include -#include - -#include "rclcpp/rclcpp.hpp" -#include "rosbag2/rosbag2.hpp" -#include "rosbag2_storage/serialized_bag_message.hpp" -#include "std_msgs/msg/string.hpp" -#include "std_msgs/msg/u_int8.hpp" - -#include "rosbag2_write_integration_fixture.hpp" -#include "test_memory_management.hpp" - -// TODO(Martin-Idel-SI): merge with other write and read tests once signal handling is sorted out -TEST_F(RosBag2WriteIntegrationTestFixture, published_messages_from_multiple_topics_are_recorded) -{ - std::string int_topic = "/int_topic"; - auto serialized_int_bag_message = serialize_message(int_topic, 10); - - std::string string_topic = "/string_topic"; - auto serialized_string_bag_message = serialize_message( - string_topic, "test_message"); - - start_publishing(serialized_string_bag_message, string_topic, 2); - start_publishing(serialized_int_bag_message, int_topic, 2); - - start_recording_all_topics(); - wait_for_publishers_to_stop(); - stop_recording(); - - auto recorded_messages = get_messages(database_name_); - - ASSERT_THAT(recorded_messages, SizeIs(4)); - auto string_messages = filter_messages(recorded_messages, string_topic); - auto int_messages = filter_messages(recorded_messages, int_topic); - ASSERT_THAT(string_messages, SizeIs(2)); - ASSERT_THAT(int_messages, SizeIs(2)); - EXPECT_THAT(string_messages[0]->data, Eq("test_message")); - EXPECT_THAT(int_messages[0]->data, Eq(10)); -} diff --git a/rosbag2/test/rosbag2/rosbag2_write_integration_test.cpp b/rosbag2/test/rosbag2/rosbag2_write_integration_test.cpp deleted file mode 100644 index e25573ce8a..0000000000 --- a/rosbag2/test/rosbag2/rosbag2_write_integration_test.cpp +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright 2018, Bosch Software Innovations GmbH. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include - -#include -#include -#include - -#include "rclcpp/rclcpp.hpp" -#include "rosbag2/rosbag2.hpp" -#include "rosbag2_storage/serialized_bag_message.hpp" -#include "std_msgs/msg/string.hpp" -#include "std_msgs/msg/u_int8.hpp" - -#include "rosbag2_write_integration_fixture.hpp" -#include "test_memory_management.hpp" - -// TODO(Martin-Idel-SI): merge with other write and read tests once signal handling is sorted out -TEST_F(RosBag2WriteIntegrationTestFixture, published_messages_from_multiple_topics_are_recorded) -{ - std::string int_topic = "/int_topic"; - auto serialized_int_bag_message = serialize_message(int_topic, 10); - - std::string string_topic = "/string_topic"; - auto serialized_string_bag_message = serialize_message( - string_topic, "test_message"); - - start_publishing(serialized_string_bag_message, string_topic, 2); - start_publishing(serialized_int_bag_message, int_topic, 2); - - start_recording({string_topic, int_topic}); - wait_for_publishers_to_stop(); - stop_recording(); - - auto recorded_messages = get_messages(database_name_); - - ASSERT_THAT(recorded_messages, SizeIs(4)); - auto string_messages = filter_messages(recorded_messages, string_topic); - auto int_messages = filter_messages(recorded_messages, int_topic); - ASSERT_THAT(string_messages, SizeIs(2)); - ASSERT_THAT(int_messages, SizeIs(2)); - EXPECT_THAT(string_messages[0]->data, Eq("test_message")); - EXPECT_THAT(int_messages[0]->data, Eq(10)); -} diff --git a/rosbag2_storage/src/impl/storage_factory_impl.hpp b/rosbag2_storage/src/impl/storage_factory_impl.hpp index e6f2d89187..afaf1d4eeb 100644 --- a/rosbag2_storage/src/impl/storage_factory_impl.hpp +++ b/rosbag2_storage/src/impl/storage_factory_impl.hpp @@ -108,8 +108,10 @@ class StorageFactoryImpl { auto instance = get_interface_instance(read_write_class_loader_, storage_id, uri); - ROSBAG2_STORAGE_LOG_ERROR_STREAM( - "Could not load/open plugin with storage id '" << storage_id << "'."); + if (instance == nullptr) { + ROSBAG2_STORAGE_LOG_ERROR_STREAM( + "Could not load/open plugin with storage id '" << storage_id << "'."); + } return instance; } @@ -125,8 +127,10 @@ class StorageFactoryImpl read_write_class_loader_, storage_id, uri); } - ROSBAG2_STORAGE_LOG_ERROR_STREAM( - "Could not load/open plugin with storage id '" << storage_id << "'."); + if (instance == nullptr) { + ROSBAG2_STORAGE_LOG_ERROR_STREAM( + "Could not load/open plugin with storage id '" << storage_id << "'."); + } return instance; } diff --git a/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.cpp b/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.cpp index a053519891..a2a6f6cc9d 100644 --- a/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.cpp +++ b/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.cpp @@ -75,8 +75,6 @@ void SqliteStorage::write(std::shared_ptrbind(message->time_stamp, topic_entry->second, message->serialized_data); write_statement_->execute_and_reset(); - - ROSBAG2_STORAGE_DEFAULT_PLUGINS_LOG_INFO("Stored message"); } bool SqliteStorage::has_next() @@ -154,7 +152,7 @@ void SqliteStorage::prepare_for_reading() read_statement_ = database_->prepare_statement( "SELECT data, timestamp, topics.name " "FROM messages JOIN topics ON messages.topic_id = topics.id " - "ORDER BY messages.id;"); + "ORDER BY messages.timestamp;"); message_result_ = read_statement_->execute_query< std::shared_ptr, rcutils_time_point_value_t, std::string>(); current_message_row_ = message_result_.begin(); diff --git a/rosbag2_storage_default_plugins/test/rosbag2_storage_default_plugins/sqlite/sqlite_storage_integration_test.cpp b/rosbag2_storage_default_plugins/test/rosbag2_storage_default_plugins/sqlite/sqlite_storage_integration_test.cpp index 413fbd025f..cca70075ed 100644 --- a/rosbag2_storage_default_plugins/test/rosbag2_storage_default_plugins/sqlite/sqlite_storage_integration_test.cpp +++ b/rosbag2_storage_default_plugins/test/rosbag2_storage_default_plugins/sqlite/sqlite_storage_integration_test.cpp @@ -61,6 +61,24 @@ TEST_F(StorageTestFixture, has_next_return_false_if_there_are_no_more_messages) EXPECT_FALSE(readable_storage->has_next()); } +TEST_F(StorageTestFixture, get_next_returns_messages_in_timestamp_order) { + std::vector> string_messages = + {std::make_tuple("first_message", 6, "", ""), std::make_tuple("second_message", 2, "", "")}; + + write_messages_to_sqlite(string_messages); + std::unique_ptr readable_storage = + std::make_unique(); + readable_storage->open(database_name_); + + EXPECT_TRUE(readable_storage->has_next()); + auto first_message = readable_storage->read_next(); + EXPECT_THAT(first_message->time_stamp, Eq(2)); + EXPECT_TRUE(readable_storage->has_next()); + auto second_message = readable_storage->read_next(); + EXPECT_THAT(second_message->time_stamp, Eq(6)); + EXPECT_FALSE(readable_storage->has_next()); +} + TEST_F(StorageTestFixture, get_all_topics_and_types_returns_the_correct_map) { std::unique_ptr writable_storage = std::make_unique(); diff --git a/shared_queues_vendor/CMakeLists.txt b/shared_queues_vendor/CMakeLists.txt new file mode 100644 index 0000000000..5cfaa7b1cd --- /dev/null +++ b/shared_queues_vendor/CMakeLists.txt @@ -0,0 +1,43 @@ +cmake_minimum_required(VERSION 3.5) +project(shared_queues_vendor) + +find_package(ament_cmake REQUIRED) + +include(ExternalProject) +# Single producer single consumer queue by moodycamel - header only, don't build, install +ExternalProject_Add(singleproducerconsumer + DOWNLOAD_DIR ${CMAKE_CURRENT_BINARY_DIR}/single + URL https://github.com/cameron314/readerwriterqueue/archive/ef7dfbf553288064347d51b8ac335f1ca489032a.zip + URL_MD5 64c673dd381b8fae9254053ad7b2be4d + TIMEOUT 60 + INSTALL_DIR ${CMAKE_CURRENT_BINARY_DIR} + CONFIGURE_COMMAND "" + BUILD_COMMAND "" + INSTALL_COMMAND "" + ) + +# Concurrent and blocking concurrent queue by moodycamel - header only, don't build, install +ExternalProject_Add(concurrentqueue + DOWNLOAD_DIR ${CMAKE_CURRENT_BINARY_DIR}/multiple + URL https://github.com/cameron314/concurrentqueue/archive/8f65a8734d77c3cc00d74c0532efca872931d3ce.zip + URL_MD5 71a0d932cc89150c2ade85f0d9cac9dc + TIMEOUT 60 + INSTALL_DIR ${CMAKE_CURRENT_BINARY_DIR} + CONFIGURE_COMMAND "" + BUILD_COMMAND "" + INSTALL_COMMAND "" + ) + +# Install headers +install( + FILES + "${CMAKE_CURRENT_BINARY_DIR}/singleproducerconsumer-prefix/src/singleproducerconsumer/atomicops.h" + "${CMAKE_CURRENT_BINARY_DIR}/singleproducerconsumer-prefix/src/singleproducerconsumer/readerwriterqueue.h" + "${CMAKE_CURRENT_BINARY_DIR}/concurrentqueue-prefix/src/concurrentqueue/concurrentqueue.h" + "${CMAKE_CURRENT_BINARY_DIR}/concurrentqueue-prefix/src/concurrentqueue/blockingconcurrentqueue.h" + DESTINATION ${CMAKE_INSTALL_PREFIX}/include/moodycamel +) + +ament_export_include_directories(include) + +ament_package() \ No newline at end of file diff --git a/shared_queues_vendor/package.xml b/shared_queues_vendor/package.xml new file mode 100644 index 0000000000..7969a8aaf4 --- /dev/null +++ b/shared_queues_vendor/package.xml @@ -0,0 +1,15 @@ + + + + shared_queues_vendor + 0.0.0 + Vendor package for concurrent queues from moodycamel + Karsten Knese + Apache License 2.0 + + ament_cmake + + + ament_cmake + +