Skip to content

Commit

Permalink
Fix mutltiple client requests (irobot-ros#142)
Browse files Browse the repository at this point in the history
* store map of unique request id to client id and callback info pair

* fix map end check

* fix undefined reference

* remove unnecessary request id erase, remove/fix unique id comment

* improve unique id comment
  • Loading branch information
bpwilcox authored and Alexis Pojomovsky committed Jun 21, 2024
1 parent f5b2001 commit b865383
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 8 deletions.
30 changes: 22 additions & 8 deletions rclcpp/include/rclcpp/experimental/service_intra_process.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,22 @@ class ServiceIntraProcess : public ServiceIntraProcessBase
return std::static_pointer_cast<void>(data);
}

void send_response(uint64_t intra_process_client_id, SharedResponse & response)
void send_response(int64_t client_request_id, SharedResponse & response)
{
std::unique_lock<std::recursive_mutex> lock(reentrant_mutex_);

auto client_request_it = callback_info_.find(client_request_id);

if (client_request_it == callback_info_.end()) {
RCLCPP_WARN(
rclcpp::get_logger("rclcpp"),
"Calling intra_process_service_send_response for invalid or no "
"longer existing request id");

return;
}

auto intra_process_client_id = client_request_it->second.first;
auto client_it = clients_.find(intra_process_client_id);

if (client_it == clients_.end()) {
Expand All @@ -120,7 +132,7 @@ class ServiceIntraProcess : public ServiceIntraProcessBase
"Calling intra_process_service_send_response for invalid or no "
"longer existing client id");

callback_info_.erase(intra_process_client_id);
callback_info_.erase(client_request_id);
return;
}

Expand All @@ -130,14 +142,14 @@ class ServiceIntraProcess : public ServiceIntraProcessBase
auto client = std::dynamic_pointer_cast<
rclcpp::experimental::ClientIntraProcess<ServiceT>>(
client_intra_process_base);
CallbackInfoVariant & value = callback_info_[intra_process_client_id];
CallbackInfoVariant & value = client_request_it->second.second;
client->store_intra_process_response(
std::make_pair(std::move(response), std::move(value)));
} else {
clients_.erase(client_it);
}

callback_info_.erase(intra_process_client_id);
callback_info_.erase(client_request_id);
}

void execute(std::shared_ptr<void> & data)
Expand All @@ -154,18 +166,20 @@ class ServiceIntraProcess : public ServiceIntraProcessBase
uint64_t intra_process_client_id = ptr->first;
SharedRequest & typed_request = ptr->second.first;
CallbackInfoVariant & value = ptr->second.second;
callback_info_.emplace(std::make_pair(intra_process_client_id, std::move(value)));

// To allow for the user callback to handle deferred responses for IPC in an ambiguous way,
// we are overloading the rmw_request_id semantics to provide the intra process client ID.
uint64_t client_request_id = get_unique_request_id();
auto req_id = std::make_shared<rmw_request_id_t>();
req_id->sequence_number = intra_process_client_id;
req_id->sequence_number = client_request_id;
req_id->from_intra_process = true;

callback_info_.emplace(std::make_pair(req_id->sequence_number, std::make_pair(intra_process_client_id, std::move(value))));

SharedResponse response = any_callback_.dispatch(serv_handle, req_id, std::move(typed_request));

if (response) {
send_response(intra_process_client_id, response);
send_response(req_id->sequence_number, response);
}
}

Expand All @@ -182,7 +196,7 @@ class ServiceIntraProcess : public ServiceIntraProcessBase

// Store callback variants in a map to support deferred response
// access by intra-process client id.
std::unordered_map<uint64_t, CallbackInfoVariant> callback_info_;
std::unordered_map<int64_t, std::pair<uint64_t, CallbackInfoVariant>> callback_info_;
};

} // namespace experimental
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ class ServiceIntraProcessBase : public rclcpp::Waitable
}
}

uint64_t get_unique_request_id();

using ClientMap =
std::unordered_map<uint64_t, rclcpp::experimental::ClientIntraProcessBase::WeakPtr>;

Expand Down
14 changes: 14 additions & 0 deletions rclcpp/src/rclcpp/service_intra_process_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,17 @@ ServiceIntraProcessBase::add_intra_process_client(
std::unique_lock<std::recursive_mutex> lock(reentrant_mutex_);
clients_[client_id] = client;
}

uint64_t
ServiceIntraProcessBase::get_unique_request_id()
{
static std::atomic<uint64_t> _next_unique_id {1};

auto next_id = _next_unique_id.fetch_add(1, std::memory_order_relaxed);
if (0 == next_id) {
throw std::overflow_error(
"exhausted the unique ids for client requests in this process "
"(congratulations your computer is either extremely fast or extremely old)");
}
return next_id;
}

0 comments on commit b865383

Please sign in to comment.