-
Notifications
You must be signed in to change notification settings - Fork 417
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Concurrency HTTP sessions for OtlpHttpClient
#1209
Conversation
OtlpHttpClient
OtlpHttpClient
Codecov Report
@@ Coverage Diff @@
## main #1209 +/- ##
==========================================
- Coverage 91.99% 91.69% -0.29%
==========================================
Files 205 205
Lines 7395 7528 +133
==========================================
+ Hits 6802 6902 +100
- Misses 593 626 +33
|
OtlpHttpClient
OtlpHttpClient
1cafb98
to
073c598
Compare
Thanks for the PR. Would be reviewing this early next week. |
000162c
to
db327de
Compare
After #1185 is merged, there are no unit test for concurrency sessions now. I wonder whether should I add unit test for this, if so, the connecting problem (in |
session.second->CancelSession(); | ||
std::map<uint64_t, std::shared_ptr<Session>> sessions; | ||
sessions.swap(sessions_); | ||
for (auto &session : sessions) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why iterating over sessions_
is not ok but iterating over sessions
is ok here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The call stack HttpClient::CancelAllSessions
-> Session::CancelSession
-> HttpClient::CleanupSession
will crash here.
CleanupSession
will change sessions_
when iterating it.
@@ -256,6 +259,8 @@ TEST_F(OtlpHttpLogExporterTestPeer, ExportBinaryIntegrationTest) | |||
http_client::nosend::Response response; | |||
callback.OnResponse(response); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't we need response.Finish(callback);
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I want to reuse it to trigger OnEvent
and call OnResponse
. The codes do not trigger OnEvent
before.
exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h
Outdated
Show resolved
Hide resolved
request->SetMethod(http_client::Method::Post); | ||
request->SetBody(body_vec); | ||
request->ReplaceHeader("Content-Type", content_type); | ||
std::lock_guard<std::recursive_mutex> guard{session_manager_lock_}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to do all the processing before checking for Shutdown?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's used to protect isShutdown()
, is_shutdown_
, running_sessions_
and http_client_
. They are all not thread-safe.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should make isShutdown
thread safe, and short circuit here (e.g. return if isShutdown() is true).
bool OtlpHttpClient::isShutdown() const noexcept | ||
{ | ||
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_); | ||
return is_shutdown_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Without lock this read would have race condition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is_shutdown_
and isShutdown()
are protected by session_manager_lock_
now.
The timeout test seems not a problem of this PR
|
We have a internal benmark report which shows this PR increase QPS of |
std::unique_lock<std::mutex> lock(session_waker_lock_); | ||
bool wait_successful = session_waker_.wait_for(lock, options_.timeout, [this] { | ||
std::lock_guard<std::recursive_mutex> guard{session_manager_lock_}; | ||
return running_sessions_.size() <= options_.concurrent_sessions; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What will happen if number of current running sessions are less than max-configured? Export
will return success even though there is no real successful export ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes.
We have a service need to export a lot logs.When the QPS is greater than 4k, the thread of otel-cpp cost only 7% of CPU time but it start to drop datas. Because when the exporter is waiting for http response, the batch log processor is still receiving more logs and the queue in it grow quickly and will be full soon.
In our test,after we set concurrent session to 8, it can send more than 13k request/second and drop nothing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I can see the improvement we can achieve with these changes. The concern I have is that this deviates from the specs (https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk.md#interface-definition-1), which states that the Export method should return the result of transfer of the data over the wire. This can allow the Span Processor to take decision accordingly say retrying the failed transactions.
We may want to change the Export method to return the result callback to make these changes compliant to specs ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, had a quick look into how other SIGs are handling this. Java returns export status to processor asynchronously using CompletableFuture, while JS returns status using callback.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, had a quick look into how other SIGs are handling this. Java returns export status to processor asynchronously using CompletableFuture, while JS returns status using callback.
Good idea. Maybe we can use nostd::variant<std::future<sdk::common::ExportResult>>
here. But it's a break change for exporters's APIs. I think it need more discussion about it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The OtlpGrpcExporter
and OtlpGrpcLogExporter
have the same problem, but I didn't try to use asynchronize API or test the performance yet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But it's a break change for exporters's APIs. I think it need more discussion about it.
Yes, I was thinking about that too. We have to keep supporting the existing Exporter::Export()
interface and add new using callback/future. And a new config option for SpanProcessor to select which interface to use. But it needs some thought and discussion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The OtlpGrpcExporter and OtlpGrpcLogExporter have the same problem, but I didn't try to use asynchronize API or test the performance yet.
Agree, they too suffer from the same issue.
Good to know that. I had quick glance at the Export() function, will review it more thoroughly tomorrow. Sorry for the delay on this. |
OtlpHttpClient
OtlpHttpClient
* | ||
* @return return true if there are more sessions to delete | ||
*/ | ||
bool cleanupGCSessions() noexcept; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is "GC"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
GC here mean garbage collection of http sessions.When a http session is finished, we can not destroy it immediately right now, because it may still be visited in some codes. (e.g. When we remove a session in OnResponse, it will be visited in OnEvent later).
So I move the finished sessions into gc_sessions_ and real destroy it soon later.
@@ -99,6 +109,8 @@ class OtlpHttpClient | |||
*/ | |||
explicit OtlpHttpClient(OtlpHttpClientOptions &&options); | |||
|
|||
~OtlpHttpClient(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this needed to be explicit?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we need to hold the lock and then clean up all sessions. When removing sessions, we must make sure not to change gc_sessions_
when we iterating it to call FinishSession
of all running sessions.
@owent - This is a great initiative, thanks once again for the PR. Do you think it would make sense to have |
6c8838e
to
6ae64e5
Compare
6ae64e5
to
fca159b
Compare
+ Temporary fix the `Export()` may be called too many times when we shutdown or just wakeup worker thread once.This problem is completely fixed in open-telemetry#1209 but not merged yet. + Fix http client may has a different error code when our network is under a proxy.
+ Temporary fix the `Export()` may be called too many times when we shutdown or just wakeup worker thread once.This problem is completely fixed in open-telemetry#1209 but not merged yet. + Fix http client may has a different error code when our network is under a proxy.
+ Temporary fix the `Export()` may be called too many times when we shutdown or just wakeup worker thread once.This problem is completely fixed in open-telemetry#1209 but not merged yet. + Fix http client may has a different error code when our network is under a proxy.
659e11f
to
e47da76
Compare
e47da76
to
90942be
Compare
90942be
to
521831f
Compare
Signed-off-by: owent <admin@owent.net>
521831f
to
4174407
Compare
…pHttpLogExporter`. Add tests for both sync and async exporting. Signed-off-by: owent <admin@owent.net>
4174407
to
8ed3424
Compare
Signed-off-by: owent <admin@owent.net>
…alled without any span and log in async mode Signed-off-by: owent <admin@owent.net>
…ecords when we shudown batch log/span processor. Signed-off-by: owent <admin@owent.net>
Signed-off-by: owent <admin@owent.net>
@owent - can we close this PR if the changes are already in feature branch? |
1 similar comment
@owent - can we close this PR if the changes are already in feature branch? |
Yes, this is already moved to async-changes |
Fixes #1176
Changes
OtlpHttpClient
concurrency_sessions
to control the max concurrency http sessions.For significant contributions please make sure you have completed the following items:
CHANGELOG.md
updated for non-trivial changes