Skip to content

Commit

Permalink
Merge pull request ros2#70 from irobot-ros/asoragna/simplify-queue
Browse files Browse the repository at this point in the history
rename lock free queue and remove wait_for_event API
  • Loading branch information
iRobot ROS authored Jun 10, 2021
2 parents 69e0997 + d5f5501 commit 0377946
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 94 deletions.
21 changes: 6 additions & 15 deletions rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,15 @@ class EventsQueue
enqueue(const rclcpp::executors::ExecutorEvent & event) = 0;

/**
* @brief gets the front event from the queue
* @return the front event
* @brief gets the front event from the queue, eventually waiting for it
* @return true if event, false if timeout
*/
RCLCPP_PUBLIC
virtual
rclcpp::executors::ExecutorEvent
dequeue() = 0;
bool
dequeue(
rclcpp::executors::ExecutorEvent & event,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds::max()) = 0;

/**
* @brief Test whether queue is empty
Expand All @@ -85,17 +87,6 @@ class EventsQueue
virtual
size_t
size() const = 0;

/**
* @brief waits for an event until timeout
* @return true if event, false if timeout
*/
RCLCPP_PUBLIC
virtual
bool
wait_for_event(
rclcpp::executors::ExecutorEvent & event,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds::max()) = 0;
};

} // namespace buffers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__EXPERIMENTAL__BUFFERS__BLOCKING_CONCURRENT_QUEUE_HPP_
#define RCLCPP__EXPERIMENTAL__BUFFERS__BLOCKING_CONCURRENT_QUEUE_HPP_
#ifndef RCLCPP__EXPERIMENTAL__BUFFERS__LOCK_FREE_EVENTS_QUEUE_HPP_
#define RCLCPP__EXPERIMENTAL__BUFFERS__LOCK_FREE_EVENTS_QUEUE_HPP_

#include <queue>

Expand All @@ -38,11 +38,11 @@ namespace buffers
* queue aims to fix the issue of publishers being blocked by the executor extracting
* events from the queue in a different thread, causing expensive mutex contention.
*/
class BlockingConcurrentQueue : public EventsQueue
class LockFreeEventsQueue : public EventsQueue
{
public:
RCLCPP_PUBLIC
~BlockingConcurrentQueue() override
~LockFreeEventsQueue() override
{
// It's important that all threads have finished using the queue
// and the memory effects have fully propagated, before it is destructed.
Expand All @@ -61,27 +61,11 @@ class BlockingConcurrentQueue : public EventsQueue
{
rclcpp::executors::ExecutorEvent single_event = event;
single_event.num_events = 1;
for (size_t ev = 1; ev <= event.num_events; ev++ ) {
for (size_t ev = 0; ev < event.num_events; ev++) {
event_queue_.enqueue(single_event);
}
}

/**
* @brief dequeue the front event from the queue.
* The event is removed from the queue after this operation.
* Callers should make sure the queue is not empty before calling.
*
* @return the front event
*/
RCLCPP_PUBLIC
rclcpp::executors::ExecutorEvent
dequeue() override
{
rclcpp::executors::ExecutorEvent event;
event_queue_.try_dequeue(event);
return event;
}

/**
* @brief Test whether queue is empty
* @return true if the queue's size is 0, false otherwise.
Expand All @@ -107,22 +91,16 @@ class BlockingConcurrentQueue : public EventsQueue
}

/**
* @brief waits for an event until timeout
* @brief gets the front event from the queue, eventually waiting for it
* @return true if event, false if timeout
*/
RCLCPP_PUBLIC
bool
wait_for_event(
dequeue(
rclcpp::executors::ExecutorEvent & event,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds::max()) override
{
if (timeout != std::chrono::nanoseconds::max()) {
return event_queue_.wait_dequeue_timed(event, timeout);
}

// If no timeout specified, just wait for an event to arrive
event_queue_.wait_dequeue(event);
return true;
return event_queue_.wait_dequeue_timed(event, timeout);
}

private:
Expand All @@ -134,4 +112,4 @@ class BlockingConcurrentQueue : public EventsQueue
} // namespace rclcpp


#endif // RCLCPP__EXPERIMENTAL__BUFFERS__BLOCKING_CONCURRENT_QUEUE_HPP_
#endif // RCLCPP__EXPERIMENTAL__BUFFERS__LOCK_FREE_EVENTS_QUEUE_HPP_
43 changes: 6 additions & 37 deletions rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,24 +60,6 @@ class SimpleEventsQueue : public EventsQueue
events_queue_cv_.notify_one();
}

/**
* @brief dequeue the front event from the queue.
* The event is removed from the queue after this operation.
* Callers should make sure the queue is not empty before calling.
* Thread safe
*
* @return the front event
*/
RCLCPP_PUBLIC
rclcpp::executors::ExecutorEvent
dequeue() override
{
std::unique_lock<std::mutex> lock(this->push_mutex_);
rclcpp::executors::ExecutorEvent event = event_queue_.front();
event_queue_.pop();
return event;
}

/**
* @brief Test whether queue is empty
* Thread safe
Expand Down Expand Up @@ -105,36 +87,23 @@ class SimpleEventsQueue : public EventsQueue
}

/**
* @brief waits for an event until timeout, gets a single event
* Thread safe
* @brief gets the front event from the queue, eventually waiting for it
* @return true if event, false if timeout
*/
RCLCPP_PUBLIC
bool
wait_for_event(
dequeue(
rclcpp::executors::ExecutorEvent & event,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds::max()) override
{
auto has_event_predicate = [this]() {return !event_queue_.empty();};

std::unique_lock<std::mutex> lock(this->push_mutex_);

if (timeout != std::chrono::nanoseconds::max()) {
// We wait here until timeout or until something is pushed into the queue
events_queue_cv_.wait_for(lock, timeout, has_event_predicate);
if (event_queue_.empty()) {
return false;
} else {
event = event_queue_.front();
event_queue_.pop();
return true;
}
} else {
// We wait here until something has been pushed into the queue
events_queue_cv_.wait(lock, has_event_predicate);
bool notified = events_queue_cv_.wait_for(lock, timeout, [this]() {return !event_queue_.empty();});
if (notified) {
event = event_queue_.front();
event_queue_.pop();
return true;
} else {
return false;
}
}

Expand Down
20 changes: 9 additions & 11 deletions rclcpp/src/rclcpp/executors/events_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,8 @@ EventsExecutor::spin()
while (rclcpp::ok(context_) && spinning.load()) {
// Wait until we get an event
ExecutorEvent event;
events_queue_->wait_for_event(event);
this->execute_event(event);

// Process rest of events, if any
while (!events_queue_->empty())
{
event = events_queue_->dequeue();
bool has_event = events_queue_->dequeue(event);
if (has_event) {
this->execute_event(event);
}
}
Expand Down Expand Up @@ -130,9 +125,12 @@ EventsExecutor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhau
bool has_event = !events_queue_->empty();

if (has_event) {
ExecutorEvent event = events_queue_->dequeue();
this->execute_event(event);
executed_events++;
ExecutorEvent event;
bool ret = events_queue_->dequeue(event);
if (ret) {
this->execute_event(event);
executed_events++;
}
continue;
}
}
Expand Down Expand Up @@ -171,7 +169,7 @@ EventsExecutor::spin_once_impl(std::chrono::nanoseconds timeout)
}

ExecutorEvent event;
bool has_event = events_queue_->wait_for_event(event, timeout);
bool has_event = events_queue_->dequeue(event, timeout);

// If we wake up from the wait with an event, it means that it
// arrived before any of the timers expired.
Expand Down

0 comments on commit 0377946

Please sign in to comment.