Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added max async export support using separate AsyncBatchSpan/LogProcessor #1306

Merged
merged 11 commits into from
May 4, 2022
10 changes: 7 additions & 3 deletions exporters/otlp/test/otlp_http_log_exporter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,13 @@ class OtlpHttpLogExporterTestPeer : public ::testing::Test
std::string attribute_storage_string_value[] = {"vector", "string"};

auto provider = nostd::shared_ptr<sdk::logs::LoggerProvider>(new sdk::logs::LoggerProvider());
provider->AddProcessor(
std::unique_ptr<sdk::logs::LogProcessor>(new sdk::logs::BatchLogProcessor(
std::move(exporter), 5, std::chrono::milliseconds(256), 5, is_async)));
provider->AddProcessor(std::unique_ptr<sdk::logs::LogProcessor>(
new sdk::logs::BatchLogProcessor(std::move(exporter), 5, std::chrono::milliseconds(256), 5
# ifdef ENABLE_ASYNC_EXPORT
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a CI build for ENABLE_ASYNC_EXPORT?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I will add it in this PR.

,
is_async
# endif
)));

std::string report_trace_id;
std::string report_span_id;
Expand Down
48 changes: 38 additions & 10 deletions sdk/include/opentelemetry/sdk/logs/batch_log_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
# include <cstdint>
# include <memory>
# include <thread>
# ifdef ENABLE_ASYNC_EXPORT
# include <list>
# endif

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
Expand Down Expand Up @@ -47,6 +50,10 @@ struct BatchLogProcessorOptions
* Default implementation is synchronous.
*/
bool is_export_async = false;

/* Denotes the maximum number of async exports to continue
*/
size_t max_export_async = 8;
# endif
};

Expand All @@ -72,8 +79,13 @@ class BatchLogProcessor : public LogProcessor
std::unique_ptr<LogExporter> &&exporter,
const size_t max_queue_size = 2048,
const std::chrono::milliseconds scheduled_delay_millis = std::chrono::milliseconds(5000),
const size_t max_export_batch_size = 512,
const bool is_export_async = false);
const size_t max_export_batch_size = 512
# ifdef ENABLE_ASYNC_EXPORT
,
const bool is_export_async = false,
const size_t max_export_async = 8
# endif
);

/**
* Creates a batch log processor by configuring the specified exporter and other parameters
Expand Down Expand Up @@ -137,21 +149,40 @@ class BatchLogProcessor : public LogProcessor
void DrainQueue();

# ifdef ENABLE_ASYNC_EXPORT
/* In case of async export, wait and notify for shutdown to be completed.*/
void WaitForShutdownCompletion();
struct AsyncExportData
{
nostd::span<std::unique_ptr<Recordable>> recordables;
};

struct ExportDataStorage
{
std::unordered_map<AsyncExportData *, std::unique_ptr<AsyncExportData>> running_async_exports;
std::list<std::unique_ptr<AsyncExportData>> garbage_async_exports;
};
std::shared_ptr<ExportDataStorage> export_data_storage_;

bool CleanUpGarbageAsyncData();

const bool is_export_async_;
const size_t max_export_async_;
# endif
struct SynchronizationData
{
/* Synchronization primitives */
std::condition_variable cv, force_flush_cv, async_shutdown_cv;
std::mutex cv_m, force_flush_cv_m, shutdown_m, async_shutdown_m;
std::condition_variable cv, force_flush_cv;
std::mutex cv_m, force_flush_cv_m, shutdown_m;

/* Important boolean flags to handle the workflow of the processor */
std::atomic<bool> is_force_wakeup_background_worker;
std::atomic<bool> is_force_flush_pending;
std::atomic<bool> is_force_flush_notified;
std::atomic<bool> is_shutdown;
std::atomic<bool> is_async_shutdown_notified;
# ifdef ENABLE_ASYNC_EXPORT
std::mutex async_export_waker_m;
std::condition_variable async_export_waker;

std::mutex async_export_data_m;
# endif
};

/**
Expand All @@ -171,9 +202,6 @@ class BatchLogProcessor : public LogProcessor
const size_t max_queue_size_;
const std::chrono::milliseconds scheduled_delay_millis_;
const size_t max_export_batch_size_;
# ifdef ENABLE_ASYNC_EXPORT
const bool is_export_async_;
# endif
/* The buffer/queue to which the ended logs are added */
common::CircularBuffer<Recordable> buffer_;

Expand Down
39 changes: 31 additions & 8 deletions sdk/include/opentelemetry/sdk/trace/batch_span_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
#include <atomic>
#include <condition_variable>
#include <thread>
#ifdef ENABLE_ASYNC_EXPORT
# include <list>
#endif

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
Expand Down Expand Up @@ -44,6 +47,10 @@ struct BatchSpanProcessorOptions
* Default implementation is synchronous.
*/
bool is_export_async = false;

/* Denotes the maximum number of async exports to continue
*/
size_t max_export_async = 8;
#endif
};

Expand Down Expand Up @@ -134,22 +141,41 @@ class BatchSpanProcessor : public SpanProcessor
void DrainQueue();

#ifdef ENABLE_ASYNC_EXPORT
/* In case of async export, wait and notify for shutdown to be completed.*/
void WaitForShutdownCompletion();
struct AsyncExportData
{
nostd::span<std::unique_ptr<Recordable>> recordables;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need a span of recordables here? It is just a reference and may get invalidated.

};

struct ExportDataStorage
{
std::unordered_map<AsyncExportData *, std::unique_ptr<AsyncExportData>> running_async_exports;
std::list<std::unique_ptr<AsyncExportData>> garbage_async_exports;
};
std::shared_ptr<ExportDataStorage> export_data_storage_;

bool CleanUpGarbageAsyncData();

const bool is_export_async_;
const size_t max_export_async_;
#endif

struct SynchronizationData
{
/* Synchronization primitives */
std::condition_variable cv, force_flush_cv, async_shutdown_cv;
std::mutex cv_m, force_flush_cv_m, shutdown_m, async_shutdown_m;
std::condition_variable cv, force_flush_cv;
std::mutex cv_m, force_flush_cv_m, shutdown_m;

/* Important boolean flags to handle the workflow of the processor */
std::atomic<bool> is_force_wakeup_background_worker;
std::atomic<bool> is_force_flush_pending;
std::atomic<bool> is_force_flush_notified;
std::atomic<bool> is_shutdown;
std::atomic<bool> is_async_shutdown_notified;
#ifdef ENABLE_ASYNC_EXPORT
std::mutex async_export_waker_m;
std::condition_variable async_export_waker;

std::mutex async_export_data_m;
#endif
};

/**
Expand All @@ -169,9 +195,6 @@ class BatchSpanProcessor : public SpanProcessor
const size_t max_queue_size_;
const std::chrono::milliseconds schedule_delay_millis_;
const size_t max_export_batch_size_;
#ifdef ENABLE_ASYNC_EXPORT
const bool is_export_async_;
#endif

/* The buffer/queue to which the ended spans are added */
common::CircularBuffer<Recordable> buffer_;
Expand Down
115 changes: 68 additions & 47 deletions sdk/src/logs/batch_log_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,21 @@ namespace logs
BatchLogProcessor::BatchLogProcessor(std::unique_ptr<LogExporter> &&exporter,
const size_t max_queue_size,
const std::chrono::milliseconds scheduled_delay_millis,
const size_t max_export_batch_size,
const bool is_export_async)
const size_t max_export_batch_size
# ifdef ENABLE_ASYNC_EXPORT
,
const bool is_export_async,
const size_t max_export_async
# endif
)
: exporter_(std::move(exporter)),
max_queue_size_(max_queue_size),
scheduled_delay_millis_(scheduled_delay_millis),
max_export_batch_size_(max_export_batch_size),
# ifdef ENABLE_ASYNC_EXPORT
is_export_async_(is_export_async),
max_export_async_(max_export_async),
export_data_storage_(std::make_shared<ExportDataStorage>()),
# endif
buffer_(max_queue_size_),
synchronization_data_(std::make_shared<SynchronizationData>()),
Expand All @@ -34,7 +41,6 @@ BatchLogProcessor::BatchLogProcessor(std::unique_ptr<LogExporter> &&exporter,
synchronization_data_->is_force_flush_pending.store(false);
synchronization_data_->is_force_flush_notified.store(false);
synchronization_data_->is_shutdown.store(false);
synchronization_data_->is_async_shutdown_notified.store(false);
}

BatchLogProcessor::BatchLogProcessor(std::unique_ptr<LogExporter> &&exporter,
Expand All @@ -45,6 +51,8 @@ BatchLogProcessor::BatchLogProcessor(std::unique_ptr<LogExporter> &&exporter,
max_export_batch_size_(options.max_export_batch_size),
# ifdef ENABLE_ASYNC_EXPORT
is_export_async_(options.is_export_async),
max_export_async_(options.max_export_async),
export_data_storage_(std::make_shared<ExportDataStorage>()),
# endif
buffer_(options.max_queue_size),
synchronization_data_(std::make_shared<SynchronizationData>()),
Expand All @@ -54,7 +62,6 @@ BatchLogProcessor::BatchLogProcessor(std::unique_ptr<LogExporter> &&exporter,
synchronization_data_->is_force_flush_pending.store(false);
synchronization_data_->is_force_flush_notified.store(false);
synchronization_data_->is_shutdown.store(false);
synchronization_data_->is_async_shutdown_notified.store(false);
}

std::unique_ptr<Recordable> BatchLogProcessor::MakeRecordable() noexcept
Expand Down Expand Up @@ -236,45 +243,61 @@ void BatchLogProcessor::Export()
}
else
{
std::unique_ptr<AsyncExportData> export_data(new AsyncExportData());
export_data->recordables =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we shouldn't save a span of recordables here; it will be invalid after this function returns. Also, the std::unique_ptr<Recordable> elements in spans_arr may be moved into the exporter after calling exporter_->Export, leaving every element in it nullptr.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, should we save any state for async export? If not, will an atomic counter suffice to keep track of all running exports?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can't check which callback is missing or timed out, so we can't recover if we only have the atomic counter.
According to open-telemetry/opentelemetry-specification#2434 (comment) and open-telemetry/opentelemetry-specification#2434, it's the responsibility of exporters to do the retry logic. I think we can just save a sequence number or an ID related to the callback, but not every recordable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have one more query:
Let's assume max_async_export is set to 8, and no callbacks have been received for any of the 8 async export calls. What should we do before we make the 9th async export? Should we discard the recordables and not make any export call? What is your suggestion?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can just drop it, just like the sync mode.

Copy link
Member

@lalitb lalitb Apr 2, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to be clear about the question: if the maximum number of async requests is ongoing and the circular queue is full, we should drop the incoming recordable (as in sync mode).

nostd::span<std::unique_ptr<Recordable>>(records_arr.data(), records_arr.size());
auto ptr = export_data.get();
{
std::lock_guard<std::mutex> lk(synchronization_data_->async_export_data_m);
export_data_storage_->running_async_exports[ptr] = std::move(export_data);
}
std::weak_ptr<ExportDataStorage> export_data_watcher = export_data_storage_;
std::weak_ptr<SynchronizationData> synchronization_data_watcher = synchronization_data_;
exporter_->Export(
nostd::span<std::unique_ptr<Recordable>>(records_arr.data(), records_arr.size()),
[notify_force_flush, synchronization_data_watcher](sdk::common::ExportResult result) {
// TODO: Print result
if (synchronization_data_watcher.expired())
{
return true;
}

NotifyCompletion(notify_force_flush, synchronization_data_watcher.lock());
return true;
});
exporter_->Export(ptr->recordables, [notify_force_flush, synchronization_data_watcher, ptr,
export_data_watcher](sdk::common::ExportResult result) {
// TODO: Print result
if (synchronization_data_watcher.expired())
{
return true;
}
if (export_data_watcher.expired())
{
return true;
}
auto synchronization_data = synchronization_data_watcher.lock();
auto export_data = export_data_watcher.lock();
{
std::lock_guard<std::mutex> lk(synchronization_data->async_export_data_m);
export_data->garbage_async_exports.emplace_back(
std::move(export_data->running_async_exports[ptr]));
export_data->running_async_exports.erase(ptr);
}

NotifyCompletion(notify_force_flush, synchronization_data);
return true;
});
}
// wait for running async exports < max async export allowed
std::unique_lock<std::mutex> lock(synchronization_data_->async_export_waker_m);
synchronization_data_->async_export_waker.wait_for(lock, scheduled_delay_millis_, [this] {
std::lock_guard<std::mutex> lk(synchronization_data_->async_export_data_m);
return export_data_storage_->running_async_exports.size() <= max_export_async_;
});

// Clean up garbage exports
CleanUpGarbageAsyncData();
# endif
} while (true);
}

# ifdef ENABLE_ASYNC_EXPORT
void BatchLogProcessor::WaitForShutdownCompletion()
bool BatchLogProcessor::CleanUpGarbageAsyncData()
{
// Since async export is invoked due to shutdown, need to wait
// for async thread to complete.
if (is_export_async_)
{
std::unique_lock<std::mutex> lk(synchronization_data_->async_shutdown_m);
while (true)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need an indefinite loop to wait for all running exports to finish in the destructor? Otherwise we may start to destroy an exporter while it's still running.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic to wait for all running exports to finish has been added to the Shutdown function.

{
if (synchronization_data_->is_async_shutdown_notified.load())
{
break;
}
std::lock_guard<std::mutex> lk(synchronization_data_->async_export_data_m);
std::list<std::unique_ptr<AsyncExportData>> garbage;
garbage.swap(export_data_storage_->garbage_async_exports);

// When is_async_shutdown_notified.store(true) and async_shutdown_cv.notify_all() is called
// between is_async_shutdown_notified.load() and async_shutdown_cv.wait(). We must not wait
// for ever
synchronization_data_->async_shutdown_cv.wait_for(lk, scheduled_delay_millis_);
}
}
return export_data_storage_->garbage_async_exports.empty() == true;
}
# endif

Expand All @@ -292,13 +315,6 @@ void BatchLogProcessor::NotifyCompletion(
synchronization_data->is_force_flush_notified.store(true, std::memory_order_release);
synchronization_data->force_flush_cv.notify_one();
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to call async_export_waker.notify_all() here, or all threads that call Export and Shutdown will be blocked until timeout.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah will do this.

// Notify the thread which is waiting on shutdown to complete.
if (synchronization_data->is_shutdown.load() == true)
{
synchronization_data->is_async_shutdown_notified.store(true);
synchronization_data->async_shutdown_cv.notify_all();
}
}

void BatchLogProcessor::DrainQueue()
Expand All @@ -312,12 +328,6 @@ void BatchLogProcessor::DrainQueue()
}

Export();

# ifdef ENABLE_ASYNC_EXPORT
// Since async export is invoked due to shutdown, need to wait
// for async thread to complete.
WaitForShutdownCompletion();
# endif
}
}

Expand All @@ -334,6 +344,17 @@ bool BatchLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept
synchronization_data_->cv.notify_one();
worker_thread_.join();
}
# ifdef ENABLE_ASYNC_EXPORT
// wait for running async exports <= 0
std::unique_lock<std::mutex> lock(synchronization_data_->async_export_waker_m);
synchronization_data_->async_export_waker.wait_for(lock, timeout, [this] {
std::lock_guard<std::mutex> lk(synchronization_data_->async_export_data_m);
return export_data_storage_->running_async_exports.size() <= 0;
});

while (CleanUpGarbageAsyncData() == false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use async_export_waker to wait for an event instead of busy-waiting here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you talking about async_export_waker.notify_all() to be called in other areas?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean while (CleanUpGarbageAsyncData() == false); is a busy wait. CleanUpGarbageAsyncData only locks async_export_data_m, and this lock has a very small critical section, so it's almost a busy wait when the worker thread has exited while the exporter is still exporting data in another thread.

;
# endif

auto worker_end_time = std::chrono::system_clock::now();
auto offset = std::chrono::duration_cast<std::chrono::microseconds>(worker_end_time - start_time);
Expand Down
Loading