Skip to content

Commit

Permalink
Add timeout support to MeterContext::ForceFlush (#1673)
Browse files Browse the repository at this point in the history
  • Loading branch information
ThomsonTan authored Oct 15, 2022
1 parent fa5f9fc commit 47a897d
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 4 deletions.
2 changes: 1 addition & 1 deletion sdk/src/logs/multi_log_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ void MultiLogProcessor::OnEmit(std::unique_ptr<Recordable> &&record) noexcept

bool MultiLogProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept
{
// Converto nanos to prevent overflow
// Convert to nanos to prevent overflow
std::chrono::nanoseconds timeout_ns = std::chrono::nanoseconds::max();
if (std::chrono::duration_cast<std::chrono::microseconds>(timeout_ns) > timeout)
{
Expand Down
42 changes: 39 additions & 3 deletions sdk/src/metrics/meter_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,51 @@ bool MeterContext::Shutdown() noexcept

bool MeterContext::ForceFlush(std::chrono::microseconds timeout) noexcept
{
// TODO - Implement timeout logic.
bool result = true;
if (!shutdown_latch_.test_and_set(std::memory_order_acquire))
{
// Convert to nanos to prevent overflow
auto timeout_ns = std::chrono::nanoseconds::max();
if (std::chrono::duration_cast<std::chrono::microseconds>(timeout_ns) > timeout)
{
timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(timeout);
}

auto current_time = std::chrono::system_clock::now();
std::chrono::system_clock::time_point expire_time;
auto overflow_checker = std::chrono::system_clock::time_point::max();

// check if the expected expire time doesn't overflow.
if (overflow_checker - current_time > timeout_ns)
{
expire_time = current_time +
std::chrono::duration_cast<std::chrono::system_clock::duration>(timeout_ns);
}
else
{
// overflow happens, reset expire time to max.
expire_time = overflow_checker;
}

for (auto &collector : collectors_)
{
bool status = std::static_pointer_cast<MetricCollector>(collector)->ForceFlush(timeout);
result = result && status;
if (!std::static_pointer_cast<MetricCollector>(collector)->ForceFlush(
std::chrono::duration_cast<std::chrono::microseconds>(timeout_ns)))
{
result = false;
}

current_time = std::chrono::system_clock::now();

if (expire_time >= current_time)
{
timeout_ns =
std::chrono::duration_cast<std::chrono::nanoseconds>(expire_time - current_time);
}
else
{
timeout_ns = std::chrono::nanoseconds::zero();
}
}
if (!result)
{
Expand Down

0 comments on commit 47a897d

Please sign in to comment.