Skip to content

Commit

Permalink
Add exemplar reservoir to async metric storage (open-telemetry#2319)
Browse files Browse the repository at this point in the history
  • Loading branch information
ThomsonTan authored Sep 21, 2023
1 parent 0563a71 commit 13f01ca
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 7 deletions.
14 changes: 14 additions & 0 deletions sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "opentelemetry/nostd/shared_ptr.h"
#include "opentelemetry/sdk/common/attributemap_hash.h"
#include "opentelemetry/sdk/metrics/aggregation/default_aggregation.h"
#include "opentelemetry/sdk/metrics/exemplar/reservoir.h"
#include "opentelemetry/sdk/metrics/instruments.h"
#include "opentelemetry/sdk/metrics/observer_result.h"
#include "opentelemetry/sdk/metrics/state/attributes_hashmap.h"
Expand All @@ -29,11 +30,16 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
public:
AsyncMetricStorage(InstrumentDescriptor instrument_descriptor,
const AggregationType aggregation_type,
nostd::shared_ptr<ExemplarReservoir> &&exemplar_reservoir
OPENTELEMETRY_MAYBE_UNUSED,
const AggregationConfig *aggregation_config)
: instrument_descriptor_(instrument_descriptor),
aggregation_type_{aggregation_type},
cumulative_hash_map_(new AttributesHashMap()),
delta_hash_map_(new AttributesHashMap()),
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
exemplar_reservoir_(exemplar_reservoir),
#endif
temporal_metric_storage_(instrument_descriptor, aggregation_type, aggregation_config)
{}

Expand All @@ -47,6 +53,11 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(hashmap_lock_);
for (auto &measurement : measurements)
{
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
exemplar_reservoir_->OfferMeasurement(measurement.second, {}, {},
std::chrono::system_clock::now());
#endif

auto aggr = DefaultAggregation::CreateAggregation(aggregation_type_, instrument_descriptor_);
aggr->Aggregate(measurement.second);
auto hash = opentelemetry::sdk::common::GetHashForAttributeMap(measurement.first);
Expand Down Expand Up @@ -119,6 +130,9 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
std::unique_ptr<AttributesHashMap> cumulative_hash_map_;
std::unique_ptr<AttributesHashMap> delta_hash_map_;
opentelemetry::common::SpinLockMutex hashmap_lock_;
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
nostd::shared_ptr<ExemplarReservoir> exemplar_reservoir_;
#endif
TemporalMetricStorage temporal_metric_storage_;
};

Expand Down
3 changes: 2 additions & 1 deletion sdk/src/metrics/meter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,8 @@ std::unique_ptr<AsyncWritableMetricStorage> Meter::RegisterAsyncMetricStorage(
view_instr_desc.description_ = view.GetDescription();
}
auto storage = std::shared_ptr<AsyncMetricStorage>(new AsyncMetricStorage(
view_instr_desc, view.GetAggregationType(), view.GetAggregationConfig()));
view_instr_desc, view.GetAggregationType(), ExemplarReservoir::GetNoExemplarReservoir(),
view.GetAggregationConfig()));
storage_registry_[instrument_descriptor.name_] = storage;
static_cast<AsyncMultiMetricStorage *>(storages.get())->AddStorage(storage);
return true;
Expand Down
14 changes: 8 additions & 6 deletions sdk/test/metrics/async_metric_storage_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "opentelemetry/common/key_value_iterable_view.h"
#include "opentelemetry/sdk/metrics/async_instruments.h"
#include "opentelemetry/sdk/metrics/exemplar/reservoir.h"
#include "opentelemetry/sdk/metrics/instruments.h"
#include "opentelemetry/sdk/metrics/meter_context.h"
#include "opentelemetry/sdk/metrics/metric_reader.h"
Expand Down Expand Up @@ -53,8 +54,8 @@ TEST_P(WritableMetricStorageTestFixture, TestAggregation)
std::vector<std::shared_ptr<CollectorHandle>> collectors;
collectors.push_back(collector);

opentelemetry::sdk::metrics::AsyncMetricStorage storage(instr_desc, AggregationType::kSum,
nullptr);
opentelemetry::sdk::metrics::AsyncMetricStorage storage(
instr_desc, AggregationType::kSum, ExemplarReservoir::GetNoExemplarReservoir(), nullptr);
int64_t get_count1 = 20;
int64_t put_count1 = 10;
std::unordered_map<MetricAttributes, int64_t, AttributeHashGenerator> measurements1 = {
Expand Down Expand Up @@ -144,8 +145,8 @@ TEST_P(WritableMetricStorageTestUpDownFixture, TestAggregation)
std::vector<std::shared_ptr<CollectorHandle>> collectors;
collectors.push_back(collector);

opentelemetry::sdk::metrics::AsyncMetricStorage storage(instr_desc, AggregationType::kDefault,
nullptr);
opentelemetry::sdk::metrics::AsyncMetricStorage storage(
instr_desc, AggregationType::kDefault, ExemplarReservoir::GetNoExemplarReservoir(), nullptr);
int64_t get_count1 = 20;
int64_t put_count1 = 10;
std::unordered_map<MetricAttributes, int64_t, AttributeHashGenerator> measurements1 = {
Expand Down Expand Up @@ -234,8 +235,9 @@ TEST_P(WritableMetricStorageTestObservableGaugeFixture, TestAggregation)
std::vector<std::shared_ptr<CollectorHandle>> collectors;
collectors.push_back(collector);

opentelemetry::sdk::metrics::AsyncMetricStorage storage(instr_desc, AggregationType::kLastValue,
nullptr);
opentelemetry::sdk::metrics::AsyncMetricStorage storage(
instr_desc, AggregationType::kLastValue, ExemplarReservoir::GetNoExemplarReservoir(),
nullptr);
int64_t freq_cpu0 = 3;
int64_t freq_cpu1 = 5;
std::unordered_map<MetricAttributes, int64_t, AttributeHashGenerator> measurements1 = {
Expand Down

0 comments on commit 13f01ca

Please sign in to comment.