Skip to content

Commit

Permalink
Merge branch 'main' into fixes_2982
Browse files Browse the repository at this point in the history
  • Loading branch information
marcalff authored Jul 19, 2024
2 parents 9e0af1a + f195b9e commit 98905fd
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#pragma once

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
Expand Down Expand Up @@ -275,7 +276,7 @@ class OtlpHttpClient
std::shared_ptr<ext::http::client::HttpClient> http_client);

// Stores if this HTTP client had its Shutdown() method called
bool is_shutdown_;
std::atomic<bool> is_shutdown_;

// The configuration options associated with this HTTP client.
const OtlpHttpClientOptions options_;
Expand All @@ -296,6 +297,8 @@ class OtlpHttpClient
// Condition variable and mutex to control the concurrency count of running sessions
std::mutex session_waker_lock_;
std::condition_variable session_waker_;
std::atomic<size_t> start_session_counter_;
std::atomic<size_t> finished_session_counter_;
};
} // namespace otlp
} // namespace exporter
Expand Down
9 changes: 7 additions & 2 deletions exporters/otlp/src/otlp_grpc_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -567,14 +567,19 @@ bool OtlpGrpcClient::Shutdown(std::chrono::microseconds timeout) noexcept
return true;
}

bool force_flush_result;
if (false == is_shutdown_.exchange(true, std::memory_order_acq_rel))
{
is_shutdown_ = true;
force_flush_result = ForceFlush(timeout);

async_data_->cq.Shutdown();
}
else
{
force_flush_result = ForceFlush(timeout);
}

return ForceFlush(timeout);
return force_flush_result;
}

#endif
Expand Down
38 changes: 28 additions & 10 deletions exporters/otlp/src/otlp_http_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@

// clang-format off
#include "opentelemetry/exporters/otlp/protobuf_include_prefix.h" // IWYU pragma: keep
#include <google/protobuf/message.h>
// clang-format on
#include <google/protobuf/descriptor.h>
#include <google/protobuf/message.h>
#include <google/protobuf/stubs/port.h>
// clang-format off
#include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" // IWYU pragma: keep
// clang-format on

Expand Down Expand Up @@ -661,7 +663,11 @@ void ConvertListFieldToJson(nlohmann::json &value,
} // namespace

OtlpHttpClient::OtlpHttpClient(OtlpHttpClientOptions &&options)
: is_shutdown_(false), options_(options), http_client_(http_client::HttpClientFactory::Create())
: is_shutdown_(false),
options_(options),
http_client_(http_client::HttpClientFactory::Create()),
start_session_counter_(0),
finished_session_counter_(0)
{
http_client_->SetMaxSessionsPerConnection(options_.max_requests_per_connection);
}
Expand Down Expand Up @@ -700,7 +706,11 @@ OtlpHttpClient::~OtlpHttpClient()

OtlpHttpClient::OtlpHttpClient(OtlpHttpClientOptions &&options,
std::shared_ptr<ext::http::client::HttpClient> http_client)
: is_shutdown_(false), options_(options), http_client_(std::move(http_client))
: is_shutdown_(false),
options_(options),
http_client_(std::move(http_client)),
start_session_counter_(0),
finished_session_counter_(0)
{
http_client_->SetMaxSessionsPerConnection(options_.max_requests_per_connection);
}
Expand Down Expand Up @@ -799,6 +809,8 @@ bool OtlpHttpClient::ForceFlush(std::chrono::microseconds timeout) noexcept
timeout_steady = (std::chrono::steady_clock::duration::max)();
}

size_t wait_counter = start_session_counter_.load(std::memory_order_acquire);

while (timeout_steady > std::chrono::steady_clock::duration::zero())
{
{
Expand All @@ -816,7 +828,7 @@ bool OtlpHttpClient::ForceFlush(std::chrono::microseconds timeout) noexcept
{
cleanupGCSessions();
}
else
else if (finished_session_counter_.load(std::memory_order_acquire) >= wait_counter)
{
break;
}
Expand All @@ -829,20 +841,24 @@ bool OtlpHttpClient::ForceFlush(std::chrono::microseconds timeout) noexcept

bool OtlpHttpClient::Shutdown(std::chrono::microseconds timeout) noexcept
{
is_shutdown_.store(true, std::memory_order_release);

bool force_flush_result = ForceFlush(timeout);

{
std::lock_guard<std::recursive_mutex> guard{session_manager_lock_};
is_shutdown_ = true;

// Shutdown the session manager
http_client_->CancelAllSessions();
http_client_->FinishAllSessions();
}

ForceFlush(timeout);

// Wait util all sessions are canceled.
while (cleanupGCSessions())
;
return true;
{
ForceFlush(std::chrono::milliseconds{1});
}
return force_flush_result;
}

void OtlpHttpClient::ReleaseSession(
Expand All @@ -859,6 +875,7 @@ void OtlpHttpClient::ReleaseSession(
gc_sessions_.emplace_back(std::move(session_iter->second));
running_sessions_.erase(session_iter);

finished_session_counter_.fetch_add(1, std::memory_order_release);
has_session = true;
}

Expand Down Expand Up @@ -1003,6 +1020,7 @@ void OtlpHttpClient::addSession(HttpSessionData &&session_data) noexcept
store_session_data = std::move(session_data);
}

start_session_counter_.fetch_add(1, std::memory_order_release);
// Send request after the session is added
session->SendRequest(handle);
}
Expand All @@ -1027,7 +1045,7 @@ bool OtlpHttpClient::cleanupGCSessions() noexcept

bool OtlpHttpClient::IsShutdown() const noexcept
{
return is_shutdown_;
return is_shutdown_.load(std::memory_order_acquire);
}

} // namespace otlp
Expand Down

0 comments on commit 98905fd

Please sign in to comment.