diff --git a/rclcpp/include/rclcpp/experimental/service_intra_process.hpp b/rclcpp/include/rclcpp/experimental/service_intra_process.hpp index b87aa15360..b99f50434b 100644 --- a/rclcpp/include/rclcpp/experimental/service_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/service_intra_process.hpp @@ -108,10 +108,22 @@ class ServiceIntraProcess : public ServiceIntraProcessBase return std::static_pointer_cast(data); } - void send_response(uint64_t intra_process_client_id, SharedResponse & response) + void send_response(int64_t client_request_id, SharedResponse & response) { std::unique_lock lock(reentrant_mutex_); + auto client_request_it = callback_info_.find(client_request_id); + + if (client_request_it == callback_info_.end()) { + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Calling intra_process_service_send_response for invalid or no " + "longer existing request id"); + + return; + } + + auto intra_process_client_id = client_request_it->second.first; auto client_it = clients_.find(intra_process_client_id); if (client_it == clients_.end()) { @@ -120,7 +132,7 @@ class ServiceIntraProcess : public ServiceIntraProcessBase "Calling intra_process_service_send_response for invalid or no " "longer existing client id"); - callback_info_.erase(intra_process_client_id); + callback_info_.erase(client_request_id); return; } @@ -130,14 +142,14 @@ class ServiceIntraProcess : public ServiceIntraProcessBase auto client = std::dynamic_pointer_cast< rclcpp::experimental::ClientIntraProcess>( client_intra_process_base); - CallbackInfoVariant & value = callback_info_[intra_process_client_id]; + CallbackInfoVariant & value = client_request_it->second.second; client->store_intra_process_response( std::make_pair(std::move(response), std::move(value))); } else { clients_.erase(client_it); } - callback_info_.erase(intra_process_client_id); + callback_info_.erase(client_request_id); } void execute(std::shared_ptr & data) @@ -154,18 +166,20 @@ class ServiceIntraProcess : public ServiceIntraProcessBase uint64_t intra_process_client_id = ptr->first; SharedRequest & typed_request = ptr->second.first; CallbackInfoVariant & value = ptr->second.second; - callback_info_.emplace(std::make_pair(intra_process_client_id, std::move(value))); // To allow for the user callback to handle deferred responses for IPC in an ambiguous way, // we are overloading the rmw_request_id semantics to provide the intra process client ID. + uint64_t client_request_id = get_unique_request_id(); auto req_id = std::make_shared(); - req_id->sequence_number = intra_process_client_id; + req_id->sequence_number = client_request_id; req_id->from_intra_process = true; + callback_info_.emplace(std::make_pair(req_id->sequence_number, std::make_pair(intra_process_client_id, std::move(value)))); + SharedResponse response = any_callback_.dispatch(serv_handle, req_id, std::move(typed_request)); if (response) { - send_response(intra_process_client_id, response); + send_response(req_id->sequence_number, response); } } @@ -182,7 +196,7 @@ class ServiceIntraProcess : public ServiceIntraProcessBase // Store callback variants in a map to support deferred response // access by intra-process client id. - std::unordered_map callback_info_; + std::unordered_map> callback_info_; }; } // namespace experimental diff --git a/rclcpp/include/rclcpp/experimental/service_intra_process_base.hpp b/rclcpp/include/rclcpp/experimental/service_intra_process_base.hpp index 1b3262e7e8..8d48f0f508 100644 --- a/rclcpp/include/rclcpp/experimental/service_intra_process_base.hpp +++ b/rclcpp/include/rclcpp/experimental/service_intra_process_base.hpp @@ -194,6 +194,8 @@ class ServiceIntraProcessBase : public rclcpp::Waitable } } + uint64_t get_unique_request_id(); + using ClientMap = std::unordered_map; diff --git a/rclcpp/src/rclcpp/service_intra_process_base.cpp b/rclcpp/src/rclcpp/service_intra_process_base.cpp index 2ccb38779c..36d931df23 100644 --- a/rclcpp/src/rclcpp/service_intra_process_base.cpp +++ b/rclcpp/src/rclcpp/service_intra_process_base.cpp @@ -43,3 +43,17 @@ ServiceIntraProcessBase::add_intra_process_client( std::unique_lock lock(reentrant_mutex_); clients_[client_id] = client; } + +uint64_t +ServiceIntraProcessBase::get_unique_request_id() +{ + static std::atomic _next_unique_id {1}; + + auto next_id = _next_unique_id.fetch_add(1, std::memory_order_relaxed); + if (0 == next_id) { + throw std::overflow_error( + "exhausted the unique ids for client requests in this process " + "(congratulations your computer is either extremely fast or extremely old)"); + } + return next_id; +}