From 521831f83c464ca7250a56f91b43c04e9d76efb5 Mon Sep 17 00:00:00 2001 From: owent Date: Tue, 22 Mar 2022 16:11:48 +0800 Subject: [PATCH] Implement async version of `OtlpHttpClient::Export` Signed-off-by: owent --- .../exporters/otlp/otlp_http_client.h | 81 +++-- exporters/otlp/src/otlp_http_client.cc | 289 +++++++++++------- exporters/otlp/src/otlp_http_exporter.cc | 11 +- exporters/otlp/src/otlp_http_log_exporter.cc | 10 +- 4 files changed, 245 insertions(+), 146 deletions(-) diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h index 0c1c1fe684..a4b90bc13a 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h @@ -11,6 +11,7 @@ #include "opentelemetry/common/spin_lock_mutex.h" #include "opentelemetry/ext/http/client/http_client.h" +#include "opentelemetry/nostd/variant.h" #include "opentelemetry/sdk/common/exporter_utils.h" #include "opentelemetry/exporters/otlp/otlp_environment.h" @@ -19,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -112,12 +114,22 @@ class OtlpHttpClient ~OtlpHttpClient(); /** - * Export + * Sync export * @param message message to export, it should be ExportTraceServiceRequest, * ExportMetricsServiceRequest or ExportLogsServiceRequest */ sdk::common::ExportResult Export(const google::protobuf::Message &message) noexcept; + /** + * Async export + * @param message message to export, it should be ExportTraceServiceRequest, + * ExportMetricsServiceRequest or ExportLogsServiceRequest + * @param result_callback callback to call when the exporting is done + */ + void Export( + const google::protobuf::Message &message, + std::function &&result_callback) noexcept; + /** * Shut down the HTTP client. * @param timeout an optional timeout, the default timeout of 0 means that no @@ -134,14 +146,49 @@ class OtlpHttpClient void ReleaseSession(const opentelemetry::ext::http::client::Session &session) noexcept; private: + struct HttpSessionData + { + std::shared_ptr session; + std::shared_ptr event_handle; + + inline HttpSessionData() = default; + + inline explicit HttpSessionData( + std::shared_ptr &&input_session, + std::shared_ptr &&input_handle) + { + session.swap(input_session); + event_handle.swap(input_handle); + } + + inline explicit HttpSessionData(HttpSessionData &&other) + { + session.swap(other.session); + event_handle.swap(other.event_handle); + } + + inline HttpSessionData &operator=(HttpSessionData &&other) noexcept + { + session.swap(other.session); + event_handle.swap(other.event_handle); + return *this; + } + }; + + /** + * @brief Create a Session object or return a error result + * + * @param message The message to send + */ + nostd::variant createSession( + const google::protobuf::Message &message, + std::function &&result_callback) noexcept; + /** * Add http session and hold it's lifetime. - * @param session the session to add - * @param event_handle the event handle of this session + * @param session_data the session to add */ - void addSession( - std::shared_ptr session, - std::shared_ptr event_handle) noexcept; + void addSession(HttpSessionData &&session_data) noexcept; /** * @brief Real delete all sessions and event handles. @@ -165,28 +212,6 @@ class OtlpHttpClient OtlpHttpClient(OtlpHttpClientOptions &&options, std::shared_ptr http_client); - struct HttpSessionData - { - std::shared_ptr session; - std::shared_ptr event_handle; - - inline HttpSessionData() = default; - - inline explicit HttpSessionData( - std::shared_ptr &&input_session, - std::shared_ptr &&input_handle) - { - session.swap(input_session); - event_handle.swap(input_handle); - } - - inline explicit HttpSessionData(HttpSessionData &&other) - { - session.swap(other.session); - event_handle.swap(other.event_handle); - } - }; - // Stores if this HTTP client had its Shutdown() method called bool is_shutdown_; diff --git a/exporters/otlp/src/otlp_http_client.cc b/exporters/otlp/src/otlp_http_client.cc index 79c0b69fd8..191939a609 100644 --- a/exporters/otlp/src/otlp_http_client.cc +++ b/exporters/otlp/src/otlp_http_client.cc @@ -71,7 +71,9 @@ class ResponseHandler : public http_client::EventHandler /** * Creates a response handler, that by default doesn't display to console */ - ResponseHandler(bool console_debug = false) : console_debug_{console_debug} + ResponseHandler(std::function &&callback, + bool console_debug = false) + : result_callback_{std::move(callback)}, console_debug_{console_debug} { stoping_.store(false); } @@ -110,7 +112,7 @@ class ResponseHandler : public http_client::EventHandler bool expected = false; if (stoping_.compare_exchange_strong(expected, true, std::memory_order_release)) { - Unbind(); + Unbind(sdk::common::ExportResult::kSuccess); } } } @@ -293,12 +295,12 @@ class ResponseHandler : public http_client::EventHandler bool expected = false; if (stoping_.compare_exchange_strong(expected, true, std::memory_order_release)) { - Unbind(); + Unbind(sdk::common::ExportResult::kFailure); } } } - void Unbind() + void Unbind(sdk::common::ExportResult result) { // ReleaseSession may destroy this object, so we need to move owner and session into stack // first. @@ -312,6 +314,11 @@ class ResponseHandler : public http_client::EventHandler { // Release the session at last owner->ReleaseSession(*session); + + if (result_callback_) + { + result_callback_(result); + } } } @@ -337,6 +344,9 @@ class ResponseHandler : public http_client::EventHandler // A string to store the response body std::string body_ = ""; + // Result callback when in async mode + std::function result_callback_; + // Whether to print the results from the callback bool console_debug_ = false; }; @@ -668,106 +678,65 @@ OtlpHttpClient::OtlpHttpClient(OtlpHttpClientOptions &&options, opentelemetry::sdk::common::ExportResult OtlpHttpClient::Export( const google::protobuf::Message &message) noexcept { - // Parse uri and store it to cache - if (http_uri_.empty()) + opentelemetry::sdk::common::ExportResult result = + opentelemetry::sdk::common::ExportResult::kSuccess; + auto session = + createSession(message, [&result](opentelemetry::sdk::common::ExportResult export_result) { + result = export_result; + return export_result == opentelemetry::sdk::common::ExportResult::kSuccess; + }); + + if (opentelemetry::nostd::holds_alternative(session)) { - auto parse_url = opentelemetry::ext::http::common::UrlParser(std::string(options_.url)); - if (!parse_url.success_) - { - std::string error_message = "[OTLP HTTP Client] Export failed, invalid url: " + options_.url; - if (options_.console_debug) - { - std::cerr << error_message << std::endl; - } - OTEL_INTERNAL_LOG_ERROR(error_message.c_str()); - - return opentelemetry::sdk::common::ExportResult::kFailure; - } - - if (!parse_url.path_.empty() && parse_url.path_[0] == '/') - { - http_uri_ = parse_url.path_.substr(1); - } - else - { - http_uri_ = parse_url.path_; - } + return opentelemetry::nostd::get(session); } - http_client::Body body_vec; - std::string content_type; - if (options_.content_type == HttpRequestContentType::kBinary) + // Wait for the response to be received + if (options_.console_debug) { - if (SerializeToHttpBody(body_vec, message)) - { - if (options_.console_debug) - { - OTEL_INTERNAL_LOG_DEBUG( - "[OTLP HTTP Client] Request body(Binary): " << message.Utf8DebugString()); - } - } - else - { - if (options_.console_debug) - { - OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Serialize body failed(Binary):" - << message.InitializationErrorString()); - } - return opentelemetry::sdk::common::ExportResult::kFailure; - } - content_type = kHttpBinaryContentType; + OTEL_INTERNAL_LOG_DEBUG( + "[OTLP HTTP Client] DEBUG: Waiting for response from " + << options_.url << " (timeout = " + << std::chrono::duration_cast(options_.timeout).count() + << " milliseconds)"); } - else - { - nlohmann::json json_request; - - // Convert from proto into json object - ConvertGenericMessageToJson(json_request, message, options_); - std::string post_body_json = - json_request.dump(-1, ' ', false, nlohmann::detail::error_handler_t::replace); - if (options_.console_debug) - { - OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Request body(Json)" << post_body_json); - } - body_vec.assign(post_body_json.begin(), post_body_json.end()); - content_type = kHttpJsonContentType; - } + addSession(std::move(opentelemetry::nostd::get(session))); - // Send the request - { + // Wait for any session to finish if there are to many sessions + std::unique_lock lock(session_waker_lock_); + bool wait_successful = session_waker_.wait_for(lock, options_.timeout, [this] { std::lock_guard guard{session_manager_lock_}; - // Return failure if this exporter has been shutdown - if (isShutdown()) - { - const char *error_message = "[OTLP HTTP Client] Export failed, exporter is shutdown"; - if (options_.console_debug) - { - std::cerr << error_message << std::endl; - } - OTEL_INTERNAL_LOG_ERROR(error_message); + return running_sessions_.size() <= 0; + }); - return opentelemetry::sdk::common::ExportResult::kFailure; - } + cleanupGCSessions(); - auto session = http_client_->CreateSession(options_.url); - auto request = session->CreateRequest(); + // If an error occurred with the HTTP request + if (!wait_successful) + { + return opentelemetry::sdk::common::ExportResult::kFailure; + } - for (auto &header : options_.http_headers) + return result; +} + +void OtlpHttpClient::Export( + const google::protobuf::Message &message, + std::function &&result_callback) noexcept +{ + auto session = createSession(message, std::move(result_callback)); + if (opentelemetry::nostd::holds_alternative(session)) + { + if (result_callback) { - request->AddHeader(header.first, header.second); + result_callback(opentelemetry::nostd::get(session)); } - request->SetUri(http_uri_); - request->SetTimeoutMs(std::chrono::duration_cast(options_.timeout)); - request->SetMethod(http_client::Method::Post); - request->SetBody(body_vec); - request->ReplaceHeader("Content-Type", content_type); - - // Send the request - addSession(std::move(session), std::shared_ptr{ - new ResponseHandler(options_.console_debug)}); + return; } + addSession(std::move(opentelemetry::nostd::get(session))); + // Wait for the response to be received if (options_.console_debug) { @@ -780,20 +749,12 @@ opentelemetry::sdk::common::ExportResult OtlpHttpClient::Export( // Wait for any session to finish if there are to many sessions std::unique_lock lock(session_waker_lock_); - bool wait_successful = session_waker_.wait_for(lock, options_.timeout, [this] { + session_waker_.wait_for(lock, options_.timeout, [this] { std::lock_guard guard{session_manager_lock_}; return running_sessions_.size() <= options_.concurrent_sessions; }); cleanupGCSessions(); - - // If an error occurred with the HTTP request - if (!wait_successful) - { - return opentelemetry::sdk::common::ExportResult::kFailure; - } - - return opentelemetry::sdk::common::ExportResult::kSuccess; } bool OtlpHttpClient::Shutdown(std::chrono::microseconds timeout) noexcept @@ -842,12 +803,12 @@ void OtlpHttpClient::ReleaseSession( { std::lock_guard guard{session_manager_lock_}; - auto seesion_iter = running_sessions_.find(&session); - if (seesion_iter != running_sessions_.end()) + auto session_iter = running_sessions_.find(&session); + if (session_iter != running_sessions_.end()) { // Move session and handle into gc list, and they will be destroyed later - gc_sessions_.emplace_back(std::move(seesion_iter->second)); - running_sessions_.erase(seesion_iter); + gc_sessions_.emplace_back(std::move(session_iter->second)); + running_sessions_.erase(session_iter); has_session = true; } @@ -859,26 +820,130 @@ void OtlpHttpClient::ReleaseSession( } } -void OtlpHttpClient::addSession( - std::shared_ptr session, - std::shared_ptr event_handle) noexcept +opentelemetry::nostd::variant +OtlpHttpClient::createSession( + const google::protobuf::Message &message, + std::function &&result_callback) noexcept +{ + // Parse uri and store it to cache + if (http_uri_.empty()) + { + auto parse_url = opentelemetry::ext::http::common::UrlParser(std::string(options_.url)); + if (!parse_url.success_) + { + std::string error_message = "[OTLP HTTP Client] Export failed, invalid url: " + options_.url; + if (options_.console_debug) + { + std::cerr << error_message << std::endl; + } + OTEL_INTERNAL_LOG_ERROR(error_message.c_str()); + + return opentelemetry::sdk::common::ExportResult::kFailure; + } + + if (!parse_url.path_.empty() && parse_url.path_[0] == '/') + { + http_uri_ = parse_url.path_.substr(1); + } + else + { + http_uri_ = parse_url.path_; + } + } + + http_client::Body body_vec; + std::string content_type; + if (options_.content_type == HttpRequestContentType::kBinary) + { + if (SerializeToHttpBody(body_vec, message)) + { + if (options_.console_debug) + { + OTEL_INTERNAL_LOG_DEBUG( + "[OTLP HTTP Client] Request body(Binary): " << message.Utf8DebugString()); + } + } + else + { + if (options_.console_debug) + { + OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Serialize body failed(Binary):" + << message.InitializationErrorString()); + } + return opentelemetry::sdk::common::ExportResult::kFailure; + } + content_type = kHttpBinaryContentType; + } + else + { + nlohmann::json json_request; + + // Convert from proto into json object + ConvertGenericMessageToJson(json_request, message, options_); + + std::string post_body_json = + json_request.dump(-1, ' ', false, nlohmann::detail::error_handler_t::replace); + if (options_.console_debug) + { + OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Request body(Json)" << post_body_json); + } + body_vec.assign(post_body_json.begin(), post_body_json.end()); + content_type = kHttpJsonContentType; + } + + // Send the request + std::lock_guard guard{session_manager_lock_}; + // Return failure if this exporter has been shutdown + if (isShutdown()) + { + const char *error_message = "[OTLP HTTP Client] Export failed, exporter is shutdown"; + if (options_.console_debug) + { + std::cerr << error_message << std::endl; + } + OTEL_INTERNAL_LOG_ERROR(error_message); + + return opentelemetry::sdk::common::ExportResult::kFailure; + } + + auto session = http_client_->CreateSession(options_.url); + auto request = session->CreateRequest(); + + for (auto &header : options_.http_headers) + { + request->AddHeader(header.first, header.second); + } + request->SetUri(http_uri_); + request->SetTimeoutMs(std::chrono::duration_cast(options_.timeout)); + request->SetMethod(http_client::Method::Post); + request->SetBody(body_vec); + request->ReplaceHeader("Content-Type", content_type); + + // Send the request + return HttpSessionData{ + std::move(session), + std::shared_ptr{ + new ResponseHandler(std::move(result_callback), options_.console_debug)}}; +} + +void OtlpHttpClient::addSession(HttpSessionData &&session_data) noexcept { - if (!session || !event_handle) + if (!session_data.session || !session_data.event_handle) { return; } - opentelemetry::ext::http::client::Session *key = session.get(); - ResponseHandler *handle = static_cast(event_handle.get()); + opentelemetry::ext::http::client::Session *key = session_data.session.get(); + ResponseHandler *handle = static_cast(session_data.event_handle.get()); handle->Bind(this, *key); - HttpSessionData &session_data = running_sessions_[key]; - session_data.session.swap(session); - session_data.event_handle.swap(event_handle); + HttpSessionData &store_session_data = running_sessions_[key]; + store_session_data = std::move(session_data); // Send request after the session is added - key->SendRequest(session_data.event_handle); + key->SendRequest(store_session_data.event_handle); } bool OtlpHttpClient::cleanupGCSessions() noexcept diff --git a/exporters/otlp/src/otlp_http_exporter.cc b/exporters/otlp/src/otlp_http_exporter.cc index 29926d1651..4e3032844a 100644 --- a/exporters/otlp/src/otlp_http_exporter.cc +++ b/exporters/otlp/src/otlp_http_exporter.cc @@ -63,9 +63,14 @@ void OtlpHttpExporter::Export( const nostd::span> &spans, nostd::function_ref result_callback) noexcept { - OTEL_INTERNAL_LOG_WARN(" async not supported. Making sync interface call"); - auto status = Export(spans); - result_callback(status); + if (spans.empty()) + { + return; + } + + proto::collector::trace::v1::ExportTraceServiceRequest service_request; + OtlpRecordableUtils::PopulateRequest(spans, &service_request); + http_client_->Export(service_request, result_callback); } bool OtlpHttpExporter::Shutdown(std::chrono::microseconds timeout) noexcept diff --git a/exporters/otlp/src/otlp_http_log_exporter.cc b/exporters/otlp/src/otlp_http_log_exporter.cc index bd17c4fb37..0064b08f66 100644 --- a/exporters/otlp/src/otlp_http_log_exporter.cc +++ b/exporters/otlp/src/otlp_http_log_exporter.cc @@ -64,9 +64,13 @@ void OtlpHttpLogExporter::Export( const nostd::span> &logs, nostd::function_ref result_callback) noexcept { - OTEL_INTERNAL_LOG_WARN(" async not supported. Making sync interface call"); - auto status = Export(logs); - result_callback(status); + if (logs.empty()) + { + return; + } + proto::collector::logs::v1::ExportLogsServiceRequest service_request; + OtlpRecordableUtils::PopulateRequest(logs, &service_request); + http_client_->Export(service_request, result_callback); } bool OtlpHttpLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept