-
Notifications
You must be signed in to change notification settings - Fork 417
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added max async export support using separate AsyncBatchSpan/LogProcessor #1306
Changes from 1 commit
f218940
0c8dfcf
39df6c8
e4bc6e8
b15931f
1421910
4f2fc3b
ca556b1
c2c5860
f5324c9
3c3bb3e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,6 +10,9 @@ | |
#include <atomic> | ||
#include <condition_variable> | ||
#include <thread> | ||
#ifdef ENABLE_ASYNC_EXPORT | ||
# include <list> | ||
#endif | ||
|
||
OPENTELEMETRY_BEGIN_NAMESPACE | ||
namespace sdk | ||
|
@@ -44,6 +47,10 @@ struct BatchSpanProcessorOptions | |
* Default implementation is synchronous. | ||
*/ | ||
bool is_export_async = false; | ||
|
||
/* Denotes the maximum number of async exports to continue | ||
*/ | ||
size_t max_export_async = 8; | ||
#endif | ||
}; | ||
|
||
|
@@ -134,22 +141,41 @@ class BatchSpanProcessor : public SpanProcessor | |
void DrainQueue(); | ||
|
||
#ifdef ENABLE_ASYNC_EXPORT | ||
/* In case of async export, wait and notify for shutdown to be completed.*/ | ||
void WaitForShutdownCompletion(); | ||
struct AsyncExportData | ||
{ | ||
nostd::span<std::unique_ptr<Recordable>> recordables; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why do we need a span of recordables here? It is just a reference and may get invalidated. |
||
}; | ||
|
||
struct ExportDataStorage | ||
{ | ||
std::unordered_map<AsyncExportData *, std::unique_ptr<AsyncExportData>> running_async_exports; | ||
std::list<std::unique_ptr<AsyncExportData>> garbage_async_exports; | ||
}; | ||
std::shared_ptr<ExportDataStorage> export_data_storage_; | ||
|
||
bool CleanUpGarbageAsyncData(); | ||
|
||
const bool is_export_async_; | ||
const size_t max_export_async_; | ||
#endif | ||
|
||
struct SynchronizationData | ||
{ | ||
/* Synchronization primitives */ | ||
std::condition_variable cv, force_flush_cv, async_shutdown_cv; | ||
std::mutex cv_m, force_flush_cv_m, shutdown_m, async_shutdown_m; | ||
std::condition_variable cv, force_flush_cv; | ||
std::mutex cv_m, force_flush_cv_m, shutdown_m; | ||
|
||
/* Important boolean flags to handle the workflow of the processor */ | ||
std::atomic<bool> is_force_wakeup_background_worker; | ||
std::atomic<bool> is_force_flush_pending; | ||
std::atomic<bool> is_force_flush_notified; | ||
std::atomic<bool> is_shutdown; | ||
std::atomic<bool> is_async_shutdown_notified; | ||
#ifdef ENABLE_ASYNC_EXPORT | ||
std::mutex async_export_waker_m; | ||
std::condition_variable async_export_waker; | ||
|
||
std::mutex async_export_data_m; | ||
#endif | ||
}; | ||
|
||
/** | ||
|
@@ -169,9 +195,6 @@ class BatchSpanProcessor : public SpanProcessor | |
const size_t max_queue_size_; | ||
const std::chrono::milliseconds schedule_delay_millis_; | ||
const size_t max_export_batch_size_; | ||
#ifdef ENABLE_ASYNC_EXPORT | ||
const bool is_export_async_; | ||
#endif | ||
|
||
/* The buffer/queue to which the ended spans are added */ | ||
common::CircularBuffer<Recordable> buffer_; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,14 +17,21 @@ namespace logs | |
BatchLogProcessor::BatchLogProcessor(std::unique_ptr<LogExporter> &&exporter, | ||
const size_t max_queue_size, | ||
const std::chrono::milliseconds scheduled_delay_millis, | ||
const size_t max_export_batch_size, | ||
const bool is_export_async) | ||
const size_t max_export_batch_size | ||
# ifdef ENABLE_ASYNC_EXPORT | ||
, | ||
const bool is_export_async, | ||
const size_t max_export_async | ||
# endif | ||
) | ||
: exporter_(std::move(exporter)), | ||
max_queue_size_(max_queue_size), | ||
scheduled_delay_millis_(scheduled_delay_millis), | ||
max_export_batch_size_(max_export_batch_size), | ||
# ifdef ENABLE_ASYNC_EXPORT | ||
is_export_async_(is_export_async), | ||
max_export_async_(max_export_async), | ||
export_data_storage_(std::make_shared<ExportDataStorage>()), | ||
# endif | ||
buffer_(max_queue_size_), | ||
synchronization_data_(std::make_shared<SynchronizationData>()), | ||
|
@@ -34,7 +41,6 @@ BatchLogProcessor::BatchLogProcessor(std::unique_ptr<LogExporter> &&exporter, | |
synchronization_data_->is_force_flush_pending.store(false); | ||
synchronization_data_->is_force_flush_notified.store(false); | ||
synchronization_data_->is_shutdown.store(false); | ||
synchronization_data_->is_async_shutdown_notified.store(false); | ||
} | ||
|
||
BatchLogProcessor::BatchLogProcessor(std::unique_ptr<LogExporter> &&exporter, | ||
|
@@ -45,6 +51,8 @@ BatchLogProcessor::BatchLogProcessor(std::unique_ptr<LogExporter> &&exporter, | |
max_export_batch_size_(options.max_export_batch_size), | ||
# ifdef ENABLE_ASYNC_EXPORT | ||
is_export_async_(options.is_export_async), | ||
max_export_async_(options.max_export_async), | ||
export_data_storage_(std::make_shared<ExportDataStorage>()), | ||
# endif | ||
buffer_(options.max_queue_size), | ||
synchronization_data_(std::make_shared<SynchronizationData>()), | ||
|
@@ -54,7 +62,6 @@ BatchLogProcessor::BatchLogProcessor(std::unique_ptr<LogExporter> &&exporter, | |
synchronization_data_->is_force_flush_pending.store(false); | ||
synchronization_data_->is_force_flush_notified.store(false); | ||
synchronization_data_->is_shutdown.store(false); | ||
synchronization_data_->is_async_shutdown_notified.store(false); | ||
} | ||
|
||
std::unique_ptr<Recordable> BatchLogProcessor::MakeRecordable() noexcept | ||
|
@@ -236,45 +243,61 @@ void BatchLogProcessor::Export() | |
} | ||
else | ||
{ | ||
std::unique_ptr<AsyncExportData> export_data(new AsyncExportData()); | ||
export_data->recordables = | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think we shouldn't save a span of recordables here; it will be invalid after this function returns. Also the There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. OK, should we save any state for async export? If not, will an atomic counter suffice to keep track of all running exports? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think we can't check which callback is missing or timed out, so we can't recover it if we only have the atomic counter. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I have one more query: There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think we can just drop it, just like the sync mode. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Just to be clear on the question: if the maximum number of async requests is ongoing and the circular queue is full, we should drop the incoming recordable (as in sync mode). |
||
nostd::span<std::unique_ptr<Recordable>>(records_arr.data(), records_arr.size()); | ||
auto ptr = export_data.get(); | ||
{ | ||
std::lock_guard<std::mutex> lk(synchronization_data_->async_export_data_m); | ||
export_data_storage_->running_async_exports[ptr] = std::move(export_data); | ||
} | ||
std::weak_ptr<ExportDataStorage> export_data_watcher = export_data_storage_; | ||
std::weak_ptr<SynchronizationData> synchronization_data_watcher = synchronization_data_; | ||
exporter_->Export( | ||
nostd::span<std::unique_ptr<Recordable>>(records_arr.data(), records_arr.size()), | ||
[notify_force_flush, synchronization_data_watcher](sdk::common::ExportResult result) { | ||
// TODO: Print result | ||
if (synchronization_data_watcher.expired()) | ||
{ | ||
return true; | ||
} | ||
|
||
NotifyCompletion(notify_force_flush, synchronization_data_watcher.lock()); | ||
return true; | ||
}); | ||
exporter_->Export(ptr->recordables, [notify_force_flush, synchronization_data_watcher, ptr, | ||
export_data_watcher](sdk::common::ExportResult result) { | ||
// TODO: Print result | ||
if (synchronization_data_watcher.expired()) | ||
{ | ||
return true; | ||
} | ||
if (export_data_watcher.expired()) | ||
{ | ||
return true; | ||
} | ||
auto synchronization_data = synchronization_data_watcher.lock(); | ||
auto export_data = export_data_watcher.lock(); | ||
{ | ||
std::lock_guard<std::mutex> lk(synchronization_data->async_export_data_m); | ||
export_data->garbage_async_exports.emplace_back( | ||
std::move(export_data->running_async_exports[ptr])); | ||
export_data->running_async_exports.erase(ptr); | ||
} | ||
|
||
NotifyCompletion(notify_force_flush, synchronization_data); | ||
return true; | ||
}); | ||
} | ||
// wait for running async exports < max async export allowed | ||
std::unique_lock<std::mutex> lock(synchronization_data_->async_export_waker_m); | ||
synchronization_data_->async_export_waker.wait_for(lock, scheduled_delay_millis_, [this] { | ||
std::lock_guard<std::mutex> lk(synchronization_data_->async_export_data_m); | ||
return export_data_storage_->running_async_exports.size() <= max_export_async_; | ||
}); | ||
|
||
// Clean up garbage exports | ||
CleanUpGarbageAsyncData(); | ||
# endif | ||
} while (true); | ||
} | ||
|
||
# ifdef ENABLE_ASYNC_EXPORT | ||
void BatchLogProcessor::WaitForShutdownCompletion() | ||
bool BatchLogProcessor::CleanUpGarbageAsyncData() | ||
{ | ||
// Since async export is invoked due to shutdown, need to wait | ||
// for async thread to complete. | ||
if (is_export_async_) | ||
{ | ||
std::unique_lock<std::mutex> lk(synchronization_data_->async_shutdown_m); | ||
while (true) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do we still need an indefinite loop to wait for all running exports to finish in the destructor? Otherwise we may start destroying an exporter while it's still running. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The logic to wait for all running exports to finish has been added to the Shutdown function. |
||
{ | ||
if (synchronization_data_->is_async_shutdown_notified.load()) | ||
{ | ||
break; | ||
} | ||
std::lock_guard<std::mutex> lk(synchronization_data_->async_export_data_m); | ||
std::list<std::unique_ptr<AsyncExportData>> garbage; | ||
garbage.swap(export_data_storage_->garbage_async_exports); | ||
|
||
// When is_async_shutdown_notified.store(true) and async_shutdown_cv.notify_all() is called | ||
// between is_async_shutdown_notified.load() and async_shutdown_cv.wait(). We must not wait | ||
// for ever | ||
synchronization_data_->async_shutdown_cv.wait_for(lk, scheduled_delay_millis_); | ||
} | ||
} | ||
return export_data_storage_->garbage_async_exports.empty() == true; | ||
} | ||
# endif | ||
|
||
|
@@ -292,13 +315,6 @@ void BatchLogProcessor::NotifyCompletion( | |
synchronization_data->is_force_flush_notified.store(true, std::memory_order_release); | ||
synchronization_data->force_flush_cv.notify_one(); | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah will do this. |
||
// Notify the thread which is waiting on shutdown to complete. | ||
if (synchronization_data->is_shutdown.load() == true) | ||
{ | ||
synchronization_data->is_async_shutdown_notified.store(true); | ||
synchronization_data->async_shutdown_cv.notify_all(); | ||
} | ||
} | ||
|
||
void BatchLogProcessor::DrainQueue() | ||
|
@@ -312,12 +328,6 @@ void BatchLogProcessor::DrainQueue() | |
} | ||
|
||
Export(); | ||
|
||
# ifdef ENABLE_ASYNC_EXPORT | ||
// Since async export is invoked due to shutdown, need to wait | ||
// for async thread to complete. | ||
WaitForShutdownCompletion(); | ||
# endif | ||
} | ||
} | ||
|
||
|
@@ -334,6 +344,17 @@ bool BatchLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept | |
synchronization_data_->cv.notify_one(); | ||
worker_thread_.join(); | ||
} | ||
# ifdef ENABLE_ASYNC_EXPORT | ||
// wait for running async exports <= 0 | ||
std::unique_lock<std::mutex> lock(synchronization_data_->async_export_waker_m); | ||
synchronization_data_->async_export_waker.wait_for(lock, timeout, [this] { | ||
std::lock_guard<std::mutex> lk(synchronization_data_->async_export_data_m); | ||
return export_data_storage_->running_async_exports.size() <= 0; | ||
}); | ||
|
||
while (CleanUpGarbageAsyncData() == false) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should we use async_export_waker to wait for an event instead of busy-waiting here? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Are you talking about async_export_waker.notify_all() to be called in other areas? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I mean |
||
; | ||
# endif | ||
|
||
auto worker_end_time = std::chrono::system_clock::now(); | ||
auto offset = std::chrono::duration_cast<std::chrono::microseconds>(worker_end_time - start_time); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we add a CI build for
ENABLE_ASYNC_EXPORT
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I will add it in this PR.