Skip to content

Commit 48fa0bc

Browse files
MichaelOrlovmorlov-apexai
authored andcommitted
Fix multiple race conditions in the play_messages_from_queue()
- Got rid the extra while loop in the "play_messages_from_queue()". Reasoning: The logic still will be correct. After exiting from the "play_messages_from_queue()" we will check if need to repeat from beginning and restart "play_messages_from_queue()" again after init. All other calls like play_next() and seek() will preserve their logic and will work the same way as before. - Fix for race condition with seek() at the end of the playback Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
1 parent e368b37 commit 48fa0bc

File tree

1 file changed

+90
-71
lines changed

1 file changed

+90
-71
lines changed

rosbag2_transport/src/rosbag2_transport/player.cpp

Lines changed: 90 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -683,7 +683,7 @@ bool PlayerImpl::play()
683683
is_in_playback_cv_.notify_all();
684684

685685
// If we get here and still have/just got a play next request, make sure to notify play_next()
686-
// After that, requests will be automatically rejected since is_in_playback_ is false
686+
// After that, requests will be automatically rejected since is_in_playback_ become false
687687
if (play_next_.exchange(false)) {
688688
std::lock_guard<std::mutex> lk(finished_play_next_mutex_);
689689
finished_play_next_ = true;
@@ -1176,14 +1176,37 @@ void PlayerImpl::play_messages_from_queue()
11761176
ready_to_play_from_queue_cv_.notify_all();
11771177

11781178
// While we haven't stopped playing, try to play messages and wait for a potential request to play
1179-
// the next message
1179+
// the next message.
1180+
std::unique_lock<std::mutex> main_play_loop_lk(main_play_loop_mutex_);
11801181
while (rclcpp::ok() && !stop_playback_) {
1181-
// While there's a message to play and we haven't reached the end timestamp yet
1182-
while (rclcpp::ok() && !stop_playback_ &&
1183-
message_ptr != nullptr && !shall_stop_at_timestamp(get_message_order_timestamp(message_ptr)))
1184-
{
1182+
main_play_loop_lk.unlock(); // Unlock while we are waiting/sleeping
1183+
if (is_paused()) {
1184+
// If we're paused, we need to wait until the stop of playback or until we get a play next.
1185+
// Also while we're in a pause state, make sure we don't return if we happen to be at the end
1186+
// of the queue. We need to be able to make seek back in time from the end of the queue
1187+
// while in pause.
1188+
while (rclcpp::ok() && is_paused() && !stop_playback_ && !play_next_.load()) {
1189+
// When clock is in pause mode, sleep_until() will be blocked until we wake it up or
1190+
// change the clock state from pause to resume, so this is not a tight busy loop on pause.
1191+
(void)clock_->sleep_until(clock_->now());
1192+
// Stop sleeping if cancelled
1193+
if (std::atomic_exchange(&cancel_wait_for_next_message_, false)) {
1194+
break;
1195+
}
1196+
}
1197+
if (!is_paused()) {
1198+
main_play_loop_lk.lock();
1199+
continue; // If we just exited from the pause mode, go back to the top of the loop to
1200+
// handle the non-paused case
1201+
}
1202+
} else {
1203+
// While there's a message to play, and we haven't reached the end yet.
1204+
if (message_ptr == nullptr ||
1205+
shall_stop_at_timestamp(get_message_order_timestamp(message_ptr)))
1206+
{
1207+
break;
1208+
}
11851209
// Sleep until the message's replay time, do not move on until sleep_until returns true
1186-
// It will always sleep, so this is not a tight busy loop on pause
11871210
// However, skip sleeping if we're trying to play the next message
11881211
while (rclcpp::ok() && !stop_playback_ && !play_next_.load() &&
11891212
!clock_->sleep_until(get_message_order_timestamp(message_ptr)))
@@ -1193,83 +1216,79 @@ void PlayerImpl::play_messages_from_queue()
11931216
break;
11941217
}
11951218
}
1219+
}
11961220

1197-
std::lock_guard<std::mutex> main_play_loop_lk(main_play_loop_mutex_);
1198-
if (!stop_playback_) {
1199-
// This means that the message we took from the queue's top was invalidated after a seek(),
1200-
// so we need to take a fresh element from the top of the queue, unless we have to stop
1201-
if (skip_message_in_main_play_loop_) {
1202-
skip_message_in_main_play_loop_ = false;
1203-
cancel_wait_for_next_message_ = false;
1204-
message_ptr = take_next_message_from_queue();
1205-
continue;
1206-
}
1221+
main_play_loop_lk.lock();
1222+
if (!stop_playback_) {
1223+
// This means that the message we took from the queue's top was invalidated after a seek(),
1224+
// so we need to take a fresh element from the top of the queue, unless we have to stop
1225+
if (skip_message_in_main_play_loop_) {
1226+
skip_message_in_main_play_loop_ = false;
1227+
cancel_wait_for_next_message_ = false;
1228+
message_ptr = take_next_message_from_queue();
1229+
continue;
1230+
}
12071231

1208-
const bool message_published = publish_message(message_ptr);
1209-
// If we tried to publish because of play_next(), jump the clock
1232+
if (message_ptr == nullptr) {
1233+
// We can get here with a null message_ptr if we are processing play_next() but there are
1234+
// actually no more messages to play. In that case, we just notify play_next(). Or if we
1235+
// have reached the end of playback and resumed playback from pause mode. In this case,
1236+
// we need to continue loop to gracefully exit from play_messages_from_queue()
12101237
if (play_next_.load()) {
1211-
clock_->jump(get_message_order_timestamp(message_ptr));
12121238
play_next_ = false;
12131239
std::lock_guard<std::mutex> lk(finished_play_next_mutex_);
12141240
finished_play_next_ = true;
1215-
play_next_result_ = message_published;
1241+
play_next_result_ = false;
12161242
finished_play_next_cv_.notify_all();
12171243
}
1218-
// Updating progress bar in this code section protected
1219-
// by the mutex main_play_loop_mutex_.
1220-
const auto current_player_status = progress_bar_->get_player_status();
1221-
switch (current_player_status) {
1222-
case PlayerStatus::PAUSED:
1223-
// Update progress bar without delays for each explicit play_next() call
1224-
progress_bar_->update(PlayerStatus::PAUSED, get_message_order_timestamp(message_ptr));
1225-
break;
1226-
case PlayerStatus::BURST:
1227-
// Limit progress bar update in burst mode
1228-
progress_bar_->update_with_limited_rate(
1229-
PlayerStatus::BURST, get_message_order_timestamp(message_ptr));
1230-
break;
1231-
default:
1232-
progress_bar_->update_with_limited_rate(
1233-
PlayerStatus::RUNNING, get_message_order_timestamp(message_ptr));
1234-
break;
1235-
}
1244+
continue;
12361245
}
1237-
message_ptr = take_next_message_from_queue();
1238-
}
12391246

1240-
// At this point, we're at the end of the playback round, there are no more messages to play
1241-
// If we're still trying to play next or just got a request, we did not succeed
1242-
if (play_next_.exchange(false)) {
1243-
std::lock_guard<std::mutex> lk(finished_play_next_mutex_);
1244-
finished_play_next_ = true;
1245-
play_next_result_ = false;
1246-
finished_play_next_cv_.notify_all();
1247-
}
1248-
1249-
// While we're in a pause state, make sure we don't return if we happen to be at the end of the
1250-
// queue or playback round. However, if we get a request for play next during sleep_until(...),
1251-
// we need to stop waiting here and proceed to handle the play next request by doing another
1252-
// loop of this while().
1253-
while (!stop_playback_ && is_paused() && !play_next_.load() && rclcpp::ok()) {
1254-
clock_->sleep_until(clock_->now());
1255-
}
1256-
// If we ran out of messages and are not in pause state, it means we're done playing
1257-
// TODO(morlov): We could have executed a seek() to a time point beyond the last message
1258-
// timestamp, while we were waiting in pause in that case we should not stop playing.
1259-
if (!is_paused()) {
1260-
break;
1261-
}
1247+
bool message_published = false;
1248+
// We shall respect shall_stop_at_timestamp() even do play_next in pause mode
1249+
if (!shall_stop_at_timestamp(get_message_order_timestamp(message_ptr))) {
1250+
message_published = publish_message(message_ptr);
1251+
}
12621252

1263-
// If we had run out of messages before but are starting to play next again, e.g., after a
1264-
// seek(), we need to take a new message from the queue
1265-
if (play_next_.load()) {
1266-
// lock the main_play_loop_mutex_ to avoid race with seek()
1267-
std::lock_guard<std::mutex> main_play_loop_lk(main_play_loop_mutex_);
1268-
skip_message_in_main_play_loop_ = false;
1269-
cancel_wait_for_next_message_ = false;
1270-
message_ptr = take_next_message_from_queue();
1253+
// If we tried to publish because of play_next(), jump the clock
1254+
if (play_next_.load()) {
1255+
clock_->jump(get_message_order_timestamp(message_ptr));
1256+
play_next_ = false;
1257+
std::lock_guard<std::mutex> lk(finished_play_next_mutex_);
1258+
finished_play_next_ = true;
1259+
play_next_result_ = message_published;
1260+
finished_play_next_cv_.notify_all();
1261+
}
1262+
// Updating progress bar in this code section protected
1263+
// by the mutex main_play_loop_mutex_.
1264+
const auto current_player_status = progress_bar_->get_player_status();
1265+
switch (current_player_status) {
1266+
case PlayerStatus::PAUSED:
1267+
// Update progress bar without delays for each explicit play_next() call
1268+
progress_bar_->update(PlayerStatus::PAUSED, get_message_order_timestamp(message_ptr));
1269+
break;
1270+
case PlayerStatus::BURST:
1271+
// Limit progress bar update in burst mode
1272+
progress_bar_->update_with_limited_rate(
1273+
PlayerStatus::BURST, get_message_order_timestamp(message_ptr));
1274+
break;
1275+
default:
1276+
progress_bar_->update_with_limited_rate(
1277+
PlayerStatus::RUNNING, get_message_order_timestamp(message_ptr));
1278+
break;
1279+
}
12711280
}
1281+
message_ptr = take_next_message_from_queue();
1282+
}
1283+
// Note: We are still under the lock of main_play_loop_mutex_ here
1284+
{ // Notify seek() that we are NOT ready for playback anymore. The seek() will wait until
1285+
// play_messages_from_queue() will be called again in the next iteration of the play loop
1286+
// in Player::play(). This is to avoid race condition between seek() and
1287+
// play_messages_from_queue() when play_options_.loop = true
1288+
std::lock_guard<std::mutex> lk(ready_to_play_from_queue_mutex_);
1289+
is_ready_to_play_from_queue_ = false;
12721290
}
1291+
ready_to_play_from_queue_cv_.notify_all();
12731292
}
12741293

12751294
rcutils_time_point_value_t PlayerImpl::get_message_order_timestamp(

0 commit comments

Comments
 (0)