Skip to content

Conversation

@MichaelOrlov
Copy link
Contributor

@MichaelOrlov MichaelOrlov commented Sep 1, 2025

Description

This PR fixes a problem for slow playback when one reader has a lower frequency topic than others

Is this user-facing behavior change?

Fixed a bug where playback of the higher frequency topics can slow down when present slow frequency topics in another playing bag.

Did you use Generative AI?

Yes. I am using GitHub Copilot, GPT-4.1, in my workflow. in particular to help with unit tests and documentation.

Additional Information

This PR can be backported to the other distros.

@MichaelOrlov MichaelOrlov marked this pull request as ready for review September 1, 2025 18:25
@MichaelOrlov MichaelOrlov force-pushed the morlov/fix-for-multibag-replay-stagnation branch from 9694f3b to 8089aab Compare September 2, 2025 14:52
@MichaelOrlov
Copy link
Contributor Author

@ros-pull-request-builder retest this please

@MichaelOrlov
Copy link
Contributor Author

Pulls: #2158
Gist: https://gist.githubusercontent.com/MichaelOrlov/15d3a9082ed76aa64911d78bc47d74f7/raw/bc34f98f1ab92956f6be1d973f053311a2f6ec79/ros2.repos
BUILD args: --packages-above-and-dependencies rosbag2_transport rosbag2_tests
TEST args: --packages-above rosbag2_transport rosbag2_tests
ROS Distro: rolling
Job: ci_launcher
ci_launcher ran: https://ci.ros2.org/job/ci_launcher/16834

  • Linux Build Status
  • Linux-aarch64 Build Status
  • Linux-rhel Build Status
  • Windows Build Status

Copy link
Contributor

@fujitatomoya fujitatomoya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm with green CI.

@MichaelOrlov
Copy link
Contributor Author

Re-run CI after renames
ci_launcher ran: https://ci.ros2.org/job/ci_launcher/16837

  • Linux Build Status
  • Linux-aarch64 Build Status
  • Linux-rhel Build Status
  • Windows Build Status

@MichaelOrlov
Copy link
Contributor Author

Hold off from merging this PR. We have regression in the tests under the load

/tmp/ws/src/rosbag2/rosbag2_transport/test/rosbag2_transport/test_play_seek.cpp:144: Failure
Expected equality of these values:
  replayed_topic1[2]->int32_value
    Which is: 4
  1

/tmp/ws/src/rosbag2/rosbag2_transport/test/rosbag2_transport/test_play_seek.cpp:145: Failure
Expected equality of these values:
  replayed_topic1[3]->int32_value
    Which is: 1
  2

Preliminary analysis

That is because a data race for the internal queue cleanup and refill.
Before, the queue cleanup and refill was protected by the readers_mutex; now I moved those mutex inside ReadersManager, and when we are doing seek() we need to do queue->purge() and readers->seek(timestamp). However, another thread still trying to refill the queue from the readers in the background.

@MichaelOrlov MichaelOrlov force-pushed the morlov/fix-for-multibag-replay-stagnation branch 2 times, most recently from a5928fa to ed2b385 Compare September 5, 2025 02:10
Signed-off-by: Michael Orlov <morlovmr@gmail.com>
- Add cache for the last element from each reader and taking messages
 from cache in a chronological order.
- Also use "rcpputils::unique_lock" consistently for the reader_mutex_

Signed-off-by: Michael Orlov <morlovmr@gmail.com>
- The ReadersWrapper class manages multiple rosbag2_cpp::Reader
 instances, allowing for chronological reading of messages across all
 readers. It maintains a cache of the next message from each reader and
 provides methods to retrieve the next message in chronological order,
 seek to a specific timestamp across all readers, and apply filters to
 all readers. It also provides access to the earliest and latest
 timestamps across all readers.

Signed-off-by: Michael Orlov <morlovmr@gmail.com>
Signed-off-by: Michael Orlov <morlovmr@gmail.com>
Signed-off-by: Michael Orlov <morlovmr@gmail.com>
Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
- Reasoning: Not mentioning cache in the name because this is some
 implementation details and could be changed in the future. However,
 the API needs to be stable and clear.
- Also added a comment about reasoning for usage pre-cache inside
 implementation file.

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
- Rename "skip_message_in_main_play_loop_mutex_" to the
 "main_play_loop_mutex_" and "playback_finished_cv_" into the
 "is_in_playback_cv_".

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
@MichaelOrlov MichaelOrlov force-pushed the morlov/fix-for-multibag-replay-stagnation branch from 43a6c87 to 48fa0bc Compare September 5, 2025 15:33
- Stop storage loading thread before doing seek to avoid race condition
- Return from seek at the beginning if player is not in playback mode.
  Reasoning: We will seek to the beginning of the bag when will start
  play() anyway.

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
- Check "load_storage_content_" inside "enqueue_up_to_boundary()"

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
Add missing converter options when open readers

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
@MichaelOrlov MichaelOrlov force-pushed the morlov/fix-for-multibag-replay-stagnation branch from 48fa0bc to 3560446 Compare September 5, 2025 16:24
@MichaelOrlov
Copy link
Contributor Author

Note: Decided to move a couple of commits

@MichaelOrlov
Copy link
Contributor Author

Re-run CI
ci_launcher ran: https://ci.ros2.org/job/ci_launcher/16885

  • Linux Build Status
  • Linux-aarch64 Build Status
  • Linux-rhel Build Status
  • Windows Build Status

@MichaelOrlov
Copy link
Contributor Author

The Linux-RHEL CI job failed due to not found Uncrustify.
Trying to re-run this job

  • Linux-rhel Build Status

@MichaelOrlov
Copy link
Contributor Author

@mjcarroll @ahcorde Any ideas for why RHEL CI job keep fails?
Something is wrong with uncrustify.
I am curious if it is somehow related to the recent removal of the uncrustify_vendor package.

@MichaelOrlov
Copy link
Contributor Author

Pulls: #2158
Gist: https://gist.githubusercontent.com/MichaelOrlov/2ba68fbf0fbdc4a9da5643bfca61fc85/raw/bc34f98f1ab92956f6be1d973f053311a2f6ec79/ros2.repos
BUILD args: --packages-above-and-dependencies rosbag2_transport rosbag2_tests
TEST args: --packages-above rosbag2_transport rosbag2_tests
ROS Distro: rolling
Job: ci_launcher
ci_launcher ran: https://ci.ros2.org/job/ci_launcher/16939

  • Linux Build Status
  • Linux-aarch64 Build Status
  • Linux-rhel Build Status
  • Windows Build Status

@MichaelOrlov
Copy link
Contributor Author

The RHEL CI job failed due to the CI infrastructure issue. It was auto-restarted and passed green

  • Linux-rhel Build Status

@MichaelOrlov
Copy link
Contributor Author

The CI is green after reverting the uncrustify_vendor removal. Merging then.

@MichaelOrlov MichaelOrlov merged commit bfce7ac into rolling Sep 12, 2025
12 checks passed
@MichaelOrlov MichaelOrlov deleted the morlov/fix-for-multibag-replay-stagnation branch September 12, 2025 19:18
@MichaelOrlov
Copy link
Contributor Author

https://github.com/Mergifyio backport kilted jazzy

@mergify
Copy link

mergify bot commented Sep 12, 2025

backport kilted jazzy

✅ Backports have been created

mergify bot pushed a commit that referenced this pull request Sep 12, 2025
* Add "high_freq_topics_does_not_starve_in_multibag_playback" test

Signed-off-by: Michael Orlov <morlovmr@gmail.com>

* Quick fix for slow playback when one reader has lower freq topic

- Add cache for the last element from each reader and taking messages
 from cache in a chronological order.
- Also use "rcpputils::unique_lock" consistently for the reader_mutex_

Signed-off-by: Michael Orlov <morlovmr@gmail.com>

* Add ReadersWrapper class that encapsulates readers and messages cache

- The ReadersWrapper class manages multiple rosbag2_cpp::Reader
 instances, allowing for chronological reading of messages across all
 readers. It maintains a cache of the next message from each reader and
 provides methods to retrieve the next message in chronological order,
 seek to a specific timestamp across all readers, and apply filters to
 all readers. It also provides access to the earliest and latest
 timestamps across all readers.

Signed-off-by: Michael Orlov <morlovmr@gmail.com>

* Add unit tests for ReadersWrapper class

Signed-off-by: Michael Orlov <morlovmr@gmail.com>

* Delete default ctor and copy/move operators for ReadersWrapper class

Signed-off-by: Michael Orlov <morlovmr@gmail.com>

* Rename ReadersWrapper class to the ReadersManager

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Rename get_next_chronological_message_from_cache

- Reasoning: Not mentioning cache in the name because this is some
 implementation details and could be changed in the future. However,
 the API needs to be stable and clear.
- Also added a comment about reasoning for usage pre-cache inside
 implementation file.

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Rename "no_messages_in_cache" to the "has_next()"

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Renames for mutex and condition_variable

- Rename "skip_message_in_main_play_loop_mutex_" to the
 "main_play_loop_mutex_" and "playback_finished_cv_" into the
 "is_in_playback_cv_".

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Fix for messages_queue access race in seek(time)

- Stop storage loading thread before doing seek to avoid race condition
- Return from seek at the beginning if player is not in playback mode.
  Reasoning: We will seek to the beginning of the bag when will start
  play() anyway.

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Speedup "load_storage_content()" interruption

- Check "load_storage_content_" inside "enqueue_up_to_boundary()"

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Fix for play_fails_gracefully_if_needed_coverter_plugin_does_not_exist

Add missing converter options when open readers

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

---------

Signed-off-by: Michael Orlov <morlovmr@gmail.com>
Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
(cherry picked from commit bfce7ac)
mergify bot pushed a commit that referenced this pull request Sep 12, 2025
* Add "high_freq_topics_does_not_starve_in_multibag_playback" test

Signed-off-by: Michael Orlov <morlovmr@gmail.com>

* Quick fix for slow playback when one reader has lower freq topic

- Add cache for the last element from each reader and taking messages
 from cache in a chronological order.
- Also use "rcpputils::unique_lock" consistently for the reader_mutex_

Signed-off-by: Michael Orlov <morlovmr@gmail.com>

* Add ReadersWrapper class that encapsulates readers and messages cache

- The ReadersWrapper class manages multiple rosbag2_cpp::Reader
 instances, allowing for chronological reading of messages across all
 readers. It maintains a cache of the next message from each reader and
 provides methods to retrieve the next message in chronological order,
 seek to a specific timestamp across all readers, and apply filters to
 all readers. It also provides access to the earliest and latest
 timestamps across all readers.

Signed-off-by: Michael Orlov <morlovmr@gmail.com>

* Add unit tests for ReadersWrapper class

Signed-off-by: Michael Orlov <morlovmr@gmail.com>

* Delete default ctor and copy/move operators for ReadersWrapper class

Signed-off-by: Michael Orlov <morlovmr@gmail.com>

* Rename ReadersWrapper class to the ReadersManager

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Rename get_next_chronological_message_from_cache

- Reasoning: Not mentioning cache in the name because this is some
 implementation details and could be changed in the future. However,
 the API needs to be stable and clear.
- Also added a comment about reasoning for usage pre-cache inside
 implementation file.

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Rename "no_messages_in_cache" to the "has_next()"

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Renames for mutex and condition_variable

- Rename "skip_message_in_main_play_loop_mutex_" to the
 "main_play_loop_mutex_" and "playback_finished_cv_" into the
 "is_in_playback_cv_".

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Fix for messages_queue access race in seek(time)

- Stop storage loading thread before doing seek to avoid race condition
- Return from seek at the beginning if player is not in playback mode.
  Reasoning: We will seek to the beginning of the bag when will start
  play() anyway.

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Speedup "load_storage_content()" interruption

- Check "load_storage_content_" inside "enqueue_up_to_boundary()"

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Fix for play_fails_gracefully_if_needed_coverter_plugin_does_not_exist

Add missing converter options when open readers

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

---------

Signed-off-by: Michael Orlov <morlovmr@gmail.com>
Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
(cherry picked from commit bfce7ac)

# Conflicts:
#	rosbag2_transport/src/rosbag2_transport/player.cpp
#	rosbag2_transport/test/rosbag2_transport/test_play.cpp
MichaelOrlov added a commit that referenced this pull request Sep 13, 2025
* Add "high_freq_topics_does_not_starve_in_multibag_playback" test



* Quick fix for slow playback when one reader has lower freq topic

- Add cache for the last element from each reader and taking messages
 from cache in a chronological order.
- Also use "rcpputils::unique_lock" consistently for the reader_mutex_



* Add ReadersWrapper class that encapsulates readers and messages cache

- The ReadersWrapper class manages multiple rosbag2_cpp::Reader
 instances, allowing for chronological reading of messages across all
 readers. It maintains a cache of the next message from each reader and
 provides methods to retrieve the next message in chronological order,
 seek to a specific timestamp across all readers, and apply filters to
 all readers. It also provides access to the earliest and latest
 timestamps across all readers.



* Add unit tests for ReadersWrapper class



* Delete default ctor and copy/move operators for ReadersWrapper class



* Rename ReadersWrapper class to the ReadersManager



* Rename get_next_chronological_message_from_cache

- Reasoning: Not mentioning cache in the name because this is some
 implementation details and could be changed in the future. However,
 the API needs to be stable and clear.
- Also added a comment about reasoning for usage pre-cache inside
 implementation file.



* Rename "no_messages_in_cache" to the "has_next()"



* Renames for mutex and condition_variable

- Rename "skip_message_in_main_play_loop_mutex_" to the
 "main_play_loop_mutex_" and "playback_finished_cv_" into the
 "is_in_playback_cv_".



* Fix for messages_queue access race in seek(time)

- Stop storage loading thread before doing seek to avoid race condition
- Return from seek at the beginning if player is not in playback mode.
  Reasoning: We will seek to the beginning of the bag when will start
  play() anyway.



* Speedup "load_storage_content()" interruption

- Check "load_storage_content_" inside "enqueue_up_to_boundary()"



* Fix for play_fails_gracefully_if_needed_coverter_plugin_does_not_exist

Add missing converter options when open readers



---------



(cherry picked from commit bfce7ac)

Signed-off-by: Michael Orlov <morlovmr@gmail.com>
Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
Co-authored-by: Michael Orlov <morlovmr@gmail.com>
MichaelOrlov added a commit that referenced this pull request Sep 19, 2025
* Fix for multibag replay stagnation (#2158)

* Add "high_freq_topics_does_not_starve_in_multibag_playback" test

Signed-off-by: Michael Orlov <morlovmr@gmail.com>

* Quick fix for slow playback when one reader has lower freq topic

- Add cache for the last element from each reader and taking messages
 from cache in a chronological order.
- Also use "rcpputils::unique_lock" consistently for the reader_mutex_

Signed-off-by: Michael Orlov <morlovmr@gmail.com>

* Add ReadersWrapper class that encapsulates readers and messages cache

- The ReadersWrapper class manages multiple rosbag2_cpp::Reader
 instances, allowing for chronological reading of messages across all
 readers. It maintains a cache of the next message from each reader and
 provides methods to retrieve the next message in chronological order,
 seek to a specific timestamp across all readers, and apply filters to
 all readers. It also provides access to the earliest and latest
 timestamps across all readers.

Signed-off-by: Michael Orlov <morlovmr@gmail.com>

* Add unit tests for ReadersWrapper class

Signed-off-by: Michael Orlov <morlovmr@gmail.com>

* Delete default ctor and copy/move operators for ReadersWrapper class

Signed-off-by: Michael Orlov <morlovmr@gmail.com>

* Rename ReadersWrapper class to the ReadersManager

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Rename get_next_chronological_message_from_cache

- Reasoning: Not mentioning cache in the name because this is some
 implementation details and could be changed in the future. However,
 the API needs to be stable and clear.
- Also added a comment about reasoning for usage pre-cache inside
 implementation file.

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Rename "no_messages_in_cache" to the "has_next()"

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Renames for mutex and condition_variable

- Rename "skip_message_in_main_play_loop_mutex_" to the
 "main_play_loop_mutex_" and "playback_finished_cv_" into the
 "is_in_playback_cv_".

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Fix for messages_queue access race in seek(time)

- Stop storage loading thread before doing seek to avoid race condition
- Return from seek at the beginning if player is not in playback mode.
  Reasoning: We will seek to the beginning of the bag when will start
  play() anyway.

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Speedup "load_storage_content()" interruption

- Check "load_storage_content_" inside "enqueue_up_to_boundary()"

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Fix for play_fails_gracefully_if_needed_coverter_plugin_does_not_exist

Add missing converter options when open readers

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

---------

Signed-off-by: Michael Orlov <morlovmr@gmail.com>
Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
(cherry picked from commit bfce7ac)

# Conflicts:
#	rosbag2_transport/src/rosbag2_transport/player.cpp
#	rosbag2_transport/test/rosbag2_transport/test_play.cpp

* Address merge conflicts

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

---------

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
Co-authored-by: Michael Orlov <morlovmr@gmail.com>
MichaelOrlov added a commit that referenced this pull request Sep 20, 2025
- Return player storage options by value in the 'get_storage_options()'
 to avoid returning dangling reference.

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
MichaelOrlov added a commit that referenced this pull request Sep 20, 2025
- Return player storage options by value in the 'get_storage_options()'
 to avoid returning dangling reference.

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
MichaelOrlov added a commit that referenced this pull request Sep 20, 2025
- Fix for returning a dangling reference from the
 "Player::get_storage_options()" method
- Also deprecate "Player::get_storage_options()" method in favor of the
 "get_all_storage_options()""

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
MichaelOrlov added a commit that referenced this pull request Sep 22, 2025
* Follow-up on "Fix for multibag replay stagnation (#2158)"

- Return player storage options by value in the 'get_storage_options()'
 to avoid returning dangling reference.

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Add TODO to not forget to remove deprecated method per review request

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

---------

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
MichaelOrlov added a commit that referenced this pull request Sep 23, 2025
…2182)

* [kilted] Follow-up on "Fix for multibag replay stagnation (#2158)"

- Fix for returning a dangling reference from the
 "Player::get_storage_options()" method
- Also deprecate "Player::get_storage_options()" method in favor of the
 "get_all_storage_options()""

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Remove deprecation warning for the get_storage_options() per code review

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

---------

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
mergify bot pushed a commit that referenced this pull request Sep 23, 2025
…2182)

* [kilted] Follow-up on "Fix for multibag replay stagnation (#2158)"

- Fix for returning a dangling reference from the
 "Player::get_storage_options()" method
- Also deprecate "Player::get_storage_options()" method in favor of the
 "get_all_storage_options()""

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Remove deprecation warning for the get_storage_options() per code review

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

---------

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
(cherry picked from commit 19e0b90)

# Conflicts:
#	rosbag2_transport/src/rosbag2_transport/player.cpp
MichaelOrlov added a commit that referenced this pull request Sep 25, 2025
)

* [kilted] Follow-up on "Fix for multibag replay stagnation (#2158)"

- Fix for returning a dangling reference from the
 "Player::get_storage_options()" method
- Also deprecate "Player::get_storage_options()" method in favor of the
 "get_all_storage_options()""

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Remove deprecation warning for the get_storage_options() per code review

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

---------

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
(cherry picked from commit 19e0b90)

# Conflicts:
#	rosbag2_transport/src/rosbag2_transport/player.cpp
MichaelOrlov added a commit that referenced this pull request Sep 25, 2025
…ckport #2182) (#2189)

* [jazzy] Follow-up on "Fix for multibag replay stagnation (#2158)" (#2182)

* [kilted] Follow-up on "Fix for multibag replay stagnation (#2158)"

- Fix for returning a dangling reference from the
 "Player::get_storage_options()" method
- Also deprecate "Player::get_storage_options()" method in favor of the
 "get_all_storage_options()""

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Remove deprecation warning for the get_storage_options() per code review

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

---------

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
(cherry picked from commit 19e0b90)

# Conflicts:
#	rosbag2_transport/src/rosbag2_transport/player.cpp

* Address merge conflicts

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

---------

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
Co-authored-by: Michael Orlov <morlovmr@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

When using multibag replay, the frequency of the messages on topics is reduced and inconsistent over the time

3 participants