Skip to content

Commit f7185dc

Browse files
wjwwoodJanosch Machowinskijmachowinskifujitatomoya
authored
Fixup Executor::spin_all() regression fix (#2517)
* test(Executors): Added tests for busy waiting Checks if executors are busy waiting while they should block in spin_some or spin_all. Signed-off-by: Janosch Machowinski <J.Machowinski@cellumation.com> * fix: Reworked spinAll test This test was strange. It looked like, it assumed that spin_all did not return instantly. Also it was racy, as the thread could terminate instantly. Signed-off-by: Janosch Machowinski <J.Machowinski@cellumation.com> * fix(Executor): Fixed spin_all not returning instantly is no work was available Signed-off-by: Janosch Machowinski <J.Machowinski@cellumation.com> * Update rclcpp/test/rclcpp/executors/test_executors.cpp Co-authored-by: Tomoya Fujita <Tomoya.Fujita@sony.com> Signed-off-by: jmachowinski <jmachowinski@users.noreply.github.com> * test(executors): Added test for busy waiting while calling spin Signed-off-by: Janosch Machowinski <J.Machowinski@cellumation.com> * fix(executor): Reset wait_result on every call to spin_some_impl Before, the method would not recollect available work in case of spin_some, spin_all. This would lead to the method behaving differently than to what the documentation states. Signed-off-by: Janosch Machowinski <J.Machowinski@cellumation.com> * restore previous test logic for now Signed-off-by: William Woodall <william@osrfoundation.org> * refactor spin_some_impl's logic and improve busy wait tests Signed-off-by: William Woodall <william@osrfoundation.org> * added some more comments about the implementation Signed-off-by: William Woodall <william@osrfoundation.org> --------- Signed-off-by: Janosch Machowinski <J.Machowinski@cellumation.com> Signed-off-by: jmachowinski <jmachowinski@users.noreply.github.com> Signed-off-by: William Woodall <william@osrfoundation.org> Co-authored-by: Janosch Machowinski <J.Machowinski@cellumation.com> Co-authored-by: jmachowinski <jmachowinski@users.noreply.github.com> Co-authored-by: Tomoya Fujita <Tomoya.Fujita@sony.com>
1 parent 5f912eb commit f7185dc

File tree

2 files changed

+199
-11
lines changed

2 files changed

+199
-11
lines changed

rclcpp/src/rclcpp/executor.cpp

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -366,24 +366,52 @@ Executor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive)
366366
}
367367
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
368368

369+
// clear the wait result and wait for work without blocking to collect the work
370+
// for the first time
371+
// both spin_some and spin_all wait for work at the beginning
372+
wait_result_.reset();
373+
wait_for_work(std::chrono::milliseconds(0));
374+
bool just_waited = true;
375+
376+
// The logic of this while loop is as follows:
377+
//
378+
// - while not shutdown, and spinning (not canceled), and not max duration reached...
379+
// - try to get an executable item to execute, and execute it if available
380+
// - otherwise, reset the wait result, and ...
381+
// - if there was no work available just after waiting, break the loop unconditionally
382+
// - this is appropriate for both spin_some and spin_all which use this function
383+
// - else if exhaustive = true, then wait for work again
384+
// - this is only used for spin_all and not spin_some
385+
// - else break
386+
// - this only occurs with spin_some
387+
//
388+
// The logic of this loop is subtle and should be carefully changed if at all.
389+
// See also:
390+
// https://github.com/ros2/rclcpp/issues/2508
391+
// https://github.com/ros2/rclcpp/pull/2517
369392
while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) {
370-
if (!wait_result_.has_value()) {
371-
wait_for_work(std::chrono::milliseconds(0));
372-
}
373-
374393
AnyExecutable any_exec;
375394
if (get_next_ready_executable(any_exec)) {
376395
execute_any_executable(any_exec);
396+
just_waited = false;
377397
} else {
378-
// If nothing is ready, reset the result to signal we are
379-
// ready to wait again
398+
// if nothing is ready, reset the result to clear it
380399
wait_result_.reset();
381-
}
382400

383-
if (!wait_result_.has_value() && !exhaustive) {
384-
// In the case of spin some, then we can exit
385-
// In the case of spin all, then we will allow ourselves to wait again.
386-
break;
401+
if (just_waited) {
402+
// there was no work after just waiting, always exit in this case
403+
// before the exhaustive condition can be checked
404+
break;
405+
}
406+
407+
if (exhaustive) {
408+
// if exhaustive, wait for work again
409+
// this only happens for spin_all; spin_some only waits at the start
410+
wait_for_work(std::chrono::milliseconds(0));
411+
just_waited = true;
412+
} else {
413+
break;
414+
}
387415
}
388416
}
389417
}

rclcpp/test/rclcpp/executors/test_executors.cpp

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,7 @@ class TestWaitable : public rclcpp::Waitable
357357
bool
358358
is_ready(const rcl_wait_set_t & wait_set) override
359359
{
360+
is_ready_count_++;
360361
for (size_t i = 0; i < wait_set.size_of_guard_conditions; ++i) {
361362
auto rcl_guard_condition = wait_set.guard_conditions[i];
362363
if (&gc_.get_rcl_guard_condition() == rcl_guard_condition) {
@@ -424,8 +425,15 @@ class TestWaitable : public rclcpp::Waitable
424425
return count_;
425426
}
426427

428+
size_t
429+
get_is_ready_call_count() const
430+
{
431+
return is_ready_count_;
432+
}
433+
427434
private:
428435
std::atomic<size_t> trigger_count_ = 0;
436+
std::atomic<size_t> is_ready_count_ = 0;
429437
std::atomic<size_t> count_ = 0;
430438
rclcpp::GuardCondition gc_;
431439
std::function<void()> on_execute_callback_ = nullptr;
@@ -869,3 +877,155 @@ TEST(TestExecutors, testSpinWithNonDefaultContext)
869877

870878
rclcpp::shutdown(non_default_context);
871879
}
880+
881+
template<typename T>
882+
class TestBusyWaiting : public ::testing::Test
883+
{
884+
public:
885+
void SetUp() override
886+
{
887+
rclcpp::init(0, nullptr);
888+
889+
const auto test_info = ::testing::UnitTest::GetInstance()->current_test_info();
890+
std::stringstream test_name;
891+
test_name << test_info->test_case_name() << "_" << test_info->name();
892+
node = std::make_shared<rclcpp::Node>("node", test_name.str());
893+
callback_group = node->create_callback_group(
894+
rclcpp::CallbackGroupType::MutuallyExclusive,
895+
/* automatically_add_to_executor_with_node =*/ false);
896+
897+
auto waitable_interfaces = node->get_node_waitables_interface();
898+
waitable = std::make_shared<TestWaitable>();
899+
waitable_interfaces->add_waitable(waitable, callback_group);
900+
901+
executor = std::make_shared<T>();
902+
executor->add_callback_group(callback_group, node->get_node_base_interface());
903+
}
904+
905+
void TearDown() override
906+
{
907+
rclcpp::shutdown();
908+
}
909+
910+
void
911+
set_up_and_trigger_waitable(std::function<void()> extra_callback = nullptr)
912+
{
913+
this->has_executed = false;
914+
this->waitable->set_on_execute_callback([this, extra_callback]() {
915+
if (!this->has_executed) {
916+
// trigger once to see if the second trigger is handled or not
917+
// this follow up trigger simulates new entities becoming ready while
918+
// the executor is executing something else, e.g. subscription got data
919+
// or a timer expired, etc.
920+
// spin_some would not handle this second trigger, since it collects
921+
// work only once, whereas spin_all should handle it since it
922+
// collects work multiple times
923+
this->waitable->trigger();
924+
this->has_executed = true;
925+
}
926+
if (nullptr != extra_callback) {
927+
extra_callback();
928+
}
929+
});
930+
this->waitable->trigger();
931+
}
932+
933+
void
934+
check_for_busy_waits(std::chrono::steady_clock::time_point start_time)
935+
{
936+
// rough time based check, since the work to be done was very small it
937+
// should be safe to check that we didn't use more than half the
938+
// max duration, which itself is much larger than necessary
939+
// however, it could still produce a false-positive
940+
EXPECT_LT(
941+
std::chrono::steady_clock::now() - start_time,
942+
max_duration / 2)
943+
<< "executor took a long time to execute when it should have done "
944+
<< "nothing and should not have blocked either, but this could be a "
945+
<< "false negative if the computer is really slow";
946+
947+
// this check is making some assumptions about the implementation of the
948+
// executors, but it should be safe to say that a busy wait may result in
949+
// hundreds or thousands of calls to is_ready(), but "normal" executor
950+
// behavior should be within an order of magnitude of the number of
951+
// times that the waitable was executed
952+
ASSERT_LT(waitable->get_is_ready_call_count(), 10u * this->waitable->get_count());
953+
}
954+
955+
static constexpr auto max_duration = 10s;
956+
957+
rclcpp::Node::SharedPtr node;
958+
rclcpp::CallbackGroup::SharedPtr callback_group;
959+
std::shared_ptr<TestWaitable> waitable;
960+
std::chrono::steady_clock::time_point start_time;
961+
std::shared_ptr<T> executor;
962+
bool has_executed;
963+
};
964+
965+
TYPED_TEST_SUITE(TestBusyWaiting, ExecutorTypes, ExecutorTypeNames);
966+
967+
TYPED_TEST(TestBusyWaiting, test_spin_all)
968+
{
969+
this->set_up_and_trigger_waitable();
970+
971+
auto start_time = std::chrono::steady_clock::now();
972+
this->executor->spin_all(this->max_duration);
973+
this->check_for_busy_waits(start_time);
974+
// this should get the initial trigger, and the follow up from in the callback
975+
ASSERT_EQ(this->waitable->get_count(), 2u);
976+
}
977+
978+
TYPED_TEST(TestBusyWaiting, test_spin_some)
979+
{
980+
this->set_up_and_trigger_waitable();
981+
982+
auto start_time = std::chrono::steady_clock::now();
983+
this->executor->spin_some(this->max_duration);
984+
this->check_for_busy_waits(start_time);
985+
// this should get the inital trigger, but not the follow up in the callback
986+
ASSERT_EQ(this->waitable->get_count(), 1u);
987+
}
988+
989+
TYPED_TEST(TestBusyWaiting, test_spin)
990+
{
991+
std::condition_variable cv;
992+
std::mutex cv_m;
993+
bool first_check_passed = false;
994+
995+
this->set_up_and_trigger_waitable([&cv, &cv_m, &first_check_passed]() {
996+
cv.notify_one();
997+
if (!first_check_passed) {
998+
std::unique_lock<std::mutex> lk(cv_m);
999+
cv.wait_for(lk, 1s, [&]() {return first_check_passed;});
1000+
}
1001+
});
1002+
1003+
auto start_time = std::chrono::steady_clock::now();
1004+
std::thread t([this]() {
1005+
this->executor->spin();
1006+
});
1007+
1008+
// wait until thread has started (first execute of waitable)
1009+
{
1010+
std::unique_lock<std::mutex> lk(cv_m);
1011+
cv.wait_for(lk, 10s);
1012+
}
1013+
EXPECT_GT(this->waitable->get_count(), 0u);
1014+
1015+
first_check_passed = true;
1016+
cv.notify_one();
1017+
1018+
// wait until the executor has finished (second execute of waitable)
1019+
{
1020+
std::unique_lock<std::mutex> lk(cv_m);
1021+
cv.wait_for(lk, 10s);
1022+
}
1023+
EXPECT_EQ(this->waitable->get_count(), 2u);
1024+
1025+
this->executor->cancel();
1026+
t.join();
1027+
1028+
this->check_for_busy_waits(start_time);
1029+
// this should get the initial trigger, and the follow up from in the callback
1030+
ASSERT_EQ(this->waitable->get_count(), 2u);
1031+
}

0 commit comments

Comments
 (0)