diff --git a/CHANGELOG.md b/CHANGELOG.md index b7d0597672..2cb294e721 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,8 @@ Increment the: * [SDK] Async Batch Span/Log processor with max async support ([#1306](https://github.com/open-telemetry/opentelemetry-cpp/pull/1306)) * [EXPORTER] OTLP http exporter allow concurrency session ([#1209](https://github.com/open-telemetry/opentelemetry-cpp/pull/1209)) +* [EXT] `curl::HttpClient` use `curl_multi_handle` instead of creating a thread + for every request and it's able to reuse connections now. ([#1317](https://github.com/open-telemetry/opentelemetry-cpp/pull/1317)) ## [1.3.0] 2022-04-11 diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h index 1b21ed9f90..edb0f59afc 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h @@ -79,7 +79,10 @@ struct OtlpHttpClientOptions OtlpHeaders http_headers = GetOtlpDefaultHeaders(); // Concurrent requests - std::size_t max_concurrent_requests = 8; + std::size_t max_concurrent_requests = 64; + + // Concurrent requests + std::size_t max_requests_per_connection = 8; inline OtlpHttpClientOptions(nostd::string_view input_url, HttpRequestContentType input_content_type, @@ -88,7 +91,8 @@ struct OtlpHttpClientOptions bool input_console_debug, std::chrono::system_clock::duration input_timeout, const OtlpHeaders &input_http_headers, - std::size_t input_concurrent_sessions = 8) + std::size_t input_concurrent_sessions = 64, + std::size_t input_max_requests_per_connection = 8) : url(input_url), content_type(input_content_type), json_bytes_mapping(input_json_bytes_mapping), @@ -96,7 +100,8 @@ struct OtlpHttpClientOptions console_debug(input_console_debug), timeout(input_timeout), http_headers(input_http_headers), - max_concurrent_requests(input_concurrent_sessions) + max_concurrent_requests(input_concurrent_sessions), + max_requests_per_connection(input_max_requests_per_connection) {} }; diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h index a2f9eb3176..6dfe8f1f34 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h @@ -55,7 +55,10 @@ struct OtlpHttpExporterOptions #ifdef ENABLE_ASYNC_EXPORT // Concurrent requests // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#otlpgrpc-concurrent-requests - std::size_t max_concurrent_requests = 8; + std::size_t max_concurrent_requests = 64; + + // Concurrent requests + std::size_t max_requests_per_connection = 8; #endif }; diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_exporter.h index 2bd8697076..91e4ccf714 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_exporter.h @@ -55,7 +55,10 @@ struct OtlpHttpLogExporterOptions # ifdef ENABLE_ASYNC_EXPORT // Concurrent requests // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#otlpgrpc-concurrent-requests - std::size_t max_concurrent_requests = 8; + std::size_t max_concurrent_requests = 64; + + // Concurrent requests + std::size_t max_requests_per_connection = 8; # endif }; diff --git a/exporters/otlp/src/otlp_http_client.cc b/exporters/otlp/src/otlp_http_client.cc index 1e5f128aaf..b2fc75838d 100644 --- a/exporters/otlp/src/otlp_http_client.cc +++ b/exporters/otlp/src/otlp_http_client.cc @@ -649,7 +649,9 @@ void ConvertListFieldToJson(nlohmann::json &value, OtlpHttpClient::OtlpHttpClient(OtlpHttpClientOptions &&options) : is_shutdown_(false), options_(options), http_client_(http_client::HttpClientFactory::Create()) -{} +{ + http_client_->SetMaxSessionsPerConnection(options_.max_requests_per_connection); +} OtlpHttpClient::~OtlpHttpClient() { @@ -682,7 +684,9 @@ OtlpHttpClient::~OtlpHttpClient() OtlpHttpClient::OtlpHttpClient(OtlpHttpClientOptions &&options, std::shared_ptr http_client) : is_shutdown_(false), options_(options), http_client_(http_client) -{} +{ + http_client_->SetMaxSessionsPerConnection(options_.max_requests_per_connection); +} // ----------------------------- HTTP Client methods ------------------------------ opentelemetry::sdk::common::ExportResult OtlpHttpClient::Export( diff --git a/exporters/otlp/src/otlp_http_exporter.cc b/exporters/otlp/src/otlp_http_exporter.cc index 8f51165cc8..f5b50162f3 100644 --- a/exporters/otlp/src/otlp_http_exporter.cc +++ b/exporters/otlp/src/otlp_http_exporter.cc @@ -34,7 +34,8 @@ OtlpHttpExporter::OtlpHttpExporter(const OtlpHttpExporterOptions &options) options.http_headers #ifdef ENABLE_ASYNC_EXPORT , - options.max_concurrent_requests + options.max_concurrent_requests, + options.max_requests_per_connection #endif ))) {} diff --git a/exporters/otlp/src/otlp_http_log_exporter.cc b/exporters/otlp/src/otlp_http_log_exporter.cc index 2e377699ed..d2430bf924 100644 --- a/exporters/otlp/src/otlp_http_log_exporter.cc +++ b/exporters/otlp/src/otlp_http_log_exporter.cc @@ -36,7 +36,8 @@ OtlpHttpLogExporter::OtlpHttpLogExporter(const OtlpHttpLogExporterOptions &optio options.http_headers # ifdef ENABLE_ASYNC_EXPORT , - options.max_concurrent_requests + options.max_concurrent_requests, + options.max_requests_per_connection # endif ))) {} diff --git a/ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h b/ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h index ac263193bc..888b7e676f 100644 --- a/ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h +++ b/ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h @@ -3,13 +3,19 @@ #pragma once -#include "http_operation_curl.h" +#include "opentelemetry/ext/http/client/curl/http_operation_curl.h" #include "opentelemetry/ext/http/client/http_client.h" #include "opentelemetry/ext/http/common/url_parser.h" +#include "opentelemetry/nostd/shared_ptr.h" #include "opentelemetry/version.h" -#include +#include +#include +#include #include +#include +#include +#include #include OPENTELEMETRY_BEGIN_NAMESPACE @@ -24,6 +30,23 @@ namespace curl const opentelemetry::ext::http::client::StatusCode Http_Ok = 200; +class HttpCurlGlobalInitializer +{ +private: + HttpCurlGlobalInitializer(const HttpCurlGlobalInitializer &) = delete; + HttpCurlGlobalInitializer(HttpCurlGlobalInitializer &&) = delete; + + HttpCurlGlobalInitializer &operator=(const HttpCurlGlobalInitializer &) = delete; + HttpCurlGlobalInitializer &operator=(HttpCurlGlobalInitializer &&) = delete; + + HttpCurlGlobalInitializer(); + +public: + ~HttpCurlGlobalInitializer(); + + static nostd::shared_ptr GetInstance(); +}; + class Request : public opentelemetry::ext::http::client::Request { public: @@ -53,10 +76,7 @@ class Request : public opentelemetry::ext::http::client::Request AddHeader(name, value); } - virtual void SetUri(nostd::string_view uri) noexcept override - { - uri_ = static_cast(uri); - } + void SetUri(nostd::string_view uri) noexcept override { uri_ = static_cast(uri); } void SetTimeoutMs(std::chrono::milliseconds timeout_ms) noexcept override { @@ -76,14 +96,10 @@ class Response : public opentelemetry::ext::http::client::Response public: Response() : status_code_(Http_Ok) {} - virtual const opentelemetry::ext::http::client::Body &GetBody() const noexcept override - { - return body_; - } + const opentelemetry::ext::http::client::Body &GetBody() const noexcept override { return body_; } - virtual bool ForEachHeader( - nostd::function_ref callable) - const noexcept override + bool ForEachHeader(nostd::function_ref + callable) const noexcept override { for (const auto &header : headers_) { @@ -95,10 +111,9 @@ class Response : public opentelemetry::ext::http::client::Response return true; } - virtual bool ForEachHeader( - const nostd::string_view &name, - nostd::function_ref callable) - const noexcept override + bool ForEachHeader(const nostd::string_view &name, + nostd::function_ref + callable) const noexcept override { auto range = headers_.equal_range(static_cast(name)); for (auto it = range.first; it != range.second; ++it) @@ -111,7 +126,7 @@ class Response : public opentelemetry::ext::http::client::Response return true; } - virtual opentelemetry::ext::http::client::StatusCode GetStatusCode() const noexcept override + opentelemetry::ext::http::client::StatusCode GetStatusCode() const noexcept override { return status_code_; } @@ -124,7 +139,8 @@ class Response : public opentelemetry::ext::http::client::Response class HttpClient; -class Session : public opentelemetry::ext::http::client::Session +class Session : public opentelemetry::ext::http::client::Session, + public std::enable_shared_from_this { public: Session(HttpClient &http_client, @@ -142,40 +158,17 @@ class Session : public opentelemetry::ext::http::client::Session return http_request_; } - virtual void SendRequest( - std::shared_ptr callback) noexcept override - { - is_session_active_ = true; - std::string url = host_ + std::string(http_request_->uri_); - auto callback_ptr = callback.get(); - curl_operation_.reset(new HttpOperation( - http_request_->method_, url, callback_ptr, RequestMode::Async, http_request_->headers_, - http_request_->body_, false, http_request_->timeout_ms_)); - curl_operation_->SendAsync([this, callback](HttpOperation &operation) { - if (operation.WasAborted()) - { - // Manually cancelled - callback->OnEvent(opentelemetry::ext::http::client::SessionState::Cancelled, ""); - } - - if (operation.GetResponseCode() >= CURL_LAST) - { - // we have a http response - auto response = std::unique_ptr(new Response()); - response->headers_ = operation.GetResponseHeaders(); - response->body_ = operation.GetResponseBody(); - response->status_code_ = operation.GetResponseCode(); - callback->OnResponse(*response); - } - is_session_active_ = false; - }); - } + void SendRequest( + std::shared_ptr callback) noexcept override; - virtual bool CancelSession() noexcept override; + bool CancelSession() noexcept override; - virtual bool FinishSession() noexcept override; + bool FinishSession() noexcept override; - virtual bool IsSessionActive() noexcept override { return is_session_active_; } + bool IsSessionActive() noexcept override + { + return is_session_active_.load(std::memory_order_acquire); + } void SetId(uint64_t session_id) { session_id_ = session_id; } @@ -188,19 +181,36 @@ class Session : public opentelemetry::ext::http::client::Session #ifdef ENABLE_TEST std::shared_ptr GetRequest() { return http_request_; } #endif + + inline HttpClient &GetHttpClient() noexcept { return http_client_; } + inline const HttpClient &GetHttpClient() const noexcept { return http_client_; } + + inline uint64_t GetSessionId() const noexcept { return session_id_; } + + inline const std::unique_ptr &GetOperation() const noexcept + { + return curl_operation_; + } + inline std::unique_ptr &GetOperation() noexcept { return curl_operation_; } + + /** + * Finish and cleanup the operation.It will remove curl easy handle in it from HttpClient + */ + void FinishOperation(); + private: std::shared_ptr http_request_; std::string host_; std::unique_ptr curl_operation_; uint64_t session_id_; HttpClient &http_client_; - bool is_session_active_; + std::atomic is_session_active_; }; class HttpClientSync : public opentelemetry::ext::http::client::HttpClientSync { public: - HttpClientSync() { curl_global_init(CURL_GLOBAL_ALL); } + HttpClientSync() : curl_global_initializer_(HttpCurlGlobalInitializer::GetInstance()) {} opentelemetry::ext::http::client::Result Get( const nostd::string_view &url, @@ -208,7 +218,7 @@ class HttpClientSync : public opentelemetry::ext::http::client::HttpClientSync { opentelemetry::ext::http::client::Body body; HttpOperation curl_operation(opentelemetry::ext::http::client::Method::Get, url.data(), nullptr, - RequestMode::Sync, headers, body); + headers, body); curl_operation.SendSync(); auto session_state = curl_operation.GetSessionState(); if (curl_operation.WasAborted()) @@ -233,7 +243,7 @@ class HttpClientSync : public opentelemetry::ext::http::client::HttpClientSync const opentelemetry::ext::http::client::Headers &headers) noexcept override { HttpOperation curl_operation(opentelemetry::ext::http::client::Method::Post, url.data(), - nullptr, RequestMode::Sync, headers, body); + nullptr, headers, body); curl_operation.SendSync(); auto session_state = curl_operation.GetSessionState(); if (curl_operation.WasAborted()) @@ -253,72 +263,84 @@ class HttpClientSync : public opentelemetry::ext::http::client::HttpClientSync return opentelemetry::ext::http::client::Result(std::move(response), session_state); } - ~HttpClientSync() { curl_global_cleanup(); } + ~HttpClientSync() {} + +private: + nostd::shared_ptr curl_global_initializer_; }; class HttpClient : public opentelemetry::ext::http::client::HttpClient { public: // The call (curl_global_init) is not thread safe. Ensure this is called only once. - HttpClient() : next_session_id_{0} { curl_global_init(CURL_GLOBAL_ALL); } + HttpClient(); + ~HttpClient(); std::shared_ptr CreateSession( - nostd::string_view url) noexcept override + nostd::string_view url) noexcept override; + + bool CancelAllSessions() noexcept override; + + bool FinishAllSessions() noexcept override; + + void SetMaxSessionsPerConnection(std::size_t max_requests_per_connection) noexcept override; + + inline uint64_t GetMaxSessionsPerConnection() const noexcept { - auto parsedUrl = common::UrlParser(std::string(url)); - if (!parsedUrl.success_) - { - return std::make_shared(*this); - } - auto session = - std::make_shared(*this, parsedUrl.scheme_, parsedUrl.host_, parsedUrl.port_); - auto session_id = ++next_session_id_; - session->SetId(session_id); - sessions_.insert({session_id, session}); - return session; + return max_sessions_per_connection_; } - bool CancelAllSessions() noexcept override + void CleanupSession(uint64_t session_id); + + inline CURLM *GetMultiHandle() noexcept { return multi_handle_; } + + void MaybeSpawnBackgroundThread(); + + void ScheduleAddSession(uint64_t session_id); + void ScheduleAbortSession(uint64_t session_id); + void ScheduleRemoveSession(uint64_t session_id, HttpCurlEasyResource &&resource); + +#ifdef ENABLE_TEST + void WaitBackgroundThreadExit() { - // CancelSession may change sessions_, we can not change a container while iterating it. - while (!sessions_.empty()) + std::unique_ptr background_thread; { - std::map> sessions; - sessions.swap(sessions_); - for (auto &session : sessions) - { - session.second->CancelSession(); - } + std::lock_guard lock_guard{background_thread_m_}; + background_thread.swap(background_thread_); } - return true; - } - bool FinishAllSessions() noexcept override - { - // FinishSession may change sessions_, we can not change a container while iterating it. - while (!sessions_.empty()) + if (background_thread && background_thread->joinable()) { - std::map> sessions; - sessions.swap(sessions_); - for (auto &session : sessions) - { - session.second->FinishSession(); - } + background_thread->join(); } - return true; } - - void CleanupSession(uint64_t session_id) - { - // TBD = Need to be thread safe - sessions_.erase(session_id); - } - - ~HttpClient() { curl_global_cleanup(); } +#endif private: + void wakeupBackgroundThread(); + bool doAddSessions(); + bool doAbortSessions(); + bool doRemoveSessions(); + void resetMultiHandle(); + + std::mutex multi_handle_m_; + CURLM *multi_handle_; std::atomic next_session_id_; - std::map> sessions_; + uint64_t max_sessions_per_connection_; + + std::mutex sessions_m_; + std::recursive_mutex session_ids_m_; + std::unordered_map> sessions_; + std::unordered_set pending_to_add_session_ids_; + std::unordered_set pending_to_abort_session_ids_; + std::unordered_map pending_to_remove_session_handles_; + std::list> pending_to_remove_sessions_; + + std::mutex background_thread_m_; + std::unique_ptr background_thread_; + std::chrono::milliseconds scheduled_delay_milliseconds_; + + nostd::shared_ptr curl_global_initializer_; }; } // namespace curl diff --git a/ext/include/opentelemetry/ext/http/client/curl/http_operation_curl.h b/ext/include/opentelemetry/ext/http/client/curl/http_operation_curl.h index 09cf8c7c2a..393d6393ed 100644 --- a/ext/include/opentelemetry/ext/http/client/curl/http_operation_curl.h +++ b/ext/include/opentelemetry/ext/http/client/curl/http_operation_curl.h @@ -3,7 +3,6 @@ #pragma once -#include "http_client_curl.h" #include "opentelemetry/ext/http/client/http_client.h" #include "opentelemetry/version.h" @@ -12,6 +11,7 @@ #include #include #include +#include #include #ifdef _WIN32 # include @@ -34,29 +34,89 @@ const std::chrono::milliseconds default_http_conn_timeout(5000); // ms const std::string http_status_regexp = "HTTP\\/\\d\\.\\d (\\d+)\\ .*"; const std::string http_header_regexp = "(.*)\\: (.*)\\n*"; -enum class RequestMode +class HttpClient; +class Session; + +struct HttpCurlEasyResource { - Sync, - Async + CURL *easy_handle; + curl_slist *headers_chunk; + + HttpCurlEasyResource(CURL *curl = nullptr, curl_slist *headers = nullptr) + : easy_handle{curl}, headers_chunk{headers} + {} + + HttpCurlEasyResource(HttpCurlEasyResource &&other) + : easy_handle{other.easy_handle}, headers_chunk{other.headers_chunk} + { + other.easy_handle = nullptr; + other.headers_chunk = nullptr; + } + + HttpCurlEasyResource &operator=(HttpCurlEasyResource &&other) + { + using std::swap; + swap(easy_handle, other.easy_handle); + swap(headers_chunk, other.headers_chunk); + + return *this; + } + + HttpCurlEasyResource(const HttpCurlEasyResource &other) = delete; + HttpCurlEasyResource &operator=(const HttpCurlEasyResource &other) = delete; }; class HttpOperation { -public: - void DispatchEvent(opentelemetry::ext::http::client::SessionState type, std::string reason = "") - { - if (request_mode_ == RequestMode::Async && callback_ != nullptr) - { - callback_->OnEvent(type, reason); - } - else - { - session_state_ = type; - } - } +private: + /** + * Old-school memory allocator + * + * @param contents + * @param size + * @param nmemb + * @param userp + * @return + */ + static size_t WriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp); - std::atomic is_aborted_; // Set to 'true' when async callback is aborted - std::atomic is_finished_; // Set to 'true' when async callback is finished. + /** + * C++ STL std::vector allocator + * + * @param ptr + * @param size + * @param nmemb + * @param data + * @return + */ + static size_t WriteVectorHeaderCallback(void *ptr, size_t size, size_t nmemb, void *userp); + static size_t WriteVectorBodyCallback(void *ptr, size_t size, size_t nmemb, void *userp); + + static size_t ReadMemoryCallback(char *buffer, size_t size, size_t nitems, void *userp); + +#if LIBCURL_VERSION_NUM >= 0x075000 + static int PreRequestCallback(void *clientp, + char *conn_primary_ip, + char *conn_local_ip, + int conn_primary_port, + int conn_local_port); +#endif + +#if LIBCURL_VERSION_NUM >= 0x072000 + static int OnProgressCallback(void *clientp, + curl_off_t dltotal, + curl_off_t dlnow, + curl_off_t ultotal, + curl_off_t ulnow); +#else + static int OnProgressCallback(void *clientp, + double dltotal, + double dlnow, + double ultotal, + double ulnow); +#endif +public: + void DispatchEvent(opentelemetry::ext::http::client::SessionState type, std::string reason = ""); /** * Create local CURL instance for url and body @@ -71,8 +131,7 @@ class HttpOperation */ HttpOperation(opentelemetry::ext::http::client::Method method, std::string url, - opentelemetry::ext::http::client::EventHandler *callback, - RequestMode request_mode = RequestMode::Async, + opentelemetry::ext::http::client::EventHandler *event_handle, // Default empty headers and empty request body const opentelemetry::ext::http::client::Headers &request_headers = opentelemetry::ext::http::client::Headers(), @@ -80,238 +139,52 @@ class HttpOperation opentelemetry::ext::http::client::Body(), // Default connectivity and response size options bool is_raw_response = false, - std::chrono::milliseconds http_conn_timeout = default_http_conn_timeout) - : is_aborted_(false), - is_finished_(false), - // Optional connection params - is_raw_response_(is_raw_response), - http_conn_timeout_(http_conn_timeout), - request_mode_(request_mode), - curl_(nullptr), - // Result - res_(CURLE_OK), - callback_(callback), - method_(method), - url_(url), - // Local vars - request_headers_(request_headers), - request_body_(request_body), - sockfd_(0), - nread_(0) - { - /* get a curl handle */ - curl_ = curl_easy_init(); - if (!curl_) - { - res_ = CURLE_FAILED_INIT; - DispatchEvent(opentelemetry::ext::http::client::SessionState::CreateFailed); - return; - } - - curl_easy_setopt(curl_, CURLOPT_VERBOSE, 0); - - // Specify target URL - curl_easy_setopt(curl_, CURLOPT_URL, url_.c_str()); - - // TODO: support ssl cert verification for https request - curl_easy_setopt(curl_, CURLOPT_SSL_VERIFYPEER, 0); // 1L - curl_easy_setopt(curl_, CURLOPT_SSL_VERIFYHOST, 0); // 2L - - // Specify our custom headers - for (auto &kv : this->request_headers_) - { - std::string header = std::string(kv.first); - header += ": "; - header += std::string(kv.second); - headers_chunk_ = curl_slist_append(headers_chunk_, header.c_str()); - } - - if (headers_chunk_ != nullptr) - { - curl_easy_setopt(curl_, CURLOPT_HTTPHEADER, headers_chunk_); - } - - DispatchEvent(opentelemetry::ext::http::client::SessionState::Created); - } + std::chrono::milliseconds http_conn_timeout = default_http_conn_timeout, + bool reuse_connection = false); /** * Destroy CURL instance */ - virtual ~HttpOperation() - { - // Given the request has not been aborted we should wait for completion here - // This guarantees the lifetime of this request. - if (result_.valid()) - { - result_.wait(); - } - // TBD - Need to be uncomment. This will callback instance is deleted. - // DispatchEvent(opentelemetry::ext::http::client::SessionState::Destroy); - res_ = CURLE_OK; - curl_easy_cleanup(curl_); - curl_slist_free_all(headers_chunk_); - ReleaseResponse(); - } + virtual ~HttpOperation(); /** * Finish CURL instance */ - virtual void Finish() - { - if (result_.valid() && !is_finished_) - { - result_.wait(); - is_finished_ = true; - } - } + virtual void Finish(); + + /** + * Cleanup all resource of curl + */ + void Cleanup(); + + /** + * Setup request + */ + CURLcode Setup(); /** * Send request synchronously */ - long Send() - { - ReleaseResponse(); - // Request buffer - const void *request = (request_body_.empty()) ? NULL : &request_body_[0]; - const size_t req_size = request_body_.size(); - if (!curl_) - { - res_ = CURLE_FAILED_INIT; - DispatchEvent(opentelemetry::ext::http::client::SessionState::SendFailed); - return res_; - } - - // TODO: control local port to use - // curl_easy_setopt(curl, CURLOPT_LOCALPORT, dcf_port); - - // Perform initial connect, handling the timeout if needed - curl_easy_setopt(curl_, CURLOPT_CONNECT_ONLY, 1L); - curl_easy_setopt(curl_, CURLOPT_TIMEOUT_MS, http_conn_timeout_.count()); - DispatchEvent(opentelemetry::ext::http::client::SessionState::Connecting); - res_ = curl_easy_perform(curl_); - if (CURLE_OK != res_) - { - DispatchEvent(opentelemetry::ext::http::client::SessionState::ConnectFailed, - curl_easy_strerror(res_)); // couldn't connect - stage 1 - return res_; - } - - /* Extract the socket from the curl handle - we'll need it for waiting. - * Note that this API takes a pointer to a 'long' while we use - * curl_socket_t for sockets otherwise. - */ - long sockextr = 0; - res_ = curl_easy_getinfo(curl_, CURLINFO_LASTSOCKET, &sockextr); - - if (CURLE_OK != res_) - { - DispatchEvent(opentelemetry::ext::http::client::SessionState::ConnectFailed, - curl_easy_strerror(res_)); // couldn't connect - stage 2 - return res_; - } - - /* wait for the socket to become ready for sending */ - sockfd_ = sockextr; - if (!WaitOnSocket(sockfd_, 0, static_cast(http_conn_timeout_.count())) || is_aborted_) - { - res_ = CURLE_OPERATION_TIMEDOUT; - DispatchEvent( - opentelemetry::ext::http::client::SessionState::ConnectFailed, - " Is aborted: " + std::to_string(is_aborted_.load())); // couldn't connect - stage 3 - return res_; - } - - DispatchEvent(opentelemetry::ext::http::client::SessionState::Connected); - // once connection is there - switch back to easy perform for HTTP post - curl_easy_setopt(curl_, CURLOPT_CONNECT_ONLY, 0); - - // send all data to our callback function - if (is_raw_response_) - { - curl_easy_setopt(curl_, CURLOPT_HEADER, true); - curl_easy_setopt(curl_, CURLOPT_WRITEFUNCTION, (void *)&WriteMemoryCallback); - curl_easy_setopt(curl_, CURLOPT_WRITEDATA, (void *)&raw_response_); - } - else - { - curl_easy_setopt(curl_, CURLOPT_WRITEFUNCTION, (void *)&WriteVectorCallback); - curl_easy_setopt(curl_, CURLOPT_HEADERDATA, (void *)&resp_headers_); - curl_easy_setopt(curl_, CURLOPT_WRITEDATA, (void *)&resp_body_); - } - - // TODO: only two methods supported for now - POST and GET - if (method_ == opentelemetry::ext::http::client::Method::Post) - { - // POST - curl_easy_setopt(curl_, CURLOPT_POST, true); - curl_easy_setopt(curl_, CURLOPT_POSTFIELDS, (const char *)request); - curl_easy_setopt(curl_, CURLOPT_POSTFIELDSIZE, req_size); - } - else if (method_ == opentelemetry::ext::http::client::Method::Get) - { - // GET - } - else - { - res_ = CURLE_UNSUPPORTED_PROTOCOL; - return res_; - } - - // abort if slower than 4kb/sec during 30 seconds - curl_easy_setopt(curl_, CURLOPT_LOW_SPEED_TIME, 30L); - curl_easy_setopt(curl_, CURLOPT_LOW_SPEED_LIMIT, 4096); - DispatchEvent(opentelemetry::ext::http::client::SessionState::Sending); - - res_ = curl_easy_perform(curl_); - if (CURLE_OK != res_) - { - DispatchEvent(opentelemetry::ext::http::client::SessionState::SendFailed, - curl_easy_strerror(res_)); - return res_; - } - - /* Code snippet to parse raw HTTP response. This might come in handy - * if we ever consider to handle the raw upload instead of curl_easy_perform - ... - std::string resp((const char *)response); - std::regex http_status_regex(HTTP_STATUS_REGEXP); - std::smatch match; - if(std::regex_search(resp, match, http_status_regex)) - http_code = std::stol(match[1]); - ... - */ - - /* libcurl is nice enough to parse the http response code itself: */ - curl_easy_getinfo(curl_, CURLINFO_RESPONSE_CODE, &res_); - // We got some response from server. Dump the contents. - DispatchEvent(opentelemetry::ext::http::client::SessionState::Response); - - // This function returns: - // - on success: HTTP status code. - // - on failure: CURL error code. - // The two sets of enums (CURLE, HTTP codes) - do not intersect, so we collapse them in one set. - return res_; - } + CURLcode Send(); - std::future &SendAsync(std::function callback = nullptr) - { - result_ = std::async(std::launch::async, [this, callback] { - long result = Send(); - if (callback != nullptr) - { - callback(*this); - } - return result; - }); - return result_; - } + /** + * Send request asynchronously + * @param session This operator must be binded to a Session + * @param callback callback when start async request success and got response + */ + CURLcode SendAsync(Session *session, std::function callback = nullptr); - void SendSync() { Send(); } + inline void SendSync() { Send(); } /** - * Get HTTP response code. This function returns CURL error code if HTTP response code is invalid. + * Get HTTP response code. This function returns 0. */ - uint16_t GetResponseCode() { return res_; } + inline StatusCode GetResponseCode() const noexcept + { + return static_cast(response_code_); + } + + CURLcode GetLastResultCode() { return last_curl_result_; } /** * Get last session state. @@ -321,198 +194,87 @@ class HttpOperation /** * Get whether or not response was programmatically aborted */ - bool WasAborted() { return is_aborted_.load(); } + bool WasAborted() { return is_aborted_.load(std::memory_order_acquire); } /** * Return a copy of resposne headers * * @return */ - Headers GetResponseHeaders() - { - Headers result; - if (resp_headers_.size() == 0) - return result; - - std::stringstream ss; - std::string headers((const char *)&resp_headers_[0], resp_headers_.size()); - ss.str(headers); - - std::string header; - while (std::getline(ss, header, '\n')) - { - // TODO - Regex below crashes with out-of-memory on CI docker container, so - // switching to string comparison. Need to debug and revert back. - - /*std::smatch match; - std::regex http_headers_regex(http_header_regexp); - if (std::regex_search(header, match, http_headers_regex)) - result.insert(std::pair( - static_cast(match[1]), static_cast(match[2]))); - */ - size_t pos = header.find(": "); - if (pos != std::string::npos) - result.insert( - std::pair(header.substr(0, pos), header.substr(pos + 2))); - } - return result; - } + Headers GetResponseHeaders(); /** * Return a copy of response body * * @return */ - std::vector GetResponseBody() { return resp_body_; } + inline const std::vector &GetResponseBody() const noexcept { return response_body_; } /** * Return a raw copy of response headers+body * * @return */ - std::vector GetRawResponse() { return raw_response_; } + inline const std::vector &GetRawResponse() const noexcept { return raw_response_; } /** * Release memory allocated for response */ - void ReleaseResponse() - { - resp_headers_.clear(); - resp_body_.clear(); - raw_response_.clear(); - } + void ReleaseResponse(); /** * Abort request in connecting or reading state. */ - void Abort() - { - is_aborted_ = true; - if (curl_ != nullptr) - { - // Simply close the socket - connection reset by peer - if (sockfd_) - { -#if defined(_WIN32) - ::closesocket(sockfd_); -#else - ::close(sockfd_); -#endif - sockfd_ = 0; - } - } - } + void Abort(); - CURL *GetHandle() { return curl_; } + /** + * Perform curl message, this function only can be called in the polling thread and it can only + * be called when got a CURLMSG_DONE. + * + * @param code + */ + void PerformCurlMessage(CURLcode code); -protected: - const bool is_raw_response_; // Do not split response headers from response body + inline CURL *GetCurlEasyHandle() noexcept { return curl_resource_.easy_handle; } + +private: + std::atomic is_aborted_; // Set to 'true' when async callback is aborted + std::atomic is_finished_; // Set to 'true' when async callback is finished. + std::atomic is_cleaned_; // Set to 'true' when async callback is cleaned. + const bool is_raw_response_; // Do not split response headers from response body + const bool reuse_connection_; // Reuse connection const std::chrono::milliseconds http_conn_timeout_; // Timeout for connect. Default: 5000ms - RequestMode request_mode_; - CURL *curl_; // Local curl instance - CURLcode res_; // Curl result OR HTTP status code if successful + HttpCurlEasyResource curl_resource_; + CURLcode last_curl_result_; // Curl result OR HTTP status code if successful - opentelemetry::ext::http::client::EventHandler *callback_; + opentelemetry::ext::http::client::EventHandler *event_handle_; // Request values opentelemetry::ext::http::client::Method method_; std::string url_; const Headers &request_headers_; const opentelemetry::ext::http::client::Body &request_body_; - struct curl_slist *headers_chunk_ = nullptr; + size_t request_nwrite_; opentelemetry::ext::http::client::SessionState session_state_; // Processed response headers and body - std::vector resp_headers_; - std::vector resp_body_; + long response_code_; + std::vector response_headers_; + std::vector response_body_; std::vector raw_response_; - // Socket parameters - curl_socket_t sockfd_; - - curl_off_t nread_; - size_t sendlen_ = 0; // # bytes sent by client - size_t acklen_ = 0; // # bytes ack by server - - std::future result_; - - /** - * Helper routine to wait for data on socket - * - * @param sockfd - * @param for_recv - * @param timeout_ms - * @return - */ - static int WaitOnSocket(curl_socket_t sockfd, int for_recv, long timeout_ms) - { - struct timeval tv; - fd_set infd, outfd, errfd; - int res; - - tv.tv_sec = timeout_ms / 1000; - tv.tv_usec = (timeout_ms % 1000) * 1000; - - FD_ZERO(&infd); - FD_ZERO(&outfd); - FD_ZERO(&errfd); - - FD_SET(sockfd, &errfd); /* always check for error */ - - if (for_recv) - { - FD_SET(sockfd, &infd); - } - else - { - FD_SET(sockfd, &outfd); - } - - /* select() returns the number of signalled sockets or -1 */ - res = select((int)sockfd + 1, &infd, &outfd, &errfd, &tv); - return res; - } - - /** - * Old-school memory allocator - * - * @param contents - * @param size - * @param nmemb - * @param userp - * @return - */ - static size_t WriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp) + struct AsyncData { - std::vector *buf = static_cast *>(userp); - buf->insert(buf->end(), static_cast(contents), - static_cast(contents) + (size * nmemb)); - return size * nmemb; - } - - /** - * C++ STL std::vector allocator - * - * @param ptr - * @param size - * @param nmemb - * @param data - * @return - */ - static size_t WriteVectorCallback(void *ptr, - size_t size, - size_t nmemb, - std::vector *data) - { - if (data != nullptr) - { - const unsigned char *begin = (unsigned char *)(ptr); - const unsigned char *end = begin + size * nmemb; - data->insert(data->end(), begin, end); - } - return size * nmemb; - } + Session *session; // Owner Session + + std::thread::id callback_thread; + std::function callback; + std::atomic is_promise_running; + std::promise result_promise; + std::future result_future; + }; + std::unique_ptr async_data_; }; } // namespace curl } // namespace client diff --git a/ext/include/opentelemetry/ext/http/client/http_client.h b/ext/include/opentelemetry/ext/http/client/http_client.h index e939962653..34564affc3 100644 --- a/ext/include/opentelemetry/ext/http/client/http_client.h +++ b/ext/include/opentelemetry/ext/http/client/http_client.h @@ -232,6 +232,8 @@ class HttpClient virtual bool FinishAllSessions() noexcept = 0; + virtual void SetMaxSessionsPerConnection(std::size_t max_requests_per_connection) noexcept = 0; + virtual ~HttpClient() = default; }; diff --git a/ext/include/opentelemetry/ext/http/client/nosend/http_client_nosend.h b/ext/include/opentelemetry/ext/http/client/nosend/http_client_nosend.h index 405381c285..f32a075879 100644 --- a/ext/include/opentelemetry/ext/http/client/nosend/http_client_nosend.h +++ b/ext/include/opentelemetry/ext/http/client/nosend/http_client_nosend.h @@ -51,10 +51,7 @@ class Request : public opentelemetry::ext::http::client::Request void ReplaceHeader(nostd::string_view name, nostd::string_view value) noexcept override; - virtual void SetUri(nostd::string_view uri) noexcept override - { - uri_ = static_cast(uri); - } + void SetUri(nostd::string_view uri) noexcept override { uri_ = static_cast(uri); } void SetTimeoutMs(std::chrono::milliseconds timeout_ms) noexcept override { @@ -74,21 +71,16 @@ class Response : public opentelemetry::ext::http::client::Response public: Response() : status_code_(Http_Ok) {} - virtual const opentelemetry::ext::http::client::Body &GetBody() const noexcept override - { - return body_; - } + const opentelemetry::ext::http::client::Body &GetBody() const noexcept override { return body_; } - virtual bool ForEachHeader( - nostd::function_ref callable) - const noexcept override; + bool ForEachHeader(nostd::function_ref + callable) const noexcept override; - virtual bool ForEachHeader( - const nostd::string_view &name, - nostd::function_ref callable) - const noexcept override; + bool ForEachHeader(const nostd::string_view &name, + nostd::function_ref + callable) const noexcept override; - virtual opentelemetry::ext::http::client::StatusCode GetStatusCode() const noexcept override + opentelemetry::ext::http::client::StatusCode GetStatusCode() const noexcept override { return status_code_; } @@ -137,11 +129,11 @@ class Session : public opentelemetry::ext::http::client::Session (std::shared_ptr), (noexcept, override)); - virtual bool CancelSession() noexcept override; + bool CancelSession() noexcept override; - virtual bool FinishSession() noexcept override; + bool FinishSession() noexcept override; - virtual bool IsSessionActive() noexcept override { return is_session_active_; } + bool IsSessionActive() noexcept override { return is_session_active_; } void SetId(uint64_t session_id) { session_id_ = session_id; } @@ -164,27 +156,18 @@ class Session : public opentelemetry::ext::http::client::Session class HttpClient : public opentelemetry::ext::http::client::HttpClient { public: - HttpClient() { session_ = std::shared_ptr{new Session(*this)}; } + HttpClient(); std::shared_ptr CreateSession( - nostd::string_view) noexcept override - { - return session_; - } + nostd::string_view) noexcept override; - bool CancelAllSessions() noexcept override - { - session_->CancelSession(); - return true; - } + bool CancelAllSessions() noexcept override; - bool FinishAllSessions() noexcept override - { - session_->FinishSession(); - return true; - } + bool FinishAllSessions() noexcept override; + + void SetMaxSessionsPerConnection(std::size_t max_requests_per_connection) noexcept override; - void CleanupSession(uint64_t session_id) {} + void CleanupSession(uint64_t session_id); std::shared_ptr session_; }; diff --git a/ext/src/http/client/curl/BUILD b/ext/src/http/client/curl/BUILD index 33ab814b91..c0557fe99c 100644 --- a/ext/src/http/client/curl/BUILD +++ b/ext/src/http/client/curl/BUILD @@ -5,6 +5,7 @@ cc_library( srcs = [ "http_client_curl.cc", "http_client_factory_curl.cc", + "http_operation_curl.cc", ], copts = [ "-DWITH_CURL", diff --git a/ext/src/http/client/curl/CMakeLists.txt b/ext/src/http/client/curl/CMakeLists.txt index 78a81cfe3e..424f649f1a 100644 --- a/ext/src/http/client/curl/CMakeLists.txt +++ b/ext/src/http/client/curl/CMakeLists.txt @@ -1,7 +1,8 @@ find_package(CURL) if(CURL_FOUND) - add_library(opentelemetry_http_client_curl http_client_factory_curl.cc - http_client_curl.cc) + add_library( + opentelemetry_http_client_curl http_client_factory_curl.cc + http_client_curl.cc http_operation_curl.cc) set_target_properties(opentelemetry_http_client_curl PROPERTIES EXPORT_NAME http_client_curl) diff --git a/ext/src/http/client/curl/http_client_curl.cc b/ext/src/http/client/curl/http_client_curl.cc index 74ad86ea4b..20607cf582 100644 --- a/ext/src/http/client/curl/http_client_curl.cc +++ b/ext/src/http/client/curl/http_client_curl.cc @@ -3,16 +3,577 @@ #include "opentelemetry/ext/http/client/curl/http_client_curl.h" -bool opentelemetry::ext::http::client::curl::Session::CancelSession() noexcept +#include + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace ext +{ +namespace http +{ +namespace client +{ +namespace curl +{ + +HttpCurlGlobalInitializer::HttpCurlGlobalInitializer() +{ + curl_global_init(CURL_GLOBAL_ALL); +} + +HttpCurlGlobalInitializer::~HttpCurlGlobalInitializer() +{ + curl_global_cleanup(); +} + +nostd::shared_ptr HttpCurlGlobalInitializer::GetInstance() +{ + static nostd::shared_ptr shared_initializer{ + new HttpCurlGlobalInitializer()}; + return shared_initializer; +} + +void Session::SendRequest( + std::shared_ptr callback) noexcept { - curl_operation_->Abort(); + is_session_active_.store(true, std::memory_order_release); + std::string url = host_ + std::string(http_request_->uri_); + auto callback_ptr = callback.get(); + bool reuse_connection = false; + if (http_client_.GetMaxSessionsPerConnection() > 0) + { + reuse_connection = session_id_ % http_client_.GetMaxSessionsPerConnection() != 0; + } + + curl_operation_.reset(new HttpOperation(http_request_->method_, url, callback_ptr, + http_request_->headers_, http_request_->body_, false, + http_request_->timeout_ms_, reuse_connection)); + bool success = + CURLE_OK == curl_operation_->SendAsync(this, [this, callback](HttpOperation &operation) { + if (operation.WasAborted()) + { + // Manually cancelled + callback->OnEvent(opentelemetry::ext::http::client::SessionState::Cancelled, ""); + } + + if (operation.GetSessionState() == opentelemetry::ext::http::client::SessionState::Response) + { + // we have a http response + auto response = std::unique_ptr(new Response()); + response->headers_ = operation.GetResponseHeaders(); + response->body_ = operation.GetResponseBody(); + response->status_code_ = operation.GetResponseCode(); + callback->OnResponse(*response); + } + is_session_active_.store(false, std::memory_order_release); + }); + + if (success) + { + http_client_.MaybeSpawnBackgroundThread(); + } + else if (callback) + { + callback->OnEvent(opentelemetry::ext::http::client::SessionState::CreateFailed, ""); + is_session_active_.store(false, std::memory_order_release); + } +} + +bool Session::CancelSession() noexcept +{ + if (curl_operation_) + { + curl_operation_->Abort(); + } http_client_.CleanupSession(session_id_); return true; } -bool opentelemetry::ext::http::client::curl::Session::FinishSession() noexcept +bool Session::FinishSession() noexcept { - curl_operation_->Finish(); + if (curl_operation_) + { + curl_operation_->Finish(); + } http_client_.CleanupSession(session_id_); return true; } + +void Session::FinishOperation() +{ + if (curl_operation_) + { + curl_operation_->Cleanup(); + } +} + +HttpClient::HttpClient() + : next_session_id_{0}, + max_sessions_per_connection_{8}, + scheduled_delay_milliseconds_{std::chrono::milliseconds(256)}, + curl_global_initializer_(HttpCurlGlobalInitializer::GetInstance()) +{ + multi_handle_ = curl_multi_init(); +} + +HttpClient::~HttpClient() +{ + while (true) + { + std::unique_ptr background_thread; + { + std::lock_guard lock_guard{background_thread_m_}; + background_thread.swap(background_thread_); + } + + // Force to abort all sessions + CancelAllSessions(); + + if (!background_thread) + { + break; + } + if (background_thread->joinable()) + { + background_thread->join(); + } + } + { + std::lock_guard lock_guard{multi_handle_m_}; + curl_multi_cleanup(multi_handle_); + } +} + +std::shared_ptr HttpClient::CreateSession( + nostd::string_view url) noexcept +{ + auto parsedUrl = common::UrlParser(std::string(url)); + if (!parsedUrl.success_) + { + return std::make_shared(*this); + } + auto session = + std::make_shared(*this, parsedUrl.scheme_, parsedUrl.host_, parsedUrl.port_); + auto session_id = ++next_session_id_; + session->SetId(session_id); + + std::lock_guard lock_guard{sessions_m_}; + sessions_.insert({session_id, session}); + + // FIXME: Session may leak if it do not SendRequest + return session; +} + +bool HttpClient::CancelAllSessions() noexcept +{ + // CancelSession may change sessions_, we can not change a container while iterating it. + while (true) + { + std::unordered_map> sessions; + { + std::lock_guard lock_guard{sessions_m_}; + sessions.swap(sessions_); + } + + if (sessions.empty()) + { + break; + } + + for (auto &session : sessions) + { + session.second->CancelSession(); + } + } + return true; +} + +bool HttpClient::FinishAllSessions() noexcept +{ + // FinishSession may change sessions_, we can not change a container while iterating it. + while (true) + { + std::unordered_map> sessions; + { + std::lock_guard lock_guard{sessions_m_}; + sessions.swap(sessions_); + } + + if (sessions.empty()) + { + break; + } + + for (auto &session : sessions) + { + session.second->FinishSession(); + } + } + return true; +} + +void HttpClient::SetMaxSessionsPerConnection(std::size_t max_requests_per_connection) noexcept +{ + max_sessions_per_connection_ = max_requests_per_connection; +} + +void HttpClient::CleanupSession(uint64_t session_id) +{ + std::shared_ptr session; + { + std::lock_guard lock_guard{sessions_m_}; + auto it = sessions_.find(session_id); + if (it != sessions_.end()) + { + session = it->second; + sessions_.erase(it); + } + } + + { + std::lock_guard lock_guard{session_ids_m_}; + pending_to_add_session_ids_.erase(session_id); + + if (session) + { + if (pending_to_remove_session_handles_.end() != + pending_to_remove_session_handles_.find(session_id)) + { + pending_to_remove_sessions_.emplace_back(std::move(session)); + } + else if (session->IsSessionActive() && session->GetOperation()) + { + session->FinishOperation(); + } + } + } +} + +void HttpClient::MaybeSpawnBackgroundThread() +{ + std::lock_guard lock_guard{background_thread_m_}; + if (background_thread_) + { + return; + } + + background_thread_.reset(new std::thread( + [](HttpClient *self) { + int still_running = 1; + while (true) + { + CURLMsg *msg; + int queued; + CURLMcode mc = curl_multi_perform(self->multi_handle_, &still_running); + // According to https://curl.se/libcurl/c/curl_multi_perform.html, when mc is not OK, we + // can not curl_multi_perform it again + if (mc != CURLM_OK) + { + self->resetMultiHandle(); + } + else if (still_running) + { + // curl_multi_poll is added from libcurl 7.66.0, before 7.68.0, we can only wait util + // timeout to do the rest jobs +#if LIBCURL_VERSION_NUM >= 0x074200 + /* wait for activity, timeout or "nothing" */ + mc = curl_multi_poll(self->multi_handle_, nullptr, 0, + static_cast(self->scheduled_delay_milliseconds_.count()), + nullptr); +#else + mc = curl_multi_wait(self->multi_handle_, nullptr, 0, + static_cast(self->scheduled_delay_milliseconds_.count()), + nullptr); +#endif + } + + do + { + msg = curl_multi_info_read(self->multi_handle_, &queued); + if (msg == nullptr) + { + break; + } + + if (msg->msg == CURLMSG_DONE) + { + CURL *easy_handle = msg->easy_handle; + CURLcode result = msg->data.result; + Session *session = nullptr; + curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &session); + // If it's already moved into pending_to_remove_session_handles_, we just ingore this + // message. + if (nullptr != session && session->GetOperation()) + { + // Session can not be destroyed when calling PerformCurlMessage + auto hold_session = session->shared_from_this(); + session->GetOperation()->PerformCurlMessage(result); + } + } + } while (true); + + // Abort all pending easy handles + if (self->doAbortSessions()) + { + still_running = 1; + } + + // Remove all pending easy handles + if (self->doRemoveSessions()) + { + still_running = 1; + } + + // Add all pending easy handles + if (self->doAddSessions()) + { + still_running = 1; + } + + if (still_running == 0) + { + std::lock_guard lock_guard{self->background_thread_m_}; + // Double check, make sure no more pending sessions after locking background thread + // management + + // Abort all pending easy handles + if (self->doAbortSessions()) + { + still_running = 1; + } + + // Remove all pending easy handles + if (self->doRemoveSessions()) + { + still_running = 1; + } + + // Add all pending easy handles + if (self->doAddSessions()) + { + still_running = 1; + } + if (still_running == 0) + { + if (self->background_thread_) + { + self->background_thread_->detach(); + self->background_thread_.reset(); + } + break; + } + } + } + }, + this)); +} + +void HttpClient::ScheduleAddSession(uint64_t session_id) +{ + { + std::lock_guard lock_guard{session_ids_m_}; + pending_to_add_session_ids_.insert(session_id); + pending_to_remove_session_handles_.erase(session_id); + pending_to_abort_session_ids_.erase(session_id); + } + + wakeupBackgroundThread(); +} + +void HttpClient::ScheduleAbortSession(uint64_t session_id) +{ + { + std::lock_guard lock_guard{session_ids_m_}; + pending_to_abort_session_ids_.insert(session_id); + pending_to_add_session_ids_.erase(session_id); + } + + wakeupBackgroundThread(); +} + +void HttpClient::ScheduleRemoveSession(uint64_t session_id, HttpCurlEasyResource &&resource) +{ + { + std::lock_guard lock_guard{session_ids_m_}; + pending_to_add_session_ids_.erase(session_id); + pending_to_remove_session_handles_[session_id] = std::move(resource); + } + + wakeupBackgroundThread(); +} + +void HttpClient::wakeupBackgroundThread() +{ +// Before libcurl 7.68.0, we can only wait for timeout and do the rest jobs +// See https://curl.se/libcurl/c/curl_multi_wakeup.html +#if LIBCURL_VERSION_NUM >= 0x074400 + std::lock_guard lock_guard{multi_handle_m_}; + if (nullptr != multi_handle_) + { + curl_multi_wakeup(multi_handle_); + } +#endif +} + +bool HttpClient::doAddSessions() +{ + std::unordered_set pending_to_add_session_ids; + { + std::lock_guard session_id_lock_guard{session_ids_m_}; + pending_to_add_session_ids_.swap(pending_to_add_session_ids); + } + + bool has_data = false; + + std::lock_guard lock_guard{sessions_m_}; + for (auto &session_id : pending_to_add_session_ids) + { + auto session = sessions_.find(session_id); + if (session == sessions_.end()) + { + continue; + } + + if (!session->second->GetOperation()) + { + continue; + } + + CURL *easy_handle = session->second->GetOperation()->GetCurlEasyHandle(); + if (nullptr == easy_handle) + { + continue; + } + + curl_multi_add_handle(multi_handle_, easy_handle); + has_data = true; + } + + return has_data; +} + +bool HttpClient::doAbortSessions() +{ + std::list> abort_sessions; + std::unordered_set pending_to_abort_session_ids; + { + std::lock_guard session_id_lock_guard{session_ids_m_}; + pending_to_abort_session_ids_.swap(pending_to_abort_session_ids); + } + + { + std::lock_guard lock_guard{sessions_m_}; + for (auto &session_id : pending_to_abort_session_ids) + { + auto session = sessions_.find(session_id); + if (session == sessions_.end()) + { + continue; + } + + abort_sessions.push_back(session->second); + } + } + + bool has_data = false; + for (auto session : abort_sessions) + { + if (session->GetOperation()) + { + session->FinishOperation(); + has_data = true; + } + } + return has_data; +} + +bool HttpClient::doRemoveSessions() +{ + bool has_data = false; + bool should_continue; + do + { + std::unordered_map pending_to_remove_session_handles; + std::list> pending_to_remove_sessions; + { + std::lock_guard session_id_lock_guard{session_ids_m_}; + pending_to_remove_session_handles_.swap(pending_to_remove_session_handles); + pending_to_remove_sessions_.swap(pending_to_remove_sessions); + + // If user callback do not call CancelSession or FinishSession, We still need to remove it + // from sessions_ + std::lock_guard session_lock_guard{sessions_m_}; + for (auto &removing_handle : pending_to_remove_session_handles) + { + auto session = sessions_.find(removing_handle.first); + if (session != sessions_.end()) + { + pending_to_remove_sessions.emplace_back(std::move(session->second)); + sessions_.erase(session); + } + } + } + + for (auto &removing_handle : pending_to_remove_session_handles) + { + if (nullptr != removing_handle.second.headers_chunk) + { + curl_slist_free_all(removing_handle.second.headers_chunk); + } + + curl_multi_remove_handle(multi_handle_, removing_handle.second.easy_handle); + curl_easy_cleanup(removing_handle.second.easy_handle); + } + + for (auto &removing_session : pending_to_remove_sessions) + { + // This operation may add more pending_to_remove_session_handles + removing_session->FinishOperation(); + } + + should_continue = + !pending_to_remove_session_handles.empty() || !pending_to_remove_sessions.empty(); + if (should_continue) + { + has_data = true; + } + } while (should_continue); + + return has_data; +} + +void HttpClient::resetMultiHandle() +{ + std::list> sessions; + std::lock_guard session_lock_guard{sessions_m_}; + { + std::lock_guard session_id_lock_guard{session_ids_m_}; + for (auto &session : sessions_) + { + if (pending_to_add_session_ids_.end() == pending_to_add_session_ids_.find(session.first)) + { + sessions.push_back(session.second); + } + } + } + + for (auto &session : sessions) + { + session->CancelSession(); + session->FinishOperation(); + } + + doRemoveSessions(); + + // We will modify the multi_handle_, so we need to lock it + std::lock_guard lock_guard{multi_handle_m_}; + curl_multi_cleanup(multi_handle_); + + // Create a another multi handle to continue pending sessions + multi_handle_ = curl_multi_init(); +} + +} // namespace curl +} // namespace client +} // namespace http +} // namespace ext +OPENTELEMETRY_END_NAMESPACE diff --git a/ext/src/http/client/curl/http_operation_curl.cc b/ext/src/http/client/curl/http_operation_curl.cc new file mode 100644 index 0000000000..cb32e5f916 --- /dev/null +++ b/ext/src/http/client/curl/http_operation_curl.cc @@ -0,0 +1,684 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#include "opentelemetry/ext/http/client/curl/http_operation_curl.h" + +#include "opentelemetry/ext/http/client/curl/http_client_curl.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace ext +{ +namespace http +{ +namespace client +{ +namespace curl +{ + +size_t HttpOperation::WriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp) +{ + HttpOperation *self = reinterpret_cast(userp); + if (nullptr == self) + { + return 0; + } + + self->raw_response_.insert(self->raw_response_.end(), static_cast(contents), + static_cast(contents) + (size * nmemb)); + + if (self->WasAborted()) + { + return 0; + } + + if (self->GetSessionState() == opentelemetry::ext::http::client::SessionState::Connecting) + { + self->DispatchEvent(opentelemetry::ext::http::client::SessionState::Connected); + } + + if (self->GetSessionState() == opentelemetry::ext::http::client::SessionState::Connected) + { + self->DispatchEvent(opentelemetry::ext::http::client::SessionState::Sending); + } + + return size * nmemb; +} + +size_t HttpOperation::WriteVectorHeaderCallback(void *ptr, size_t size, size_t nmemb, void *userp) +{ + HttpOperation *self = reinterpret_cast(userp); + if (nullptr == self) + { + return 0; + } + + const unsigned char *begin = (unsigned char *)(ptr); + const unsigned char *end = begin + size * nmemb; + self->response_headers_.insert(self->response_headers_.end(), begin, end); + + if (self->WasAborted()) + { + return 0; + } + + if (self->GetSessionState() == opentelemetry::ext::http::client::SessionState::Connecting) + { + self->DispatchEvent(opentelemetry::ext::http::client::SessionState::Connected); + } + + if (self->GetSessionState() == opentelemetry::ext::http::client::SessionState::Connected) + { + self->DispatchEvent(opentelemetry::ext::http::client::SessionState::Sending); + } + + return size * nmemb; +} + +size_t HttpOperation::WriteVectorBodyCallback(void *ptr, size_t size, size_t nmemb, void *userp) +{ + HttpOperation *self = reinterpret_cast(userp); + if (nullptr == self) + { + return 0; + } + + const unsigned char *begin = (unsigned char *)(ptr); + const unsigned char *end = begin + size * nmemb; + self->response_body_.insert(self->response_body_.end(), begin, end); + + if (self->WasAborted()) + { + return 0; + } + + if (self->GetSessionState() == opentelemetry::ext::http::client::SessionState::Connecting) + { + self->DispatchEvent(opentelemetry::ext::http::client::SessionState::Connected); + } + + if (self->GetSessionState() == opentelemetry::ext::http::client::SessionState::Connected) + { + self->DispatchEvent(opentelemetry::ext::http::client::SessionState::Sending); + } + + return size * nmemb; +} + +size_t HttpOperation::ReadMemoryCallback(char *buffer, size_t size, size_t nitems, void *userp) +{ + HttpOperation *self = reinterpret_cast(userp); + if (nullptr == self) + { + return 0; + } + + if (self->WasAborted()) + { + return CURL_READFUNC_ABORT; + } + + if (self->GetSessionState() == opentelemetry::ext::http::client::SessionState::Connecting) + { + self->DispatchEvent(opentelemetry::ext::http::client::SessionState::Connected); + } + + if (self->GetSessionState() == opentelemetry::ext::http::client::SessionState::Connected) + { + self->DispatchEvent(opentelemetry::ext::http::client::SessionState::Sending); + } + + // EOF + if (self->request_nwrite_ >= self->request_body_.size()) + { + return 0; + } + + size_t nwrite = size * nitems; + if (nwrite > self->request_body_.size() - self->request_nwrite_) + { + nwrite = self->request_body_.size() - self->request_nwrite_; + } + + memcpy(buffer, &self->request_body_[self->request_nwrite_], nwrite); + self->request_nwrite_ += nwrite; + return nwrite; +} + +#if LIBCURL_VERSION_NUM >= 0x075000 +int HttpOperation::PreRequestCallback(void *clientp, char *, char *, int, int) +{ + HttpOperation *self = reinterpret_cast(clientp); + if (nullptr == self) + { + return CURL_PREREQFUNC_ABORT; + } + + if (self->GetSessionState() == opentelemetry::ext::http::client::SessionState::Connecting) + { + self->DispatchEvent(opentelemetry::ext::http::client::SessionState::Connected); + } + + if (self->WasAborted()) + { + return CURL_PREREQFUNC_ABORT; + } + + return CURL_PREREQFUNC_OK; +} +#endif + +#if LIBCURL_VERSION_NUM >= 0x072000 +int HttpOperation::OnProgressCallback(void *clientp, + curl_off_t dltotal, + curl_off_t dlnow, + curl_off_t ultotal, + curl_off_t ulnow) +{ + HttpOperation *self = reinterpret_cast(clientp); + if (nullptr == self) + { + return -1; + } + + if (self->WasAborted()) + { + return -1; + } + + // CURL_PROGRESSFUNC_CONTINUE is added in 7.68.0 +# if defined(CURL_PROGRESSFUNC_CONTINUE) + return CURL_PROGRESSFUNC_CONTINUE; +# else + return 0; +# endif +} +#else +int HttpOperation::OnProgressCallback(void *clientp, + double dltotal, + double dlnow, + double ultotal, + double ulnow) +{ + HttpOperation *self = reinterpret_cast(clientp); + if (nullptr == self) + { + return -1; + } + + if (self->WasAborted()) + { + return -1; + } + + return 0; +} +#endif + +void HttpOperation::DispatchEvent(opentelemetry::ext::http::client::SessionState type, + std::string reason) +{ + if (event_handle_ != nullptr) + { + event_handle_->OnEvent(type, reason); + } + + session_state_ = type; +} + +HttpOperation::HttpOperation(opentelemetry::ext::http::client::Method method, + std::string url, + opentelemetry::ext::http::client::EventHandler *event_handle, + // Default empty headers and empty request body + const opentelemetry::ext::http::client::Headers &request_headers, + const opentelemetry::ext::http::client::Body &request_body, + // Default connectivity and response size options + bool is_raw_response, + std::chrono::milliseconds http_conn_timeout, + bool reuse_connection) + : is_aborted_(false), + is_finished_(false), + is_cleaned_(false), + // Optional connection params + is_raw_response_(is_raw_response), + reuse_connection_(reuse_connection), + http_conn_timeout_(http_conn_timeout), + // Result + last_curl_result_(CURLE_OK), + event_handle_(event_handle), + method_(method), + url_(url), + // Local vars + request_headers_(request_headers), + request_body_(request_body), + request_nwrite_(0), + session_state_(opentelemetry::ext::http::client::SessionState::Created), + response_code_(0) +{ + /* get a curl handle */ + curl_resource_.easy_handle = curl_easy_init(); + if (!curl_resource_.easy_handle) + { + last_curl_result_ = CURLE_FAILED_INIT; + DispatchEvent(opentelemetry::ext::http::client::SessionState::CreateFailed, + curl_easy_strerror(last_curl_result_)); + return; + } + + // Specify our custom headers + if (!this->request_headers_.empty()) + { + for (auto &kv : this->request_headers_) + { + std::string header = std::string(kv.first); + header += ": "; + header += std::string(kv.second); + curl_resource_.headers_chunk = + curl_slist_append(curl_resource_.headers_chunk, header.c_str()); + } + } + + DispatchEvent(opentelemetry::ext::http::client::SessionState::Created); +} + +HttpOperation::~HttpOperation() +{ + // Given the request has not been aborted we should wait for completion here + // This guarantees the lifetime of this request. + switch (GetSessionState()) + { + case opentelemetry::ext::http::client::SessionState::Connecting: + case opentelemetry::ext::http::client::SessionState::Connected: + case opentelemetry::ext::http::client::SessionState::Sending: { + if (async_data_ && async_data_->result_future.valid()) + { + if (async_data_->callback_thread != std::this_thread::get_id()) + { + async_data_->result_future.wait(); + last_curl_result_ = async_data_->result_future.get(); + } + } + break; + } + default: + break; + } + + Cleanup(); +} + +void HttpOperation::Finish() +{ + if (is_finished_.exchange(true, std::memory_order_acq_rel)) + { + return; + } + + if (async_data_ && async_data_->result_future.valid()) + { + // We should not wait in callback from Cleanup() + if (async_data_->callback_thread != std::this_thread::get_id()) + { + async_data_->result_future.wait(); + last_curl_result_ = async_data_->result_future.get(); + } + } +} + +void HttpOperation::Cleanup() +{ + if (is_cleaned_.exchange(true, std::memory_order_acq_rel)) + { + return; + } + + switch (GetSessionState()) + { + case opentelemetry::ext::http::client::SessionState::Created: + case opentelemetry::ext::http::client::SessionState::Connecting: + case opentelemetry::ext::http::client::SessionState::Connected: + case opentelemetry::ext::http::client::SessionState::Sending: { + DispatchEvent(opentelemetry::ext::http::client::SessionState::Cancelled, + curl_easy_strerror(last_curl_result_)); + break; + } + default: + break; + } + + std::function callback; + + // Only cleanup async once even in recursive calls + if (async_data_) + { + // Just reset and move easy_handle to owner if in async mode + if (async_data_->session != nullptr) + { + auto session = async_data_->session; + async_data_->session = nullptr; + + if (curl_resource_.easy_handle != nullptr) + { + curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_PRIVATE, NULL); + curl_easy_reset(curl_resource_.easy_handle); + } + session->GetHttpClient().ScheduleRemoveSession(session->GetSessionId(), + std::move(curl_resource_)); + } + + callback.swap(async_data_->callback); + if (callback) + { + async_data_->callback_thread = std::this_thread::get_id(); + callback(*this); + async_data_->callback_thread = std::thread::id(); + } + + // Set value to promise to continue Finish() + if (true == async_data_->is_promise_running.exchange(false, std::memory_order_acq_rel)) + { + async_data_->result_promise.set_value(last_curl_result_); + } + + return; + } + + // Sync mode + if (curl_resource_.easy_handle != nullptr) + { + curl_easy_cleanup(curl_resource_.easy_handle); + curl_resource_.easy_handle = nullptr; + } + + if (curl_resource_.headers_chunk != nullptr) + { + curl_slist_free_all(curl_resource_.headers_chunk); + curl_resource_.headers_chunk = nullptr; + } +} + +CURLcode HttpOperation::Setup() +{ + if (!curl_resource_.easy_handle) + { + return CURLE_FAILED_INIT; + } + + curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_VERBOSE, 0); + + // Specify target URL + curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_URL, url_.c_str()); + + // TODO: support ssl cert verification for https request + curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_SSL_VERIFYPEER, 0); // 1L + curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_SSL_VERIFYHOST, 0); // 2L + + if (curl_resource_.headers_chunk != nullptr) + { + curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_HTTPHEADER, curl_resource_.headers_chunk); + } + + // TODO: control local port to use + // curl_easy_setopt(curl, CURLOPT_LOCALPORT, dcf_port); + + curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_TIMEOUT_MS, http_conn_timeout_.count()); + + // abort if slower than 4kb/sec during 30 seconds + curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_LOW_SPEED_TIME, 30L); + curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_LOW_SPEED_LIMIT, 4096); + if (reuse_connection_) + { + curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_FRESH_CONNECT, 0L); + curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_FORBID_REUSE, 0L); + } + else + { + curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_FRESH_CONNECT, 1L); + curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_FORBID_REUSE, 1L); + } + + if (is_raw_response_) + { + curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_HEADER, true); + curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_WRITEFUNCTION, + (void *)&HttpOperation::WriteMemoryCallback); + curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_WRITEDATA, (void *)this); + } + else + { + curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_WRITEFUNCTION, + (void *)&HttpOperation::WriteVectorBodyCallback); + curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_WRITEDATA, (void *)this); + curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_HEADERFUNCTION, + (void *)&HttpOperation::WriteVectorHeaderCallback); + curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_HEADERDATA, (void *)this); + } + + // TODO: only two methods supported for now - POST and GET + if (method_ == opentelemetry::ext::http::client::Method::Post) + { + // Request buffer + const curl_off_t req_size = static_cast(request_body_.size()); + // POST + curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_POST, 1L); + curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_POSTFIELDS, NULL); + curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_POSTFIELDSIZE_LARGE, req_size); + curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_READFUNCTION, + (void *)&HttpOperation::ReadMemoryCallback); + curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_READDATA, (void *)this); + } + else if (method_ == opentelemetry::ext::http::client::Method::Get) + { + // GET + } + else + { + return CURLE_UNSUPPORTED_PROTOCOL; + } + +#if LIBCURL_VERSION_NUM >= 0x072000 + curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_XFERINFOFUNCTION, + (void *)&HttpOperation::OnProgressCallback); + curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_XFERINFODATA, (void *)this); +#else + curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_PROGRESSFUNCTION, + (void *)&HttpOperation::OnProgressCallback); + curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_PROGRESSDATA, (void *)this); +#endif + +#if LIBCURL_VERSION_NUM >= 0x075000 + curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_PREREQFUNCTION, + (void *)&HttpOperation::PreRequestCallback); + curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_PREREQDATA, (void *)this); +#endif + + return CURLE_OK; +} + +CURLcode HttpOperation::Send() +{ + // If it is async sending, just return error + if (async_data_ && async_data_->is_promise_running.load(std::memory_order_acquire)) + { + return CURLE_FAILED_INIT; + } + + ReleaseResponse(); + + last_curl_result_ = Setup(); + if (last_curl_result_ != CURLE_OK) + { + DispatchEvent(opentelemetry::ext::http::client::SessionState::ConnectFailed, + curl_easy_strerror(last_curl_result_)); + return last_curl_result_; + } + + // Perform initial connect, handling the timeout if needed + // We can not use CURLOPT_CONNECT_ONLY because it will disable the reuse of connections. + DispatchEvent(opentelemetry::ext::http::client::SessionState::Connecting); + is_finished_.store(false, std::memory_order_release); + is_aborted_.store(false, std::memory_order_release); + is_cleaned_.store(false, std::memory_order_release); + + CURLcode code = curl_easy_perform(curl_resource_.easy_handle); + PerformCurlMessage(code); + if (CURLE_OK != code) + { + return code; + } + + return code; +} + +CURLcode HttpOperation::SendAsync(Session *session, std::function callback) +{ + if (nullptr == session) + { + return CURLE_FAILED_INIT; + } + + if (async_data_ && async_data_->is_promise_running.load(std::memory_order_acquire)) + { + return CURLE_FAILED_INIT; + } + else + { + async_data_.reset(new AsyncData()); + async_data_->is_promise_running.store(false, std::memory_order_release); + async_data_->session = nullptr; + } + + ReleaseResponse(); + + CURLcode code = Setup(); + last_curl_result_ = code; + if (code != CURLE_OK) + { + return code; + } + curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_PRIVATE, session); + + DispatchEvent(opentelemetry::ext::http::client::SessionState::Connecting); + is_finished_.store(false, std::memory_order_release); + is_aborted_.store(false, std::memory_order_release); + is_cleaned_.store(false, std::memory_order_release); + + async_data_->session = session; + if (false == async_data_->is_promise_running.exchange(true, std::memory_order_acq_rel)) + { + async_data_->result_promise = std::promise(); + async_data_->result_future = async_data_->result_promise.get_future(); + } + async_data_->callback = std::move(callback); + + session->GetHttpClient().ScheduleAddSession(session->GetSessionId()); + return code; +} + +Headers HttpOperation::GetResponseHeaders() +{ + Headers result; + if (response_headers_.size() == 0) + return result; + + std::stringstream ss; + std::string headers((const char *)&response_headers_[0], response_headers_.size()); + ss.str(headers); + + std::string header; + while (std::getline(ss, header, '\n')) + { + // TODO - Regex below crashes with out-of-memory on CI docker container, so + // switching to string comparison. Need to debug and revert back. + + /*std::smatch match; + std::regex http_headers_regex(http_header_regexp); + if (std::regex_search(header, match, http_headers_regex)) + result.insert(std::pair( + static_cast(match[1]), static_cast(match[2]))); + */ + size_t pos = header.find(": "); + if (pos != std::string::npos) + result.insert( + std::pair(header.substr(0, pos), header.substr(pos + 2))); + } + return result; +} + +void HttpOperation::ReleaseResponse() +{ + response_headers_.clear(); + response_body_.clear(); + raw_response_.clear(); +} + +void HttpOperation::Abort() +{ + is_aborted_.store(true, std::memory_order_release); + if (curl_resource_.easy_handle != nullptr) + { + // Enable progress callback to abort from polling thread + curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_NOPROGRESS, 0L); + if (async_data_ && nullptr != async_data_->session) + { + async_data_->session->GetHttpClient().ScheduleAbortSession( + async_data_->session->GetSessionId()); + } + } +} + +void HttpOperation::PerformCurlMessage(CURLcode code) +{ + last_curl_result_ = code; + if (code != CURLE_OK) + { + switch (GetSessionState()) + { + case opentelemetry::ext::http::client::SessionState::Connecting: { + DispatchEvent(opentelemetry::ext::http::client::SessionState::ConnectFailed, + curl_easy_strerror(code)); // couldn't connect - stage 1 + break; + } + case opentelemetry::ext::http::client::SessionState::Connected: + case opentelemetry::ext::http::client::SessionState::Sending: { + if (GetSessionState() == opentelemetry::ext::http::client::SessionState::Connected) + { + DispatchEvent(opentelemetry::ext::http::client::SessionState::Sending); + } + + DispatchEvent(opentelemetry::ext::http::client::SessionState::SendFailed, + curl_easy_strerror(code)); + } + default: + break; + } + } + else if (curl_resource_.easy_handle != nullptr) + { + curl_easy_getinfo(curl_resource_.easy_handle, CURLINFO_RESPONSE_CODE, &response_code_); + } + + // Transform state + if (GetSessionState() == opentelemetry::ext::http::client::SessionState::Connecting) + { + DispatchEvent(opentelemetry::ext::http::client::SessionState::Connected); + } + + if (GetSessionState() == opentelemetry::ext::http::client::SessionState::Connected) + { + DispatchEvent(opentelemetry::ext::http::client::SessionState::Sending); + } + + if (GetSessionState() == opentelemetry::ext::http::client::SessionState::Sending) + { + DispatchEvent(opentelemetry::ext::http::client::SessionState::Response); + } + + // Cleanup and unbind easy handle from multi handle, and finish callback + Cleanup(); +} + +} // namespace curl +} // namespace client +} // namespace http +} // namespace ext +OPENTELEMETRY_END_NAMESPACE diff --git a/ext/src/http/client/nosend/http_client_nosend.cc b/ext/src/http/client/nosend/http_client_nosend.cc index c2b1c6acf9..021af33095 100644 --- a/ext/src/http/client/nosend/http_client_nosend.cc +++ b/ext/src/http/client/nosend/http_client_nosend.cc @@ -63,6 +63,33 @@ bool Session::FinishSession() noexcept return true; } +HttpClient::HttpClient() +{ + session_ = std::shared_ptr{new Session(*this)}; +} + +std::shared_ptr HttpClient::CreateSession( + nostd::string_view) noexcept +{ + return session_; +} + +bool HttpClient::CancelAllSessions() noexcept +{ + session_->CancelSession(); + return true; +} + +bool HttpClient::FinishAllSessions() noexcept +{ + session_->FinishSession(); + return true; +} + +void HttpClient::SetMaxSessionsPerConnection(std::size_t max_requests_per_connection) noexcept {} + +void HttpClient::CleanupSession(uint64_t session_id) {} + } // namespace nosend } // namespace client } // namespace http diff --git a/ext/test/http/curl_http_test.cc b/ext/test/http/curl_http_test.cc index 7085a1da33..0bb5aa09a8 100644 --- a/ext/test/http/curl_http_test.cc +++ b/ext/test/http/curl_http_test.cc @@ -25,12 +25,27 @@ namespace nostd = opentelemetry::nostd; class CustomEventHandler : public http_client::EventHandler { public: - virtual void OnResponse(http_client::Response &response) noexcept override{}; + virtual void OnResponse(http_client::Response &response) noexcept override + { + got_response_ = true; + }; virtual void OnEvent(http_client::SessionState state, nostd::string_view reason) noexcept override - {} + { + switch (state) + { + case http_client::SessionState::ConnectFailed: + case http_client::SessionState::SendFailed: { + is_called_ = true; + break; + } + default: + break; + } + } virtual void OnConnecting(const http_client::SSLCertificate &) noexcept {} virtual ~CustomEventHandler() = default; bool is_called_ = false; + bool got_response_ = false; }; class GetEventHandler : public CustomEventHandler @@ -39,7 +54,8 @@ class GetEventHandler : public CustomEventHandler { ASSERT_EQ(200, response.GetStatusCode()); ASSERT_EQ(response.GetBody().size(), 0); - is_called_ = true; + is_called_ = true; + got_response_ = true; }; }; @@ -50,8 +66,32 @@ class PostEventHandler : public CustomEventHandler ASSERT_EQ(200, response.GetStatusCode()); std::string body(response.GetBody().begin(), response.GetBody().end()); ASSERT_EQ(body, "{'k1':'v1', 'k2':'v2', 'k3':'v3'}"); - is_called_ = true; + is_called_ = true; + got_response_ = true; + } +}; + +class FinishInCallbackHandler : public CustomEventHandler +{ +public: + FinishInCallbackHandler(std::shared_ptr session) : session_(session) {} + + void OnResponse(http_client::Response &response) noexcept override + { + ASSERT_EQ(200, response.GetStatusCode()); + ASSERT_EQ(response.GetBody().size(), 0); + is_called_ = true; + got_response_ = true; + + if (session_) + { + session_->FinishSession(); + session_.reset(); + } } + +private: + std::shared_ptr session_; }; class BasicCurlHttpTests : public ::testing::Test, public HTTP_SERVER_NS::HttpRequestCallback @@ -108,7 +148,7 @@ class BasicCurlHttpTests : public ::testing::Test, public HTTP_SERVER_NS::HttpRe response.headers["Content-Type"] = "text/plain"; response_status = 200; } - if (request.uri == "/post/") + else if (request.uri == "/post/") { std::unique_lock lk(mtx_requests); received_requests_.push_back(request); @@ -125,8 +165,10 @@ class BasicCurlHttpTests : public ::testing::Test, public HTTP_SERVER_NS::HttpRe bool waitForRequests(unsigned timeOutSec, unsigned expected_count = 1) { std::unique_lock lk(mtx_requests); - if (cv_got_events.wait_for(lk, std::chrono::milliseconds(1000 * timeOutSec), - [&] { return received_requests_.size() >= expected_count; })) + if (cv_got_events.wait_for(lk, std::chrono::milliseconds(1000 * timeOutSec), [&] { + // + return received_requests_.size() >= expected_count; + })) { return true; } @@ -201,6 +243,7 @@ TEST_F(BasicCurlHttpTests, SendGetRequest) ASSERT_TRUE(waitForRequests(30, 1)); session->FinishSession(); ASSERT_TRUE(handler->is_called_); + ASSERT_TRUE(handler->got_response_); } TEST_F(BasicCurlHttpTests, SendPostRequest) @@ -223,6 +266,7 @@ TEST_F(BasicCurlHttpTests, SendPostRequest) ASSERT_TRUE(waitForRequests(30, 1)); session->FinishSession(); ASSERT_TRUE(handler->is_called_); + ASSERT_TRUE(handler->got_response_); session_manager->CancelAllSessions(); session_manager->FinishAllSessions(); @@ -240,7 +284,8 @@ TEST_F(BasicCurlHttpTests, RequestTimeout) auto handler = std::make_shared(); session->SendRequest(handler); session->FinishSession(); - ASSERT_FALSE(handler->is_called_); + ASSERT_TRUE(handler->is_called_); + ASSERT_FALSE(handler->got_response_); } TEST_F(BasicCurlHttpTests, CurlHttpOperations) @@ -253,16 +298,16 @@ TEST_F(BasicCurlHttpTests, CurlHttpOperations) http_client::Headers headers = { {"name1", "value1_1"}, {"name1", "value1_2"}, {"name2", "value3"}, {"name3", "value3"}}; - curl::HttpOperation http_operations1(http_client::Method::Head, "/get", handler, - curl::RequestMode::Async, headers, body, true); + curl::HttpOperation http_operations1(http_client::Method::Head, "/get", handler, headers, body, + true); http_operations1.Send(); - curl::HttpOperation http_operations2(http_client::Method::Get, "/get", handler, - curl::RequestMode::Async, headers, body, true); + curl::HttpOperation http_operations2(http_client::Method::Get, "/get", handler, headers, body, + true); http_operations2.Send(); - curl::HttpOperation http_operations3(http_client::Method::Get, "/get", handler, - curl::RequestMode::Async, headers, body, false); + curl::HttpOperation http_operations3(http_client::Method::Get, "/get", handler, headers, body, + false); http_operations3.Send(); delete handler; } @@ -319,3 +364,154 @@ TEST_F(BasicCurlHttpTests, GetBaseUri) ASSERT_EQ(std::static_pointer_cast(session)->GetBaseUri(), "http://127.0.0.1:31339/"); } + +TEST_F(BasicCurlHttpTests, SendGetRequestAsync) +{ + curl::HttpClient http_client; + + for (int round = 0; round < 2; ++round) + { + received_requests_.clear(); + static constexpr const unsigned batch_count = 5; + std::shared_ptr sessions[batch_count]; + std::shared_ptr handlers[batch_count]; + for (unsigned i = 0; i < batch_count; ++i) + { + sessions[i] = http_client.CreateSession("http://127.0.0.1:19000/get/"); + auto request = sessions[i]->CreateRequest(); + request->SetMethod(http_client::Method::Get); + request->SetUri("get/"); + + handlers[i] = std::make_shared(); + + // Lock mtx_requests to prevent response, we will check IsSessionActive() in the end + std::unique_lock lock_requests(mtx_requests); + sessions[i]->SendRequest(handlers[i]); + ASSERT_TRUE(sessions[i]->IsSessionActive()); + } + + ASSERT_TRUE(waitForRequests(30, batch_count)); + + for (unsigned i = 0; i < batch_count; ++i) + { + sessions[i]->FinishSession(); + ASSERT_FALSE(sessions[i]->IsSessionActive()); + + ASSERT_TRUE(handlers[i]->is_called_); + ASSERT_TRUE(handlers[i]->got_response_); + } + + http_client.WaitBackgroundThreadExit(); + } +} + +TEST_F(BasicCurlHttpTests, SendGetRequestAsyncTimeout) +{ + received_requests_.clear(); + curl::HttpClient http_client; + + static constexpr const unsigned batch_count = 5; + std::shared_ptr sessions[batch_count]; + std::shared_ptr handlers[batch_count]; + for (unsigned i = 0; i < batch_count; ++i) + { + sessions[i] = http_client.CreateSession("http://222.222.222.200:19000/get/"); + auto request = sessions[i]->CreateRequest(); + request->SetMethod(http_client::Method::Get); + request->SetUri("get/"); + request->SetTimeoutMs(std::chrono::milliseconds(256)); + + handlers[i] = std::make_shared(); + + // Lock mtx_requests to prevent response, we will check IsSessionActive() in the end + std::unique_lock lock_requests(mtx_requests); + sessions[i]->SendRequest(handlers[i]); + ASSERT_TRUE(sessions[i]->IsSessionActive()); + } + + for (unsigned i = 0; i < batch_count; ++i) + { + sessions[i]->FinishSession(); + ASSERT_FALSE(sessions[i]->IsSessionActive()); + + ASSERT_TRUE(handlers[i]->is_called_); + ASSERT_FALSE(handlers[i]->got_response_); + } +} + +TEST_F(BasicCurlHttpTests, SendPostRequestAsync) +{ + curl::HttpClient http_client; + + for (int round = 0; round < 2; ++round) + { + received_requests_.clear(); + auto handler = std::make_shared(); + + static constexpr const unsigned batch_count = 5; + std::shared_ptr sessions[batch_count]; + for (auto &session : sessions) + { + session = http_client.CreateSession("http://127.0.0.1:19000/post/"); + auto request = session->CreateRequest(); + request->SetMethod(http_client::Method::Post); + request->SetUri("post/"); + + // Lock mtx_requests to prevent response, we will check IsSessionActive() in the end + std::unique_lock lock_requests(mtx_requests); + session->SendRequest(handler); + ASSERT_TRUE(session->IsSessionActive()); + } + + ASSERT_TRUE(waitForRequests(30, batch_count)); + + for (auto &session : sessions) + { + session->FinishSession(); + ASSERT_FALSE(session->IsSessionActive()); + } + + ASSERT_TRUE(handler->is_called_); + ASSERT_TRUE(handler->got_response_); + + http_client.WaitBackgroundThreadExit(); + } +} + +TEST_F(BasicCurlHttpTests, FinishInAsyncCallback) +{ + curl::HttpClient http_client; + + for (int round = 0; round < 2; ++round) + { + received_requests_.clear(); + static constexpr const unsigned batch_count = 5; + std::shared_ptr sessions[batch_count]; + std::shared_ptr handlers[batch_count]; + for (unsigned i = 0; i < batch_count; ++i) + { + sessions[i] = http_client.CreateSession("http://127.0.0.1:19000/get/"); + auto request = sessions[i]->CreateRequest(); + request->SetMethod(http_client::Method::Get); + request->SetUri("get/"); + + handlers[i] = std::make_shared(sessions[i]); + + // Lock mtx_requests to prevent response, we will check IsSessionActive() in the end + std::unique_lock lock_requests(mtx_requests); + sessions[i]->SendRequest(handlers[i]); + ASSERT_TRUE(sessions[i]->IsSessionActive()); + } + + http_client.WaitBackgroundThreadExit(); + ASSERT_TRUE(waitForRequests(300, batch_count)); + + for (unsigned i = 0; i < batch_count; ++i) + { + ASSERT_FALSE(sessions[i]->IsSessionActive()); + + ASSERT_TRUE(handlers[i]->is_called_); + ASSERT_TRUE(handlers[i]->got_response_); + } + } +}