-
Notifications
You must be signed in to change notification settings - Fork 288
Fix for multibag replay stagnation #2158
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix for multibag replay stagnation #2158
Conversation
9694f3b to
8089aab
Compare
|
@ros-pull-request-builder retest this please |
|
Pulls: #2158 |
fujitatomoya
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm with green CI.
rosbag2_transport/include/rosbag2_transport/readers_wrapper.hpp
Outdated
Show resolved
Hide resolved
|
Re-run CI after renames |
|
Hold off from merging this PR. We have regression in the tests under the load /tmp/ws/src/rosbag2/rosbag2_transport/test/rosbag2_transport/test_play_seek.cpp:144: Failure
Expected equality of these values:
replayed_topic1[2]->int32_value
Which is: 4
1
/tmp/ws/src/rosbag2/rosbag2_transport/test/rosbag2_transport/test_play_seek.cpp:145: Failure
Expected equality of these values:
replayed_topic1[3]->int32_value
Which is: 1
2Preliminary analysisThat is because a data race for the internal queue cleanup and refill. |
a5928fa to
ed2b385
Compare
Signed-off-by: Michael Orlov <morlovmr@gmail.com>
- Add cache for the last element from each reader and taking messages from cache in a chronological order. - Also use "rcpputils::unique_lock" consistently for the reader_mutex_ Signed-off-by: Michael Orlov <morlovmr@gmail.com>
- The ReadersWrapper class manages multiple rosbag2_cpp::Reader instances, allowing for chronological reading of messages across all readers. It maintains a cache of the next message from each reader and provides methods to retrieve the next message in chronological order, seek to a specific timestamp across all readers, and apply filters to all readers. It also provides access to the earliest and latest timestamps across all readers. Signed-off-by: Michael Orlov <morlovmr@gmail.com>
Signed-off-by: Michael Orlov <morlovmr@gmail.com>
Signed-off-by: Michael Orlov <morlovmr@gmail.com>
Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
- Reasoning: Not mentioning cache in the name because this is some implementation details and could be changed in the future. However, the API needs to be stable and clear. - Also added a comment about reasoning for usage pre-cache inside implementation file. Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
- Rename "skip_message_in_main_play_loop_mutex_" to the "main_play_loop_mutex_" and "playback_finished_cv_" into the "is_in_playback_cv_". Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
43a6c87 to
48fa0bc
Compare
- Stop storage loading thread before doing seek to avoid race condition - Return from seek at the beginning if player is not in playback mode. Reasoning: We will seek to the beginning of the bag when will start play() anyway. Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
- Check "load_storage_content_" inside "enqueue_up_to_boundary()" Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
Add missing converter options when open readers Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
48fa0bc to
3560446
Compare
|
Note: Decided to move a couple of commits
|
|
Re-run CI |
|
@mjcarroll @ahcorde Any ideas for why RHEL CI job keep fails? |
|
Pulls: #2158 |
|
The CI is green after reverting the |
|
https://github.com/Mergifyio backport kilted jazzy |
✅ Backports have been created
|
* Add "high_freq_topics_does_not_starve_in_multibag_playback" test Signed-off-by: Michael Orlov <morlovmr@gmail.com> * Quick fix for slow playback when one reader has lower freq topic - Add cache for the last element from each reader and taking messages from cache in a chronological order. - Also use "rcpputils::unique_lock" consistently for the reader_mutex_ Signed-off-by: Michael Orlov <morlovmr@gmail.com> * Add ReadersWrapper class that encapsulates readers and messages cache - The ReadersWrapper class manages multiple rosbag2_cpp::Reader instances, allowing for chronological reading of messages across all readers. It maintains a cache of the next message from each reader and provides methods to retrieve the next message in chronological order, seek to a specific timestamp across all readers, and apply filters to all readers. It also provides access to the earliest and latest timestamps across all readers. Signed-off-by: Michael Orlov <morlovmr@gmail.com> * Add unit tests for ReadersWrapper class Signed-off-by: Michael Orlov <morlovmr@gmail.com> * Delete default ctor and copy/move operators for ReadersWrapper class Signed-off-by: Michael Orlov <morlovmr@gmail.com> * Rename ReadersWrapper class to the ReadersManager Signed-off-by: Michael Orlov <michael.orlov@apex.ai> * Rename get_next_chronological_message_from_cache - Reasoning: Not mentioning cache in the name because this is some implementation details and could be changed in the future. However, the API needs to be stable and clear. - Also added a comment about reasoning for usage pre-cache inside implementation file. Signed-off-by: Michael Orlov <michael.orlov@apex.ai> * Rename "no_messages_in_cache" to the "has_next()" Signed-off-by: Michael Orlov <michael.orlov@apex.ai> * Renames for mutex and condition_variable - Rename "skip_message_in_main_play_loop_mutex_" to the "main_play_loop_mutex_" and "playback_finished_cv_" into the "is_in_playback_cv_". Signed-off-by: Michael Orlov <michael.orlov@apex.ai> * Fix for messages_queue access race in seek(time) - Stop storage loading thread before doing seek to avoid race condition - Return from seek at the beginning if player is not in playback mode. Reasoning: We will seek to the beginning of the bag when will start play() anyway. Signed-off-by: Michael Orlov <michael.orlov@apex.ai> * Speedup "load_storage_content()" interruption - Check "load_storage_content_" inside "enqueue_up_to_boundary()" Signed-off-by: Michael Orlov <michael.orlov@apex.ai> * Fix for play_fails_gracefully_if_needed_coverter_plugin_does_not_exist Add missing converter options when open readers Signed-off-by: Michael Orlov <michael.orlov@apex.ai> --------- Signed-off-by: Michael Orlov <morlovmr@gmail.com> Signed-off-by: Michael Orlov <michael.orlov@apex.ai> (cherry picked from commit bfce7ac)
* Add "high_freq_topics_does_not_starve_in_multibag_playback" test Signed-off-by: Michael Orlov <morlovmr@gmail.com> * Quick fix for slow playback when one reader has lower freq topic - Add cache for the last element from each reader and taking messages from cache in a chronological order. - Also use "rcpputils::unique_lock" consistently for the reader_mutex_ Signed-off-by: Michael Orlov <morlovmr@gmail.com> * Add ReadersWrapper class that encapsulates readers and messages cache - The ReadersWrapper class manages multiple rosbag2_cpp::Reader instances, allowing for chronological reading of messages across all readers. It maintains a cache of the next message from each reader and provides methods to retrieve the next message in chronological order, seek to a specific timestamp across all readers, and apply filters to all readers. It also provides access to the earliest and latest timestamps across all readers. Signed-off-by: Michael Orlov <morlovmr@gmail.com> * Add unit tests for ReadersWrapper class Signed-off-by: Michael Orlov <morlovmr@gmail.com> * Delete default ctor and copy/move operators for ReadersWrapper class Signed-off-by: Michael Orlov <morlovmr@gmail.com> * Rename ReadersWrapper class to the ReadersManager Signed-off-by: Michael Orlov <michael.orlov@apex.ai> * Rename get_next_chronological_message_from_cache - Reasoning: Not mentioning cache in the name because this is some implementation details and could be changed in the future. However, the API needs to be stable and clear. - Also added a comment about reasoning for usage pre-cache inside implementation file. Signed-off-by: Michael Orlov <michael.orlov@apex.ai> * Rename "no_messages_in_cache" to the "has_next()" Signed-off-by: Michael Orlov <michael.orlov@apex.ai> * Renames for mutex and condition_variable - Rename "skip_message_in_main_play_loop_mutex_" to the "main_play_loop_mutex_" and "playback_finished_cv_" into the "is_in_playback_cv_". Signed-off-by: Michael Orlov <michael.orlov@apex.ai> * Fix for messages_queue access race in seek(time) - Stop storage loading thread before doing seek to avoid race condition - Return from seek at the beginning if player is not in playback mode. Reasoning: We will seek to the beginning of the bag when will start play() anyway. Signed-off-by: Michael Orlov <michael.orlov@apex.ai> * Speedup "load_storage_content()" interruption - Check "load_storage_content_" inside "enqueue_up_to_boundary()" Signed-off-by: Michael Orlov <michael.orlov@apex.ai> * Fix for play_fails_gracefully_if_needed_coverter_plugin_does_not_exist Add missing converter options when open readers Signed-off-by: Michael Orlov <michael.orlov@apex.ai> --------- Signed-off-by: Michael Orlov <morlovmr@gmail.com> Signed-off-by: Michael Orlov <michael.orlov@apex.ai> (cherry picked from commit bfce7ac) # Conflicts: # rosbag2_transport/src/rosbag2_transport/player.cpp # rosbag2_transport/test/rosbag2_transport/test_play.cpp
* Add "high_freq_topics_does_not_starve_in_multibag_playback" test * Quick fix for slow playback when one reader has lower freq topic - Add cache for the last element from each reader and taking messages from cache in a chronological order. - Also use "rcpputils::unique_lock" consistently for the reader_mutex_ * Add ReadersWrapper class that encapsulates readers and messages cache - The ReadersWrapper class manages multiple rosbag2_cpp::Reader instances, allowing for chronological reading of messages across all readers. It maintains a cache of the next message from each reader and provides methods to retrieve the next message in chronological order, seek to a specific timestamp across all readers, and apply filters to all readers. It also provides access to the earliest and latest timestamps across all readers. * Add unit tests for ReadersWrapper class * Delete default ctor and copy/move operators for ReadersWrapper class * Rename ReadersWrapper class to the ReadersManager * Rename get_next_chronological_message_from_cache - Reasoning: Not mentioning cache in the name because this is some implementation details and could be changed in the future. However, the API needs to be stable and clear. - Also added a comment about reasoning for usage pre-cache inside implementation file. * Rename "no_messages_in_cache" to the "has_next()" * Renames for mutex and condition_variable - Rename "skip_message_in_main_play_loop_mutex_" to the "main_play_loop_mutex_" and "playback_finished_cv_" into the "is_in_playback_cv_". * Fix for messages_queue access race in seek(time) - Stop storage loading thread before doing seek to avoid race condition - Return from seek at the beginning if player is not in playback mode. Reasoning: We will seek to the beginning of the bag when will start play() anyway. * Speedup "load_storage_content()" interruption - Check "load_storage_content_" inside "enqueue_up_to_boundary()" * Fix for play_fails_gracefully_if_needed_coverter_plugin_does_not_exist Add missing converter options when open readers --------- (cherry picked from commit bfce7ac) Signed-off-by: Michael Orlov <morlovmr@gmail.com> Signed-off-by: Michael Orlov <michael.orlov@apex.ai> Co-authored-by: Michael Orlov <morlovmr@gmail.com>
* Fix for multibag replay stagnation (#2158) * Add "high_freq_topics_does_not_starve_in_multibag_playback" test Signed-off-by: Michael Orlov <morlovmr@gmail.com> * Quick fix for slow playback when one reader has lower freq topic - Add cache for the last element from each reader and taking messages from cache in a chronological order. - Also use "rcpputils::unique_lock" consistently for the reader_mutex_ Signed-off-by: Michael Orlov <morlovmr@gmail.com> * Add ReadersWrapper class that encapsulates readers and messages cache - The ReadersWrapper class manages multiple rosbag2_cpp::Reader instances, allowing for chronological reading of messages across all readers. It maintains a cache of the next message from each reader and provides methods to retrieve the next message in chronological order, seek to a specific timestamp across all readers, and apply filters to all readers. It also provides access to the earliest and latest timestamps across all readers. Signed-off-by: Michael Orlov <morlovmr@gmail.com> * Add unit tests for ReadersWrapper class Signed-off-by: Michael Orlov <morlovmr@gmail.com> * Delete default ctor and copy/move operators for ReadersWrapper class Signed-off-by: Michael Orlov <morlovmr@gmail.com> * Rename ReadersWrapper class to the ReadersManager Signed-off-by: Michael Orlov <michael.orlov@apex.ai> * Rename get_next_chronological_message_from_cache - Reasoning: Not mentioning cache in the name because this is some implementation details and could be changed in the future. However, the API needs to be stable and clear. - Also added a comment about reasoning for usage pre-cache inside implementation file. Signed-off-by: Michael Orlov <michael.orlov@apex.ai> * Rename "no_messages_in_cache" to the "has_next()" Signed-off-by: Michael Orlov <michael.orlov@apex.ai> * Renames for mutex and condition_variable - Rename "skip_message_in_main_play_loop_mutex_" to the "main_play_loop_mutex_" and "playback_finished_cv_" into the "is_in_playback_cv_". Signed-off-by: Michael Orlov <michael.orlov@apex.ai> * Fix for messages_queue access race in seek(time) - Stop storage loading thread before doing seek to avoid race condition - Return from seek at the beginning if player is not in playback mode. Reasoning: We will seek to the beginning of the bag when will start play() anyway. Signed-off-by: Michael Orlov <michael.orlov@apex.ai> * Speedup "load_storage_content()" interruption - Check "load_storage_content_" inside "enqueue_up_to_boundary()" Signed-off-by: Michael Orlov <michael.orlov@apex.ai> * Fix for play_fails_gracefully_if_needed_coverter_plugin_does_not_exist Add missing converter options when open readers Signed-off-by: Michael Orlov <michael.orlov@apex.ai> --------- Signed-off-by: Michael Orlov <morlovmr@gmail.com> Signed-off-by: Michael Orlov <michael.orlov@apex.ai> (cherry picked from commit bfce7ac) # Conflicts: # rosbag2_transport/src/rosbag2_transport/player.cpp # rosbag2_transport/test/rosbag2_transport/test_play.cpp * Address merge conflicts Signed-off-by: Michael Orlov <michael.orlov@apex.ai> --------- Signed-off-by: Michael Orlov <michael.orlov@apex.ai> Co-authored-by: Michael Orlov <morlovmr@gmail.com>
- Return player storage options by value in the 'get_storage_options()' to avoid returning dangling reference. Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
- Return player storage options by value in the 'get_storage_options()' to avoid returning dangling reference. Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
- Fix for returning a dangling reference from the "Player::get_storage_options()" method - Also deprecate "Player::get_storage_options()" method in favor of the "get_all_storage_options()"" Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
* Follow-up on "Fix for multibag replay stagnation (#2158)" - Return player storage options by value in the 'get_storage_options()' to avoid returning dangling reference. Signed-off-by: Michael Orlov <michael.orlov@apex.ai> * Add TODO to not forget to remove deprecated method per review request Signed-off-by: Michael Orlov <michael.orlov@apex.ai> --------- Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
…2182) * [kilted] Follow-up on "Fix for multibag replay stagnation (#2158)" - Fix for returning a dangling reference from the "Player::get_storage_options()" method - Also deprecate "Player::get_storage_options()" method in favor of the "get_all_storage_options()"" Signed-off-by: Michael Orlov <michael.orlov@apex.ai> * Remove deprecation warning for the get_storage_options() per code review Signed-off-by: Michael Orlov <michael.orlov@apex.ai> --------- Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
…2182) * [kilted] Follow-up on "Fix for multibag replay stagnation (#2158)" - Fix for returning a dangling reference from the "Player::get_storage_options()" method - Also deprecate "Player::get_storage_options()" method in favor of the "get_all_storage_options()"" Signed-off-by: Michael Orlov <michael.orlov@apex.ai> * Remove deprecation warning for the get_storage_options() per code review Signed-off-by: Michael Orlov <michael.orlov@apex.ai> --------- Signed-off-by: Michael Orlov <michael.orlov@apex.ai> (cherry picked from commit 19e0b90) # Conflicts: # rosbag2_transport/src/rosbag2_transport/player.cpp
) * [kilted] Follow-up on "Fix for multibag replay stagnation (#2158)" - Fix for returning a dangling reference from the "Player::get_storage_options()" method - Also deprecate "Player::get_storage_options()" method in favor of the "get_all_storage_options()"" Signed-off-by: Michael Orlov <michael.orlov@apex.ai> * Remove deprecation warning for the get_storage_options() per code review Signed-off-by: Michael Orlov <michael.orlov@apex.ai> --------- Signed-off-by: Michael Orlov <michael.orlov@apex.ai> (cherry picked from commit 19e0b90) # Conflicts: # rosbag2_transport/src/rosbag2_transport/player.cpp
…ckport #2182) (#2189) * [jazzy] Follow-up on "Fix for multibag replay stagnation (#2158)" (#2182) * [kilted] Follow-up on "Fix for multibag replay stagnation (#2158)" - Fix for returning a dangling reference from the "Player::get_storage_options()" method - Also deprecate "Player::get_storage_options()" method in favor of the "get_all_storage_options()"" Signed-off-by: Michael Orlov <michael.orlov@apex.ai> * Remove deprecation warning for the get_storage_options() per code review Signed-off-by: Michael Orlov <michael.orlov@apex.ai> --------- Signed-off-by: Michael Orlov <michael.orlov@apex.ai> (cherry picked from commit 19e0b90) # Conflicts: # rosbag2_transport/src/rosbag2_transport/player.cpp * Address merge conflicts Signed-off-by: Michael Orlov <michael.orlov@apex.ai> --------- Signed-off-by: Michael Orlov <michael.orlov@apex.ai> Co-authored-by: Michael Orlov <morlovmr@gmail.com>
Description
This PR fixes a problem for slow playback when one reader has a lower frequency topic than others
Added cache for the last element from each reader and taking messages from cache in a chronological order instead of the round-robin order.
To mitigate "spaggetish" code after the fix, this PR also introduces the
ReadersWrapperclass, which will encapsulate readers with their options and message cache.The
ReadersWrapperclass manages multiplerosbag2_cpp::Readerinstances, allowing for chronological reading of messages across all readers. It maintains a cache of the next message from each reader and provides methods to retrieve the next message in chronological order, seek to a specific timestamp across all readers, and apply filters to all readers. It also provides access to the earliest and latest timestamps across all readers.Fixes When using multibag replay, the frequency of the messages on topics is reduced and inconsistent over the time #2156
Depends on the Bugfix for incorrect MCAPStorage::seek(timestamp) when timestamp is current #2157
Is this user-facing behavior change?
Fixed a bug where playback of the higher frequency topics can slow down when present slow frequency topics in another playing bag.
Did you use Generative AI?
Yes. I am using GitHub Copilot, GPT-4.1, in my workflow. in particular to help with unit tests and documentation.
Additional Information
This PR can be backported to the other distros.