Skip to content

Commit

Permalink
Fixes for intra-process actions (irobot-ros#144)
Browse files Browse the repository at this point in the history
* Fixes for intra-process Actions

* Fixes for Clang builds

* Fix deadlock

* Server to store results until client requests them

* Fix feedback/result data race

See ros2#2451

* Add missing mutex

* Check return value of intra_process_action_send

---------

Co-authored-by: Mauro Passerino <mpasserino@irobot.com>
  • Loading branch information
2 people authored and Alexis Pojomovsky committed Jun 21, 2024
1 parent b865383 commit 69e426a
Show file tree
Hide file tree
Showing 15 changed files with 683 additions and 648 deletions.
222 changes: 135 additions & 87 deletions rclcpp/include/rclcpp/experimental/action_client_intra_process.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,35 +50,52 @@ class ActionClientIntraProcess : public ActionClientIntraProcessBase

// Useful aliases for the action client data types
using ResponseCallback = std::function<void (std::shared_ptr<void>)>;

// Aliases for the GoalResponse ring buffer
using GoalResponse = typename ActionT::Impl::SendGoalService::Response;
using GoalResponseSharedPtr = typename std::shared_ptr<GoalResponse>;
using GoalResponseDataPair = typename std::pair<uint64_t, GoalResponseSharedPtr>;
using GoalResponseVoidDataPair = typename std::pair<uint64_t, std::shared_ptr<void>>;
using GoalResponsePairSharedPtr = typename std::shared_ptr<GoalResponseDataPair>;

// Aliases for the ResultResponse ring buffer
using ResultResponse = typename ActionT::Impl::GetResultService::Response;
using ResultResponseSharedPtr = typename std::shared_ptr<ResultResponse>;
using ResultResponseDataPair = typename std::pair<uint64_t, ResultResponseSharedPtr>;
using ResultResponseVoidDataPair = typename std::pair<uint64_t, std::shared_ptr<void>>;
using ResultResponsePairSharedPtr = typename std::shared_ptr<ResultResponseDataPair>;

// Aliases for the CancelResponse ring buffer
using CancelResponse = typename ActionT::Impl::CancelGoalService::Response;
using CancelResponseSharedPtr = typename std::shared_ptr<CancelResponse>;
using CancelResponseDataPair = typename std::pair<uint64_t, CancelResponseSharedPtr>;
using CancelResponseVoidDataPair = typename std::pair<uint64_t, std::shared_ptr<void>>;
using CancelResponsePairSharedPtr = typename std::shared_ptr<CancelResponseDataPair>;

using FeedbackMessage = typename ActionT::Impl::FeedbackMessage;
using FeedbackSharedPtr = typename std::shared_ptr<FeedbackMessage>;
using CancelGoalSharedPtr = typename std::shared_ptr<void>;
using GoalStatusSharedPtr = typename std::shared_ptr<void>;

ActionClientIntraProcess(
rclcpp::Context::SharedPtr context,
const std::string & action_name,
const rcl_action_client_depth_t & qos_history,
ResponseCallback goal_status_callback,
ResponseCallback feedback_callback)
: goal_status_callback_(goal_status_callback),
feedback_callback_(feedback_callback),
ActionClientIntraProcessBase(
ResponseCallback feedback_callback,
std::recursive_mutex & reentrant_mutex)
: ActionClientIntraProcessBase(
context,
action_name,
QoS(qos_history.goal_service_depth))
QoS(qos_history.goal_service_depth),
reentrant_mutex)
{
// Create the intra-process buffers
goal_response_buffer_ =
rclcpp::experimental::create_service_intra_process_buffer<GoalResponseSharedPtr>(
rclcpp::experimental::create_service_intra_process_buffer<GoalResponsePairSharedPtr>(
QoS(qos_history.goal_service_depth));

result_response_buffer_ =
rclcpp::experimental::create_service_intra_process_buffer<ResultResponseSharedPtr>(
rclcpp::experimental::create_service_intra_process_buffer<ResultResponsePairSharedPtr>(
QoS(qos_history.result_service_depth));

status_buffer_ =
Expand All @@ -90,8 +107,11 @@ class ActionClientIntraProcess : public ActionClientIntraProcessBase
QoS(qos_history.feedback_topic_depth));

cancel_response_buffer_ =
rclcpp::experimental::create_service_intra_process_buffer<CancelGoalSharedPtr>(
rclcpp::experimental::create_service_intra_process_buffer<CancelResponsePairSharedPtr>(
QoS(qos_history.cancel_service_depth));

set_response_callback_to_event_type(EventType::FeedbackReady, feedback_callback);
set_response_callback_to_event_type(EventType::StatusReady, goal_status_callback);
}

virtual ~ActionClientIntraProcess() = default;
Expand All @@ -100,93 +120,109 @@ class ActionClientIntraProcess : public ActionClientIntraProcessBase
{
(void) wait_set;

is_goal_response_ready_ = goal_response_buffer_->has_data();
is_result_response_ready_ = result_response_buffer_->has_data();
is_cancel_response_ready_ = cancel_response_buffer_->has_data();
is_feedback_ready_ = feedback_buffer_->has_data();
is_status_ready_ = status_buffer_->has_data();

return is_feedback_ready_ ||
is_status_ready_ ||
is_goal_response_ready_ ||
is_cancel_response_ready_ ||
is_result_response_ready_;
}

// Store responses callbacks.
// We don't use mutex to protect these callbacks since they
// are called always after they are set.
void store_goal_response_callback(ResponseCallback callback)

void store_goal_response_callback(size_t goal_id, ResponseCallback response_callback)
{
goal_response_callback_ = callback;
set_response_callback_to_event_type(EventType::GoalResponse, response_callback, goal_id);
}

void store_cancel_goal_callback(ResponseCallback callback)
void store_cancel_goal_callback(size_t goal_id, ResponseCallback callback)
{
cancel_goal_callback_ = callback;
set_response_callback_to_event_type(EventType::CancelResponse, callback, goal_id);
}

void store_result_response_callback(ResponseCallback callback)
void store_result_response_callback(size_t goal_id, ResponseCallback callback)
{
result_response_callback_ = callback;
set_response_callback_to_event_type(EventType::ResultResponse, callback, goal_id);
}

// Store responses from server
void store_ipc_action_goal_response(GoalResponseSharedPtr goal_response)
void store_ipc_action_goal_response(
GoalResponseSharedPtr goal_response,
size_t goal_id)
{
goal_response_buffer_->add(std::move(goal_response));
goal_response_buffer_->add(
std::make_shared<GoalResponseDataPair>(
std::make_pair(goal_id, std::move(goal_response))));

gc_.trigger();
is_goal_response_ready_ = true;
invoke_on_ready_callback(EventType::GoalResponse);
invoke_on_ready_callback(EventType::GoalResponse, goal_id);
}

void store_ipc_action_result_response(ResultResponseSharedPtr result_response)
void store_ipc_action_result_response(
ResultResponseSharedPtr result_response,
size_t goal_id)
{
result_response_buffer_->add(std::move(result_response));
result_response_buffer_->add(
std::make_shared<ResultResponseDataPair>(
std::make_pair(goal_id, std::move(result_response))));

gc_.trigger();
is_result_response_ready_ = true;
invoke_on_ready_callback(EventType::ResultResponse);
invoke_on_ready_callback(EventType::ResultResponse, goal_id);
}

void store_ipc_action_cancel_response(CancelGoalSharedPtr cancel_response)
void store_ipc_action_cancel_response(
CancelResponseSharedPtr cancel_response,
size_t goal_id)
{
cancel_response_buffer_->add(std::move(cancel_response));
cancel_response_buffer_->add(
std::make_shared<CancelResponseDataPair>(
std::make_pair(goal_id, std::move(cancel_response))));

gc_.trigger();
is_cancel_response_ready_ = true;
invoke_on_ready_callback(EventType::CancelResponse);
invoke_on_ready_callback(EventType::CancelResponse, goal_id);
}

void store_ipc_action_feedback(FeedbackSharedPtr feedback)
{
feedback_buffer_->add(std::move(feedback));
gc_.trigger();
is_feedback_ready_ = true;
invoke_on_ready_callback(EventType::FeedbackReady);
}

void store_ipc_action_goal_status(GoalStatusSharedPtr status)
{
status_buffer_->add(std::move(status));
gc_.trigger();
is_status_ready_ = true;
invoke_on_ready_callback(EventType::StatusReady);
}

std::shared_ptr<void>
take_data() override
{
std::shared_ptr<void> data;

if (is_goal_response_ready_) {
auto data = std::move(goal_response_buffer_->consume());
return std::static_pointer_cast<void>(data);
} else if (is_result_response_ready_) {
auto data = std::move(result_response_buffer_->consume());
return std::static_pointer_cast<void>(data);
} else if (is_cancel_response_ready_) {
auto data = std::move(cancel_response_buffer_->consume());
return std::static_pointer_cast<void>(data);
} else if (is_feedback_ready_) {
auto data = std::move(feedback_buffer_->consume());
return std::static_pointer_cast<void>(data);
} else if (is_status_ready_) {
auto data = std::move(status_buffer_->consume());
return std::static_pointer_cast<void>(data);
} else {
throw std::runtime_error("Taking data from intra-process action client but nothing is ready");
data = std::move(goal_response_buffer_->consume());
}
else if (is_result_response_ready_) {
data = std::move(result_response_buffer_->consume());
}
else if (is_cancel_response_ready_) {
data = std::move(cancel_response_buffer_->consume());
}
else if (is_feedback_ready_) {
data = std::move(feedback_buffer_->consume());
}
else if (is_status_ready_) {
data = status_buffer_->consume();
}

// Data could be null if there were more events than elements in the buffer
return data;
}

std::shared_ptr<void>
Expand All @@ -195,79 +231,91 @@ class ActionClientIntraProcess : public ActionClientIntraProcessBase
// Mark as ready the event type from which we want to take data
switch (static_cast<EventType>(id)) {
case EventType::ResultResponse:
is_result_response_ready_ = true;
is_result_response_ready_ = result_response_buffer_->has_data();
break;
case EventType::CancelResponse:
is_cancel_response_ready_ = true;
is_cancel_response_ready_ = cancel_response_buffer_->has_data();
break;
case EventType::GoalResponse:
is_goal_response_ready_ = true;
is_goal_response_ready_ = goal_response_buffer_->has_data();
break;
case EventType::FeedbackReady:
is_feedback_ready_ = true;
is_feedback_ready_ = feedback_buffer_->has_data();
break;
case EventType::StatusReady:
is_status_ready_ = true;
is_status_ready_ = status_buffer_->has_data();
break;
}

return take_data();
}


void execute(std::shared_ptr<void> & data)
{
// How to handle case when more than one flag is ready?
// For example, feedback and status are both ready, guard condition triggered
// twice, but we process a single entity here.
// On the default executor using a waitset, waitables are checked twice if ready,
// so that fixes the issue. Check if this is a problem with EventsExecutor.
if (!data) {
throw std::runtime_error("'data' is empty");
// This can happen when there were more events than elements in the ring buffer
return;
}

if (is_goal_response_ready_) {
is_goal_response_ready_ = false;
goal_response_callback_(std::move(data));
} else if (is_result_response_ready_) {
is_result_response_ready_ = false;
result_response_callback_(std::move(data));
} else if (is_cancel_response_ready_) {
is_cancel_response_ready_ = false;
cancel_goal_callback_(std::move(data));
} else if (is_feedback_ready_) {
is_feedback_ready_ = false;
feedback_callback_(std::move(data));
} else if (is_status_ready_) {
is_status_ready_ = false;
goal_status_callback_(std::move(data));
} else {
if (is_goal_response_ready_.exchange(false)) {
auto goal_response_pair = std::static_pointer_cast<GoalResponseVoidDataPair>(data);
auto goal_id = goal_response_pair->first;
auto & goal_response = goal_response_pair->second;

call_response_callback_and_erase(
EventType::GoalResponse,
goal_response,
goal_id);
}
else if (is_result_response_ready_.exchange(false)) {
auto result_response_pair = std::static_pointer_cast<ResultResponseVoidDataPair>(data);
auto goal_id = result_response_pair->first;
auto & result_response = result_response_pair->second;

call_response_callback_and_erase(
EventType::ResultResponse,
result_response,
goal_id);
}
else if (is_cancel_response_ready_.exchange(false)) {
auto cancel_response_pair = std::static_pointer_cast<CancelResponseVoidDataPair>(data);
auto goal_id = cancel_response_pair->first;
auto & cancel_response = cancel_response_pair->second;

call_response_callback_and_erase(
EventType::CancelResponse,
cancel_response,
goal_id);
}
else if (is_feedback_ready_.exchange(false)) {
call_response_callback_and_erase(
EventType::FeedbackReady, data, 0, false);
}
else if (is_status_ready_.exchange(false)) {
call_response_callback_and_erase(
EventType::StatusReady, data, 0, false);
}
else {
throw std::runtime_error("Executing intra-process action client but nothing is ready");
}
}

protected:
ResponseCallback goal_response_callback_;
ResponseCallback result_response_callback_;
ResponseCallback cancel_goal_callback_;
ResponseCallback goal_status_callback_;
ResponseCallback feedback_callback_;

// Create buffers to store data coming from server
// Declare buffers to store responses coming from action server
typename rclcpp::experimental::buffers::ServiceIntraProcessBuffer<
GoalResponseSharedPtr>::UniquePtr goal_response_buffer_;
GoalResponsePairSharedPtr>::UniquePtr goal_response_buffer_;

typename rclcpp::experimental::buffers::ServiceIntraProcessBuffer<
ResultResponseSharedPtr>::UniquePtr result_response_buffer_;
ResultResponsePairSharedPtr>::UniquePtr result_response_buffer_;

typename rclcpp::experimental::buffers::ServiceIntraProcessBuffer<
FeedbackSharedPtr>::UniquePtr feedback_buffer_;

rclcpp::experimental::buffers::ServiceIntraProcessBuffer<
GoalStatusSharedPtr>::UniquePtr status_buffer_;

rclcpp::experimental::buffers::ServiceIntraProcessBuffer<
CancelGoalSharedPtr>::UniquePtr cancel_response_buffer_;
typename rclcpp::experimental::buffers::ServiceIntraProcessBuffer<
CancelResponsePairSharedPtr>::UniquePtr cancel_response_buffer_;

std::atomic<bool> is_feedback_ready_{false};
std::atomic<bool> is_status_ready_{false};
Expand Down
Loading

0 comments on commit 69e426a

Please sign in to comment.