Skip to content

Commit

Permalink
Fix:1676 Segfault when short export period is used for metrics (open-…
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb authored and yxue committed Dec 5, 2022
1 parent be8fc86 commit bee0d8b
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 40 deletions.
2 changes: 2 additions & 0 deletions sdk/include/opentelemetry/sdk/metrics/meter_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ class MeterProvider final : public opentelemetry::metrics::MeterProvider
*/
bool ForceFlush(std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept;

~MeterProvider() override;

private:
std::shared_ptr<sdk::metrics::MeterContext> context_;
std::mutex lock_;
Expand Down
81 changes: 42 additions & 39 deletions sdk/src/metrics/meter_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ void MeterContext::AddMeter(std::shared_ptr<Meter> meter)
bool MeterContext::Shutdown() noexcept
{
bool result = true;
// Shutdown only once.
if (!shutdown_latch_.test_and_set(std::memory_order_acquire))
{

Expand All @@ -80,62 +81,64 @@ bool MeterContext::Shutdown() noexcept
OTEL_INTERNAL_LOG_WARN("[MeterContext::Shutdown] Unable to shutdown all metric readers");
}
}
else
{
OTEL_INTERNAL_LOG_WARN("[MeterContext::Shutdown] Shutdown can be invoked only once.");
}
return result;
}

bool MeterContext::ForceFlush(std::chrono::microseconds timeout) noexcept
{
bool result = true;
if (!shutdown_latch_.test_and_set(std::memory_order_acquire))
// Simultaneous flush not allowed.
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(forceflush_lock_);
// Convert to nanos to prevent overflow
auto timeout_ns = std::chrono::nanoseconds::max();
if (std::chrono::duration_cast<std::chrono::microseconds>(timeout_ns) > timeout)
{
// Convert to nanos to prevent overflow
auto timeout_ns = std::chrono::nanoseconds::max();
if (std::chrono::duration_cast<std::chrono::microseconds>(timeout_ns) > timeout)
{
timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(timeout);
}
timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(timeout);
}

auto current_time = std::chrono::system_clock::now();
std::chrono::system_clock::time_point expire_time;
auto overflow_checker = std::chrono::system_clock::time_point::max();
auto current_time = std::chrono::system_clock::now();
std::chrono::system_clock::time_point expire_time;
auto overflow_checker = std::chrono::system_clock::time_point::max();

// check if the expected expire time doesn't overflow.
if (overflow_checker - current_time > timeout_ns)
{
expire_time = current_time +
std::chrono::duration_cast<std::chrono::system_clock::duration>(timeout_ns);
}
else
// check if the expected expire time doesn't overflow.
if (overflow_checker - current_time > timeout_ns)
{
expire_time =
current_time + std::chrono::duration_cast<std::chrono::system_clock::duration>(timeout_ns);
}
else
{
// overflow happens, reset expire time to max.
expire_time = overflow_checker;
}

for (auto &collector : collectors_)
{
if (!std::static_pointer_cast<MetricCollector>(collector)->ForceFlush(
std::chrono::duration_cast<std::chrono::microseconds>(timeout_ns)))
{
// overflow happens, reset expire time to max.
expire_time = overflow_checker;
result = false;
}

for (auto &collector : collectors_)
current_time = std::chrono::system_clock::now();

if (expire_time >= current_time)
{
if (!std::static_pointer_cast<MetricCollector>(collector)->ForceFlush(
std::chrono::duration_cast<std::chrono::microseconds>(timeout_ns)))
{
result = false;
}

current_time = std::chrono::system_clock::now();

if (expire_time >= current_time)
{
timeout_ns =
std::chrono::duration_cast<std::chrono::nanoseconds>(expire_time - current_time);
}
else
{
timeout_ns = std::chrono::nanoseconds::zero();
}
timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(expire_time - current_time);
}
if (!result)
else
{
OTEL_INTERNAL_LOG_WARN("[MeterContext::ForceFlush] Unable to ForceFlush all metric readers");
timeout_ns = std::chrono::nanoseconds::zero();
}
}
if (!result)
{
OTEL_INTERNAL_LOG_WARN("[MeterContext::ForceFlush] Unable to ForceFlush all metric readers");
}
return result;
}

Expand Down
12 changes: 12 additions & 0 deletions sdk/src/metrics/meter_provider.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,18 @@ bool MeterProvider::ForceFlush(std::chrono::microseconds timeout) noexcept
return context_->ForceFlush(timeout);
}

/**
* Shutdown MeterContext when MeterProvider is destroyed.
*
*/
MeterProvider::~MeterProvider()
{
if (context_)
{
context_->Shutdown();
}
}

} // namespace metrics
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
Expand Down
6 changes: 5 additions & 1 deletion sdk/test/metrics/meter_provider_sdk_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ TEST(MeterProvider, GetMeter)
ASSERT_EQ(m4, m5);
ASSERT_NE(m3, m6);

// Should be an sdk::trace::Tracer with the processor attached.
// Should be an sdk::metrics::Meter
# ifdef OPENTELEMETRY_RTTI_ENABLED
auto sdkMeter1 = dynamic_cast<Meter *>(m1.get());
# else
Expand All @@ -98,5 +98,9 @@ TEST(MeterProvider, GetMeter)
std::unique_ptr<MeterSelector> meter_selector{new MeterSelector("name1", "version1", "schema1")};

mp1.AddView(std::move(instrument_selector), std::move(meter_selector), std::move(view));

// cleanup properly without crash
mp1.ForceFlush();
mp1.Shutdown();
}
#endif

0 comments on commit bee0d8b

Please sign in to comment.