Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes for intra-process actions #144

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 183 additions & 84 deletions rclcpp/include/rclcpp/experimental/action_client_intra_process.hpp

Large diffs are not rendered by default.

288 changes: 205 additions & 83 deletions rclcpp/include/rclcpp/experimental/action_client_intra_process_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,123 +128,245 @@ class ActionClientIntraProcessBase : public rclcpp::Waitable
"is not callable.");
}

set_callback_to_event_type(EventType::ResultResponse, callback);
set_callback_to_event_type(EventType::CancelResponse, callback);
set_callback_to_event_type(EventType::GoalResponse, callback);
set_callback_to_event_type(EventType::FeedbackReady, callback);
set_callback_to_event_type(EventType::StatusReady, callback);
reentrant_mutex_.lock();
on_ready_callback_ = callback;

// If we had events happened before the "on_ready" callback was set,
// call callback with the events counter "unread_count".
for (auto& pair : event_info_multi_map_) {
auto & unread_count = pair.second.unread_count;
auto & event_type = pair.second.event_type;
if (unread_count) {
callback(unread_count, static_cast<int>(event_type));
unread_count = 0;
}
}
reentrant_mutex_.unlock();

result_reponse_mutex_.lock();
result_response_on_ready_callback_ = callback;

for (auto& pair : result_response_map_) {
auto & unread_count = pair.second.unread_count;
auto & response_callback = pair.second.response_callback;
if (unread_count && response_callback) {
callback(unread_count, static_cast<int>(EventType::ResultResponse));
unread_count = 0;
}
}
result_reponse_mutex_.unlock();
}

void
clear_on_ready_callback() override
{
std::lock_guard<std::recursive_mutex> lock(reentrant_mutex_);
event_type_to_on_ready_callback_.clear();
reentrant_mutex_.unlock();
on_ready_callback_ = nullptr;
reentrant_mutex_.unlock();

result_reponse_mutex_.lock();
result_response_on_ready_callback_ = nullptr;
result_reponse_mutex_.unlock();
}

void erase_goal_info(size_t goal_id)
{
reentrant_mutex_.unlock();
event_info_multi_map_.erase(goal_id);
reentrant_mutex_.unlock();

result_reponse_mutex_.lock();
result_response_map_.erase(goal_id);
result_reponse_mutex_.unlock();
}

protected:
std::recursive_mutex reentrant_mutex_;
rclcpp::GuardCondition gc_;

// Action client on ready callbacks and unread count.
// These callbacks can be set by the user to be notified about new events
// on the action client like a new goal response, result response, etc.
// These events have a counter associated with them, counting the amount of events
// that happened before having assigned a callback for them.
using EventTypeOnReadyCallback = std::function<void (size_t)>;
using CallbackUnreadCountPair = std::pair<EventTypeOnReadyCallback, size_t>;
// Alias for the type used for the server responses callback
using ResponseCallback = std::function<void (std::shared_ptr<void> /*server response*/)>;
using OnReadyCallback = std::function<void(size_t, int)>;

// Define a structure to hold event information
struct EventInfo {
// The event type
EventType event_type;
// The callback to be called with the responses from the server
ResponseCallback response_callback;
// Counter of events received before the "on_ready" and "response_callback" were set
size_t unread_count;
};

// Mutex to protect ResultResponse callback and events info
std::recursive_mutex result_reponse_mutex_;
OnReadyCallback result_response_on_ready_callback_{nullptr};
std::unordered_map<size_t /*Goal ID*/, EventInfo> result_response_map_;

// Map the different action client event types to their callbacks and unread count.
std::unordered_map<EventType, CallbackUnreadCountPair> event_type_to_on_ready_callback_;
// Mutex to protect the rest of the event types
// We use two mutexes, since having just one was resulting in deadlock
std::recursive_mutex reentrant_mutex_;
OnReadyCallback on_ready_callback_{nullptr};
std::unordered_multimap<size_t /*Goal ID*/, EventInfo> event_info_multi_map_;

// Invoke the callback to be called when the action client has a new event
void
invoke_on_ready_callback(EventType event_type)
void invoke_on_ready_callback(
EventType event_type,
size_t goal_id = 0)
{
std::lock_guard<std::recursive_mutex> lock(reentrant_mutex_);
if (event_type == EventType::ResultResponse) {
std::lock_guard<std::recursive_mutex> lock(result_reponse_mutex_);

auto it = result_response_map_.find(goal_id);

if (it != result_response_map_.end()) {
auto & response_callback = it->second.response_callback;
if (response_callback && result_response_on_ready_callback_) {
result_response_on_ready_callback_(1,
static_cast<int>(EventType::ResultResponse));
} else {
it->second.unread_count++;
}
return;
}

// If no entry found, create a new one with unread_count = 1
EventInfo event_info{EventType::ResultResponse, nullptr, 1};
result_response_map_.emplace(goal_id, event_info);
return;
}

// Search for a callback for this event type
auto it = event_type_to_on_ready_callback_.find(event_type);
// For the rest of the event types
std::lock_guard<std::recursive_mutex> lock(reentrant_mutex_);

if (it != event_type_to_on_ready_callback_.end()) {
auto & on_ready_callback = it->second.first;
// If there's a callback associated with this event type, call it
if (on_ready_callback) {
on_ready_callback(1);
} else {
// We don't have a callback for this event type yet,
// increase its event counter.
auto & event_type_unread_count = it->second.second;
event_type_unread_count++;
auto range = event_info_multi_map_.equal_range(goal_id);
for (auto it = range.first; it != range.second; ++it) {
if (it->second.event_type == event_type) {
if (on_ready_callback_) {
on_ready_callback_(1, static_cast<int>(event_type));
} else {
it->second.unread_count++;
}
return;
}
} else {
// No entries found for this event type, create one
// with an emtpy callback and one unread event.
event_type_to_on_ready_callback_.emplace(event_type, std::make_pair(nullptr, 1));
}
}

private:
std::string action_name_;
QoS qos_profile_;
// If no entry found, create a new one with unread_count = 1
EventInfo event_info{event_type, nullptr, 1};
event_info_multi_map_.emplace(goal_id, event_info);
}

void set_callback_to_event_type(
// Function to set the "reseponse_callback" to the event type
void set_response_callback_to_event_type(
EventType event_type,
std::function<void(size_t, int)> callback)
ResponseCallback response_callback,
size_t goal_id = 0)
{
auto new_callback = create_event_type_callback(callback, event_type);
if (event_type == EventType::ResultResponse) {
std::lock_guard<std::recursive_mutex> lock(result_reponse_mutex_);

auto it = result_response_map_.find(goal_id);
if (it != result_response_map_.end()) {
// Set response callback
it->second.response_callback = response_callback;
// Check if we already have the response, if so call "on_ready" callback
auto & unread_count = it->second.unread_count;
if (unread_count && result_response_on_ready_callback_) {
result_response_on_ready_callback_(
unread_count, static_cast<int>(EventType::ResultResponse));
unread_count = 0;
}
return;
}

EventInfo event_info{EventType::ResultResponse, response_callback, 0};
result_response_map_.emplace(goal_id, event_info);
return;
}

// For the rest of the event types
std::lock_guard<std::recursive_mutex> lock(reentrant_mutex_);

// Check if we have already an entry for this event type
auto it = event_type_to_on_ready_callback_.find(event_type);
// Get the range of EventInfo matching the goal_id
auto range = event_info_multi_map_.equal_range(goal_id);

if (it != event_type_to_on_ready_callback_.end()) {
// We have an entry for this event type, check how many
// events of this event type happened so far.
auto & event_type_unread_count = it->second.second;
if (event_type_unread_count) {
new_callback(event_type_unread_count);
for (auto it = range.first; it != range.second; ++it) {
if (it->second.event_type == event_type) {
it->second.response_callback = response_callback;
return;
}
event_type_unread_count = 0;
// Set the new callback for this event type
auto & event_type_on_ready_callback = it->second.first;
event_type_on_ready_callback = new_callback;
} else {
// We had no entries for this event type, create one
// with the new callback and zero as unread count.
event_type_to_on_ready_callback_.emplace(event_type, std::make_pair(new_callback, 0));
}

// If no entry found, create a new one.
EventInfo event_info{event_type, response_callback, 0};
event_info_multi_map_.emplace(goal_id, event_info);
}

std::function<void(size_t)>
create_event_type_callback(
std::function<void(size_t, int)> callback,
EventType event_type)
void call_response_callback_and_erase(
EventType event_type,
std::shared_ptr<void> & response,
size_t goal_id = 0,
bool erase_event_info = true)
{
// Note: we bind the int identifier argument to this waitable's entity types
auto new_callback =
[callback, event_type, this](size_t number_of_events) {
try {
callback(number_of_events, static_cast<int>(event_type));
} catch (const std::exception & exception) {
RCLCPP_ERROR_STREAM(
rclcpp::get_logger("rclcpp_action"),
"rclcpp::experimental::ActionClientIntraProcessBase@" << this <<
" caught " << rmw::impl::cpp::demangle(exception) <<
" exception in user-provided callback for the 'on ready' callback: " <<
exception.what());
} catch (...) {
RCLCPP_ERROR_STREAM(
rclcpp::get_logger("rclcpp_action"),
"rclcpp::experimental::ActionClientIntraProcessBase@" << this <<
" caught unhandled exception in user-provided callback " <<
"for the 'on ready' callback");
if (event_type == EventType::ResultResponse) {
std::lock_guard<std::recursive_mutex> lock(result_reponse_mutex_);

auto it = result_response_map_.find(goal_id);

if (it != result_response_map_.end()) {
auto & response_callback = it->second.response_callback;
if (response_callback) {
response_callback(response);
result_response_map_.erase(it);
} else {
throw std::runtime_error("response_callback invalid!");
}
} else {
throw std::runtime_error("Goal ID not found");
}
return;
}

// For the rest of the event types
std::lock_guard<std::recursive_mutex> lock(reentrant_mutex_);

auto range = event_info_multi_map_.equal_range(goal_id);

for (auto it = range.first; it != range.second; ++it) {
if (it->second.event_type == event_type) {
auto & response_callback = it->second.response_callback;
if (response_callback) {
response_callback(response);
} else {
throw std::runtime_error(
"IPC ActionClient: response_callback not set! EventType: " +
std::to_string(static_cast<int>(event_type)));
}
};
if (erase_event_info) {
event_info_multi_map_.erase(it);
}
return;
}
}
}

bool goal_has_response_callback(size_t goal_id)
{
std::lock_guard<std::recursive_mutex> lock(result_reponse_mutex_);

auto it = result_response_map_.find(goal_id);

return new_callback;
if (it != result_response_map_.end()) {
if(it->second.response_callback) {
return true;
}
}

return false;
}

private:
std::string action_name_;
QoS qos_profile_;
};

} // namespace experimental
Expand Down
Loading