Skip to content

Commit 86b88b4

Browse files
MichaelOrlovmergify[bot]
authored andcommitted
Fix for multibag replay stagnation (#2158)
* Add "high_freq_topics_does_not_starve_in_multibag_playback" test Signed-off-by: Michael Orlov <morlovmr@gmail.com> * Quick fix for slow playback when one reader has lower freq topic - Add cache for the last element from each reader and taking messages from cache in a chronological order. - Also use "rcpputils::unique_lock" consistently for the reader_mutex_ Signed-off-by: Michael Orlov <morlovmr@gmail.com> * Add ReadersWrapper class that encapsulates readers and messages cache - The ReadersWrapper class manages multiple rosbag2_cpp::Reader instances, allowing for chronological reading of messages across all readers. It maintains a cache of the next message from each reader and provides methods to retrieve the next message in chronological order, seek to a specific timestamp across all readers, and apply filters to all readers. It also provides access to the earliest and latest timestamps across all readers. Signed-off-by: Michael Orlov <morlovmr@gmail.com> * Add unit tests for ReadersWrapper class Signed-off-by: Michael Orlov <morlovmr@gmail.com> * Delete default ctor and copy/move operators for ReadersWrapper class Signed-off-by: Michael Orlov <morlovmr@gmail.com> * Rename ReadersWrapper class to the ReadersManager Signed-off-by: Michael Orlov <michael.orlov@apex.ai> * Rename get_next_chronological_message_from_cache - Reasoning: Not mentioning cache in the name because this is some implementation details and could be changed in the future. However, the API needs to be stable and clear. - Also added a comment about reasoning for usage pre-cache inside implementation file. Signed-off-by: Michael Orlov <michael.orlov@apex.ai> * Rename "no_messages_in_cache" to the "has_next()" Signed-off-by: Michael Orlov <michael.orlov@apex.ai> * Renames for mutex and condition_variable - Rename "skip_message_in_main_play_loop_mutex_" to the "main_play_loop_mutex_" and "playback_finished_cv_" into the "is_in_playback_cv_". Signed-off-by: Michael Orlov <michael.orlov@apex.ai> * Fix for messages_queue access race in seek(time) - Stop storage loading thread before doing seek to avoid race condition - Return from seek at the beginning if player is not in playback mode. Reasoning: We will seek to the beginning of the bag when will start play() anyway. Signed-off-by: Michael Orlov <michael.orlov@apex.ai> * Speedup "load_storage_content()" interruption - Check "load_storage_content_" inside "enqueue_up_to_boundary()" Signed-off-by: Michael Orlov <michael.orlov@apex.ai> * Fix for play_fails_gracefully_if_needed_coverter_plugin_does_not_exist Add missing converter options when open readers Signed-off-by: Michael Orlov <michael.orlov@apex.ai> --------- Signed-off-by: Michael Orlov <morlovmr@gmail.com> Signed-off-by: Michael Orlov <michael.orlov@apex.ai> (cherry picked from commit bfce7ac) # Conflicts: # rosbag2_transport/src/rosbag2_transport/player.cpp # rosbag2_transport/test/rosbag2_transport/test_play.cpp
1 parent 7d66049 commit 86b88b4

File tree

8 files changed

+1015
-113
lines changed

8 files changed

+1015
-113
lines changed

rosbag2_transport/CMakeLists.txt

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ add_library(${PROJECT_NAME} SHARED
4949
src/rosbag2_transport/play_options.cpp
5050
src/rosbag2_transport/player_service_client.cpp
5151
src/rosbag2_transport/reader_writer_factory.cpp
52+
src/rosbag2_transport/readers_manager.cpp
5253
src/rosbag2_transport/recorder.cpp
5354
src/rosbag2_transport/record_options.cpp
5455
src/rosbag2_transport/topic_filter.cpp
@@ -127,7 +128,12 @@ if(BUILD_TESTING)
127128
ament_find_gmock()
128129

129130
ament_add_gmock_executable(test_play test/rosbag2_transport/test_play.cpp)
130-
target_link_libraries(test_play rosbag2_transport ${test_msgs_TARGETS} rosbag2_test_common::rosbag2_test_common)
131+
target_link_libraries(test_play
132+
rosbag2_transport
133+
${test_msgs_TARGETS}
134+
rosbag2_test_common::rosbag2_test_common
135+
rcutils::rcutils
136+
)
131137
function(test_play_for_rmw_implementation)
132138
set(rmw_implementation_env_var RMW_IMPLEMENTATION=${rmw_implementation})
133139

@@ -543,6 +549,13 @@ if(BUILD_TESTING)
543549
rosbag2_test_common::rosbag2_test_common
544550
${rosbag2_interfaces_TARGETS}
545551
)
552+
553+
ament_add_gmock(test_readers_manager test/rosbag2_transport/test_readers_manager.cpp)
554+
target_link_libraries(test_readers_manager
555+
${PROJECT_NAME}
556+
rosbag2_test_common::rosbag2_test_common
557+
rcutils::rcutils
558+
)
546559
endif()
547560

548561
ament_package()
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
// Copyright 2025 Apex.AI, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef ROSBAG2_TRANSPORT__READERS_MANAGER_HPP_
16+
#define ROSBAG2_TRANSPORT__READERS_MANAGER_HPP_
17+
18+
#include <memory>
19+
#include <utility>
20+
#include <vector>
21+
22+
#include "rcutils/time.h"
23+
#include "rosbag2_cpp/bag_events.hpp"
24+
#include "rosbag2_cpp/reader.hpp"
25+
#include "rosbag2_storage/serialized_bag_message.hpp"
26+
#include "rosbag2_storage/storage_options.hpp"
27+
#include "rosbag2_transport/visibility_control.hpp"
28+
29+
#ifdef _WIN32
30+
# pragma warning(push)
31+
// Suppress warning "rosbag2_transport::ReadersManager::pimpl_': class 'std::unique_ptr>'
32+
// needs to have dll-interface to be used by clients of class 'rosbag2_transport::ReadersManager'"
33+
// Justification:
34+
// 1. We never inline code in the header that actually calls methods on ReadersManagerImpl.
35+
// 2. While the `ReadersManagerImpl` is defined in the `readers_manager_impl.hpp`
36+
// file, we include it only in the `readers_manager.cpp` file, and it does not leak into the
37+
// external API.
38+
// 3. The pimpl design pattern imply that implementation details are hidden and shouldn't be
39+
// exposed with the dll-interface.
40+
# pragma warning(disable:4251)
41+
#endif
42+
43+
44+
namespace rosbag2_transport
45+
{
46+
class ReadersManagerImpl;
47+
48+
/// \class ReadersManager
49+
/// \brief This class manages multiple rosbag2_cpp::Reader instances, allowing for
50+
/// chronological reading of messages across all readers.
51+
/// \details It maintains a cache of the next message from each reader and provides methods to
52+
/// retrieve the next message in chronological order, seek to a specific timestamp across all
53+
/// readers, and apply filters to all readers. It also provides access to the earliest and
54+
/// latest timestamps across all readers.
55+
class ROSBAG2_TRANSPORT_PUBLIC ReadersManager
56+
{
57+
public:
58+
using reader_storage_options_pair_t =
59+
std::pair<std::unique_ptr<rosbag2_cpp::Reader>, rosbag2_storage::StorageOptions>;
60+
61+
/// \brief Constructor which initializes the ReadersManager with multiple readers and their
62+
/// associated storage options.
63+
/// \note The readers will be opened during construction and cache will be populated with the
64+
/// first message from each reader (if available).
65+
/// \param reader_with_options Vector of pairs of unique pointer to the rosbag2_cpp::Reader class
66+
/// (which will be moved to the internal instance of the ReadersManager class during construction)
67+
/// and storage options (which will be applied to the rosbag2_cpp::reader when opening it).
68+
explicit ReadersManager(std::vector<reader_storage_options_pair_t> && reader_with_options);
69+
70+
/// \brief Deleted default constructor and copy/move operations.
71+
ReadersManager() = delete;
72+
ReadersManager(const ReadersManager &) = delete;
73+
ReadersManager & operator=(const ReadersManager &) = delete;
74+
ReadersManager(ReadersManager &&) = delete;
75+
ReadersManager & operator=(ReadersManager &&) = delete;
76+
77+
/// \brief Destructor which cleans up resources used by the ReadersManager.
78+
/// \note The readers will be closed during destruction.
79+
virtual ~ReadersManager();
80+
81+
/// \brief Getter for the currently stored storage options
82+
/// \return Copy of the currently stored storage options
83+
[[nodiscard]] std::vector<rosbag2_storage::StorageOptions> get_all_storage_options() const;
84+
85+
/// \brief Check if there are present some messages to take.
86+
/// \details Indicates that all readers have been exhausted.
87+
/// i.e. there are no more messages to take from readers.
88+
/// \return true if there are more messages to take, false otherwise.
89+
[[nodiscard]] bool has_next() const;
90+
91+
/// \brief Get the next message with the earliest recv_timestamp. Updates the cache by
92+
/// reading from readers as necessary.
93+
[[nodiscard]] std::shared_ptr<rosbag2_storage::SerializedBagMessage>
94+
get_next_message_in_chronological_order();
95+
96+
/// \brief Seek all readers to the provided timestamp.
97+
/// \details seek(t) will cause subsequent reads from readers to return messages that satisfy
98+
/// timestamp >= time t.
99+
/// \param timestamp The timestamp to seek to.
100+
void seek(const rcutils_time_point_value_t & timestamp);
101+
102+
/// \brief Getter method for the earliest time among all readers.
103+
/// \return Returns timestamp of the first message in nanoseconds.
104+
[[nodiscard]] rcutils_time_point_value_t get_earliest_timestamp() const;
105+
106+
/// \brief Getter method for te latest time among all readers.
107+
/// \return Returns timestamp of the last message in nanoseconds.
108+
[[nodiscard]] rcutils_time_point_value_t get_latest_timestamp() const;
109+
110+
/// \brief Apply a filter to all readers.
111+
/// \param storage_filter The filter to apply.
112+
void set_filter(const rosbag2_storage::StorageFilter & storage_filter);
113+
114+
/// \brief Getter for all topics and types in all readers.
115+
/// \return vector of topics with topic name and type as std::string
116+
[[nodiscard]] std::vector<rosbag2_storage::TopicMetadata> get_all_topics_and_types() const;
117+
118+
/// \brief Add event callbacks to all readers.
119+
/// \param callbacks The callbacks to add.
120+
void add_event_callbacks(rosbag2_cpp::bag_events::ReaderEventCallbacks & callbacks);
121+
122+
private:
123+
std::unique_ptr<ReadersManagerImpl> pimpl_;
124+
};
125+
126+
} // namespace rosbag2_transport
127+
128+
#ifdef _WIN32
129+
# pragma warning(pop)
130+
#endif
131+
132+
#endif // ROSBAG2_TRANSPORT__READERS_MANAGER_HPP_

0 commit comments

Comments
 (0)