Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes for intra-process actions #144

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
222 changes: 135 additions & 87 deletions rclcpp/include/rclcpp/experimental/action_client_intra_process.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,35 +50,52 @@ class ActionClientIntraProcess : public ActionClientIntraProcessBase

// Useful aliases for the action client data types
using ResponseCallback = std::function<void (std::shared_ptr<void>)>;

// Aliases for the GoalResponse ring buffer
using GoalResponse = typename ActionT::Impl::SendGoalService::Response;
using GoalResponseSharedPtr = typename std::shared_ptr<GoalResponse>;
using GoalResponseDataPair = typename std::pair<uint64_t, GoalResponseSharedPtr>;
using GoalResponseVoidDataPair = typename std::pair<uint64_t, std::shared_ptr<void>>;
using GoalResponsePairSharedPtr = typename std::shared_ptr<GoalResponseDataPair>;

// Aliases for the ResultResponse ring buffer
using ResultResponse = typename ActionT::Impl::GetResultService::Response;
using ResultResponseSharedPtr = typename std::shared_ptr<ResultResponse>;
using ResultResponseDataPair = typename std::pair<uint64_t, ResultResponseSharedPtr>;
using ResultResponseVoidDataPair = typename std::pair<uint64_t, std::shared_ptr<void>>;
using ResultResponsePairSharedPtr = typename std::shared_ptr<ResultResponseDataPair>;

// Aliases for the CancelResponse ring buffer
using CancelResponse = typename ActionT::Impl::CancelGoalService::Response;
using CancelResponseSharedPtr = typename std::shared_ptr<CancelResponse>;
using CancelResponseDataPair = typename std::pair<uint64_t, CancelResponseSharedPtr>;
using CancelResponseVoidDataPair = typename std::pair<uint64_t, std::shared_ptr<void>>;
using CancelResponsePairSharedPtr = typename std::shared_ptr<CancelResponseDataPair>;

using FeedbackMessage = typename ActionT::Impl::FeedbackMessage;
using FeedbackSharedPtr = typename std::shared_ptr<FeedbackMessage>;
using CancelGoalSharedPtr = typename std::shared_ptr<void>;
using GoalStatusSharedPtr = typename std::shared_ptr<void>;

ActionClientIntraProcess(
rclcpp::Context::SharedPtr context,
const std::string & action_name,
const rcl_action_client_depth_t & qos_history,
ResponseCallback goal_status_callback,
ResponseCallback feedback_callback)
: goal_status_callback_(goal_status_callback),
feedback_callback_(feedback_callback),
ActionClientIntraProcessBase(
ResponseCallback feedback_callback,
std::recursive_mutex & reentrant_mutex)
: ActionClientIntraProcessBase(
context,
action_name,
QoS(qos_history.goal_service_depth))
QoS(qos_history.goal_service_depth),
reentrant_mutex)
{
// Create the intra-process buffers
goal_response_buffer_ =
rclcpp::experimental::create_service_intra_process_buffer<GoalResponseSharedPtr>(
rclcpp::experimental::create_service_intra_process_buffer<GoalResponsePairSharedPtr>(
QoS(qos_history.goal_service_depth));

result_response_buffer_ =
rclcpp::experimental::create_service_intra_process_buffer<ResultResponseSharedPtr>(
rclcpp::experimental::create_service_intra_process_buffer<ResultResponsePairSharedPtr>(
QoS(qos_history.result_service_depth));

status_buffer_ =
Expand All @@ -90,8 +107,11 @@ class ActionClientIntraProcess : public ActionClientIntraProcessBase
QoS(qos_history.feedback_topic_depth));

cancel_response_buffer_ =
rclcpp::experimental::create_service_intra_process_buffer<CancelGoalSharedPtr>(
rclcpp::experimental::create_service_intra_process_buffer<CancelResponsePairSharedPtr>(
QoS(qos_history.cancel_service_depth));

set_response_callback_to_event_type(EventType::FeedbackReady, feedback_callback);
set_response_callback_to_event_type(EventType::StatusReady, goal_status_callback);
}

virtual ~ActionClientIntraProcess() = default;
Expand All @@ -100,93 +120,109 @@ class ActionClientIntraProcess : public ActionClientIntraProcessBase
{
(void) wait_set;

is_goal_response_ready_ = goal_response_buffer_->has_data();
is_result_response_ready_ = result_response_buffer_->has_data();
is_cancel_response_ready_ = cancel_response_buffer_->has_data();
is_feedback_ready_ = feedback_buffer_->has_data();
is_status_ready_ = status_buffer_->has_data();

return is_feedback_ready_ ||
is_status_ready_ ||
is_goal_response_ready_ ||
is_cancel_response_ready_ ||
is_result_response_ready_;
}

// Store responses callbacks.
// We don't use mutex to protect these callbacks since they
// are called always after they are set.
void store_goal_response_callback(ResponseCallback callback)

void store_goal_response_callback(size_t goal_id, ResponseCallback response_callback)
{
goal_response_callback_ = callback;
set_response_callback_to_event_type(EventType::GoalResponse, response_callback, goal_id);
}

void store_cancel_goal_callback(ResponseCallback callback)
void store_cancel_goal_callback(size_t goal_id, ResponseCallback callback)
{
cancel_goal_callback_ = callback;
set_response_callback_to_event_type(EventType::CancelResponse, callback, goal_id);
}

void store_result_response_callback(ResponseCallback callback)
void store_result_response_callback(size_t goal_id, ResponseCallback callback)
{
result_response_callback_ = callback;
set_response_callback_to_event_type(EventType::ResultResponse, callback, goal_id);
}

// Store responses from server
void store_ipc_action_goal_response(GoalResponseSharedPtr goal_response)
void store_ipc_action_goal_response(
GoalResponseSharedPtr goal_response,
size_t goal_id)
{
goal_response_buffer_->add(std::move(goal_response));
goal_response_buffer_->add(
std::make_shared<GoalResponseDataPair>(
std::make_pair(goal_id, std::move(goal_response))));

gc_.trigger();
is_goal_response_ready_ = true;
invoke_on_ready_callback(EventType::GoalResponse);
invoke_on_ready_callback(EventType::GoalResponse, goal_id);
}

void store_ipc_action_result_response(ResultResponseSharedPtr result_response)
void store_ipc_action_result_response(
ResultResponseSharedPtr result_response,
size_t goal_id)
{
result_response_buffer_->add(std::move(result_response));
result_response_buffer_->add(
std::make_shared<ResultResponseDataPair>(
std::make_pair(goal_id, std::move(result_response))));

gc_.trigger();
is_result_response_ready_ = true;
invoke_on_ready_callback(EventType::ResultResponse);
invoke_on_ready_callback(EventType::ResultResponse, goal_id);
}

void store_ipc_action_cancel_response(CancelGoalSharedPtr cancel_response)
void store_ipc_action_cancel_response(
CancelResponseSharedPtr cancel_response,
size_t goal_id)
{
cancel_response_buffer_->add(std::move(cancel_response));
cancel_response_buffer_->add(
std::make_shared<CancelResponseDataPair>(
std::make_pair(goal_id, std::move(cancel_response))));

gc_.trigger();
is_cancel_response_ready_ = true;
invoke_on_ready_callback(EventType::CancelResponse);
invoke_on_ready_callback(EventType::CancelResponse, goal_id);
}

void store_ipc_action_feedback(FeedbackSharedPtr feedback)
{
feedback_buffer_->add(std::move(feedback));
gc_.trigger();
is_feedback_ready_ = true;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why these are removed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting the flag here was breaking the SingleThreadedExecutor, which already sets the is_*_ready_ flags on the is_ready() API.
For EventsExecutor, the flags are set on the take_data_by_entity_id() API.

invoke_on_ready_callback(EventType::FeedbackReady);
}

void store_ipc_action_goal_status(GoalStatusSharedPtr status)
{
status_buffer_->add(std::move(status));
gc_.trigger();
is_status_ready_ = true;
invoke_on_ready_callback(EventType::StatusReady);
}

std::shared_ptr<void>
take_data() override
{
std::shared_ptr<void> data;

if (is_goal_response_ready_) {
auto data = std::move(goal_response_buffer_->consume());
return std::static_pointer_cast<void>(data);
} else if (is_result_response_ready_) {
auto data = std::move(result_response_buffer_->consume());
return std::static_pointer_cast<void>(data);
} else if (is_cancel_response_ready_) {
auto data = std::move(cancel_response_buffer_->consume());
return std::static_pointer_cast<void>(data);
} else if (is_feedback_ready_) {
auto data = std::move(feedback_buffer_->consume());
return std::static_pointer_cast<void>(data);
} else if (is_status_ready_) {
auto data = std::move(status_buffer_->consume());
return std::static_pointer_cast<void>(data);
} else {
throw std::runtime_error("Taking data from intra-process action client but nothing is ready");
data = std::move(goal_response_buffer_->consume());
}
else if (is_result_response_ready_) {
data = std::move(result_response_buffer_->consume());
}
else if (is_cancel_response_ready_) {
data = std::move(cancel_response_buffer_->consume());
}
else if (is_feedback_ready_) {
data = std::move(feedback_buffer_->consume());
}
else if (is_status_ready_) {
data = status_buffer_->consume();
}

// Data could be null if there were more events than elements in the buffer
return data;
}

std::shared_ptr<void>
Expand All @@ -195,79 +231,91 @@ class ActionClientIntraProcess : public ActionClientIntraProcessBase
// Mark as ready the event type from which we want to take data
switch (static_cast<EventType>(id)) {
case EventType::ResultResponse:
is_result_response_ready_ = true;
is_result_response_ready_ = result_response_buffer_->has_data();
break;
case EventType::CancelResponse:
is_cancel_response_ready_ = true;
is_cancel_response_ready_ = cancel_response_buffer_->has_data();
break;
case EventType::GoalResponse:
is_goal_response_ready_ = true;
is_goal_response_ready_ = goal_response_buffer_->has_data();
break;
case EventType::FeedbackReady:
is_feedback_ready_ = true;
is_feedback_ready_ = feedback_buffer_->has_data();
break;
case EventType::StatusReady:
is_status_ready_ = true;
is_status_ready_ = status_buffer_->has_data();
break;
}

return take_data();
}


void execute(std::shared_ptr<void> & data)
{
// How to handle case when more than one flag is ready?
// For example, feedback and status are both ready, guard condition triggered
// twice, but we process a single entity here.
// On the default executor using a waitset, waitables are checked twice if ready,
// so that fixes the issue. Check if this is a problem with EventsExecutor.
if (!data) {
throw std::runtime_error("'data' is empty");
// This can happen when there were more events than elements in the ring buffer
return;
}

if (is_goal_response_ready_) {
is_goal_response_ready_ = false;
goal_response_callback_(std::move(data));
} else if (is_result_response_ready_) {
is_result_response_ready_ = false;
result_response_callback_(std::move(data));
} else if (is_cancel_response_ready_) {
is_cancel_response_ready_ = false;
cancel_goal_callback_(std::move(data));
} else if (is_feedback_ready_) {
is_feedback_ready_ = false;
feedback_callback_(std::move(data));
} else if (is_status_ready_) {
is_status_ready_ = false;
goal_status_callback_(std::move(data));
} else {
if (is_goal_response_ready_.exchange(false)) {
auto goal_response_pair = std::static_pointer_cast<GoalResponseVoidDataPair>(data);
auto goal_id = goal_response_pair->first;
auto & goal_response = goal_response_pair->second;

call_response_callback_and_erase(
EventType::GoalResponse,
goal_response,
goal_id);
}
else if (is_result_response_ready_.exchange(false)) {
auto result_response_pair = std::static_pointer_cast<ResultResponseVoidDataPair>(data);
auto goal_id = result_response_pair->first;
auto & result_response = result_response_pair->second;

call_response_callback_and_erase(
EventType::ResultResponse,
result_response,
goal_id);
}
else if (is_cancel_response_ready_.exchange(false)) {
auto cancel_response_pair = std::static_pointer_cast<CancelResponseVoidDataPair>(data);
auto goal_id = cancel_response_pair->first;
auto & cancel_response = cancel_response_pair->second;

call_response_callback_and_erase(
EventType::CancelResponse,
cancel_response,
goal_id);
}
else if (is_feedback_ready_.exchange(false)) {
call_response_callback_and_erase(
EventType::FeedbackReady, data, 0, false);
}
else if (is_status_ready_.exchange(false)) {
call_response_callback_and_erase(
EventType::StatusReady, data, 0, false);
}
else {
throw std::runtime_error("Executing intra-process action client but nothing is ready");
}
}

protected:
ResponseCallback goal_response_callback_;
ResponseCallback result_response_callback_;
ResponseCallback cancel_goal_callback_;
ResponseCallback goal_status_callback_;
ResponseCallback feedback_callback_;

// Create buffers to store data coming from server
// Declare buffers to store responses coming from action server
typename rclcpp::experimental::buffers::ServiceIntraProcessBuffer<
GoalResponseSharedPtr>::UniquePtr goal_response_buffer_;
GoalResponsePairSharedPtr>::UniquePtr goal_response_buffer_;

typename rclcpp::experimental::buffers::ServiceIntraProcessBuffer<
ResultResponseSharedPtr>::UniquePtr result_response_buffer_;
ResultResponsePairSharedPtr>::UniquePtr result_response_buffer_;

typename rclcpp::experimental::buffers::ServiceIntraProcessBuffer<
FeedbackSharedPtr>::UniquePtr feedback_buffer_;

rclcpp::experimental::buffers::ServiceIntraProcessBuffer<
GoalStatusSharedPtr>::UniquePtr status_buffer_;

rclcpp::experimental::buffers::ServiceIntraProcessBuffer<
CancelGoalSharedPtr>::UniquePtr cancel_response_buffer_;
typename rclcpp::experimental::buffers::ServiceIntraProcessBuffer<
CancelResponsePairSharedPtr>::UniquePtr cancel_response_buffer_;

std::atomic<bool> is_feedback_ready_{false};
std::atomic<bool> is_status_ready_{false};
Expand Down
Loading