From 13f01cae932b2f73eb9eba7631ca257d729bb66f Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Thu, 21 Sep 2023 13:41:18 -0700 Subject: [PATCH] Add exemplar reservoir to async metric storage (#2319) --- .../sdk/metrics/state/async_metric_storage.h | 14 ++++++++++++++ sdk/src/metrics/meter.cc | 3 ++- sdk/test/metrics/async_metric_storage_test.cc | 14 ++++++++------ 3 files changed, 24 insertions(+), 7 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h index aada06142a..6a01d5c7f0 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h @@ -10,6 +10,7 @@ #include "opentelemetry/nostd/shared_ptr.h" #include "opentelemetry/sdk/common/attributemap_hash.h" #include "opentelemetry/sdk/metrics/aggregation/default_aggregation.h" +#include "opentelemetry/sdk/metrics/exemplar/reservoir.h" #include "opentelemetry/sdk/metrics/instruments.h" #include "opentelemetry/sdk/metrics/observer_result.h" #include "opentelemetry/sdk/metrics/state/attributes_hashmap.h" @@ -29,11 +30,16 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora public: AsyncMetricStorage(InstrumentDescriptor instrument_descriptor, const AggregationType aggregation_type, + nostd::shared_ptr &&exemplar_reservoir + OPENTELEMETRY_MAYBE_UNUSED, const AggregationConfig *aggregation_config) : instrument_descriptor_(instrument_descriptor), aggregation_type_{aggregation_type}, cumulative_hash_map_(new AttributesHashMap()), delta_hash_map_(new AttributesHashMap()), +#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW + exemplar_reservoir_(exemplar_reservoir), +#endif temporal_metric_storage_(instrument_descriptor, aggregation_type, aggregation_config) {} @@ -47,6 +53,11 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora std::lock_guard guard(hashmap_lock_); for (auto &measurement : measurements) { +#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW + exemplar_reservoir_->OfferMeasurement(measurement.second, {}, {}, + std::chrono::system_clock::now()); +#endif + auto aggr = DefaultAggregation::CreateAggregation(aggregation_type_, instrument_descriptor_); aggr->Aggregate(measurement.second); auto hash = opentelemetry::sdk::common::GetHashForAttributeMap(measurement.first); @@ -119,6 +130,9 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora std::unique_ptr cumulative_hash_map_; std::unique_ptr delta_hash_map_; opentelemetry::common::SpinLockMutex hashmap_lock_; +#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW + nostd::shared_ptr exemplar_reservoir_; +#endif TemporalMetricStorage temporal_metric_storage_; }; diff --git a/sdk/src/metrics/meter.cc b/sdk/src/metrics/meter.cc index 6b8b6c8925..1abb570b43 100644 --- a/sdk/src/metrics/meter.cc +++ b/sdk/src/metrics/meter.cc @@ -368,7 +368,8 @@ std::unique_ptr Meter::RegisterAsyncMetricStorage( view_instr_desc.description_ = view.GetDescription(); } auto storage = std::shared_ptr(new AsyncMetricStorage( - view_instr_desc, view.GetAggregationType(), view.GetAggregationConfig())); + view_instr_desc, view.GetAggregationType(), ExemplarReservoir::GetNoExemplarReservoir(), + view.GetAggregationConfig())); storage_registry_[instrument_descriptor.name_] = storage; static_cast(storages.get())->AddStorage(storage); return true; diff --git a/sdk/test/metrics/async_metric_storage_test.cc b/sdk/test/metrics/async_metric_storage_test.cc index 1fef4e9de3..7b43fea14d 100644 --- a/sdk/test/metrics/async_metric_storage_test.cc +++ b/sdk/test/metrics/async_metric_storage_test.cc @@ -5,6 +5,7 @@ #include "opentelemetry/common/key_value_iterable_view.h" #include "opentelemetry/sdk/metrics/async_instruments.h" +#include "opentelemetry/sdk/metrics/exemplar/reservoir.h" #include "opentelemetry/sdk/metrics/instruments.h" #include "opentelemetry/sdk/metrics/meter_context.h" #include "opentelemetry/sdk/metrics/metric_reader.h" @@ -53,8 +54,8 @@ TEST_P(WritableMetricStorageTestFixture, TestAggregation) std::vector> collectors; collectors.push_back(collector); - opentelemetry::sdk::metrics::AsyncMetricStorage storage(instr_desc, AggregationType::kSum, - nullptr); + opentelemetry::sdk::metrics::AsyncMetricStorage storage( + instr_desc, AggregationType::kSum, ExemplarReservoir::GetNoExemplarReservoir(), nullptr); int64_t get_count1 = 20; int64_t put_count1 = 10; std::unordered_map measurements1 = { @@ -144,8 +145,8 @@ TEST_P(WritableMetricStorageTestUpDownFixture, TestAggregation) std::vector> collectors; collectors.push_back(collector); - opentelemetry::sdk::metrics::AsyncMetricStorage storage(instr_desc, AggregationType::kDefault, - nullptr); + opentelemetry::sdk::metrics::AsyncMetricStorage storage( + instr_desc, AggregationType::kDefault, ExemplarReservoir::GetNoExemplarReservoir(), nullptr); int64_t get_count1 = 20; int64_t put_count1 = 10; std::unordered_map measurements1 = { @@ -234,8 +235,9 @@ TEST_P(WritableMetricStorageTestObservableGaugeFixture, TestAggregation) std::vector> collectors; collectors.push_back(collector); - opentelemetry::sdk::metrics::AsyncMetricStorage storage(instr_desc, AggregationType::kLastValue, - nullptr); + opentelemetry::sdk::metrics::AsyncMetricStorage storage( + instr_desc, AggregationType::kLastValue, ExemplarReservoir::GetNoExemplarReservoir(), + nullptr); int64_t freq_cpu0 = 3; int64_t freq_cpu1 = 5; std::unordered_map measurements1 = {