Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom Aggregation support #1899

Merged
merged 27 commits into from
Jan 23, 2023
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ class DefaultAggregation
case AggregationType::kSum: {
bool is_monotonic = true;
if (instrument_descriptor.type_ == InstrumentType::kUpDownCounter ||
instrument_descriptor.type_ == InstrumentType::kObservableUpDownCounter)
instrument_descriptor.type_ == InstrumentType::kObservableUpDownCounter ||
instrument_descriptor.type_ == InstrumentType::kHistogram)
{
is_monotonic = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,12 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
public:
AsyncMetricStorage(InstrumentDescriptor instrument_descriptor,
const AggregationType aggregation_type,
const AttributesProcessor *attributes_processor,
const AggregationConfig *aggregation_config,
void *state = nullptr)
const AggregationConfig *aggregation_config)
: instrument_descriptor_(instrument_descriptor),
aggregation_type_{aggregation_type},
attributes_processor_{attributes_processor},
state_{state},
cumulative_hash_map_(new AttributesHashMap()),
delta_hash_map_(new AttributesHashMap()),
temporal_metric_storage_(instrument_descriptor, aggregation_config)
temporal_metric_storage_(instrument_descriptor, aggregation_type, aggregation_config)
{}

template <class T>
Expand Down Expand Up @@ -116,8 +112,6 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
private:
InstrumentDescriptor instrument_descriptor_;
AggregationType aggregation_type_;
const AttributesProcessor *attributes_processor_;
void *state_;
std::unique_ptr<AttributesHashMap> cumulative_hash_map_;
std::unique_ptr<AttributesHashMap> delta_hash_map_;
opentelemetry::common::SpinLockMutex hashmap_lock_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,17 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage
nostd::shared_ptr<ExemplarReservoir> &&exemplar_reservoir,
const AggregationConfig *aggregation_config)
: instrument_descriptor_(instrument_descriptor),
aggregation_type_{aggregation_type},
attributes_hashmap_(new AttributesHashMap()),
attributes_processor_{attributes_processor},
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
exemplar_reservoir_(exemplar_reservoir),
#endif
temporal_metric_storage_(instrument_descriptor, aggregation_config)
temporal_metric_storage_(instrument_descriptor, aggregation_type, aggregation_config)

{
create_default_aggregation_ = [&, aggregation_config]() -> std::unique_ptr<Aggregation> {
return DefaultAggregation::CreateAggregation(aggregation_type_, instrument_descriptor_,
create_default_aggregation_ = [&, aggregation_type,
aggregation_config]() -> std::unique_ptr<Aggregation> {
return DefaultAggregation::CreateAggregation(aggregation_type, instrument_descriptor_,
aggregation_config);
};
}
Expand Down Expand Up @@ -120,7 +120,6 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage

private:
InstrumentDescriptor instrument_descriptor_;
AggregationType aggregation_type_;

// hashmap to maintain the metrics for delta collection (i.e, collection since last Collect call)
std::unique_ptr<AttributesHashMap> attributes_hashmap_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class TemporalMetricStorage
{
public:
TemporalMetricStorage(InstrumentDescriptor instrument_descriptor,
AggregationType aggregation_type,
const AggregationConfig *aggregation_config);

bool buildMetrics(CollectorHandle *collector,
Expand All @@ -38,6 +39,7 @@ class TemporalMetricStorage

private:
InstrumentDescriptor instrument_descriptor_;
AggregationType aggregation_type_;

// unreported metrics stash for all the collectors
std::unordered_map<CollectorHandle *, std::list<std::shared_ptr<AttributesHashMap>>>
Expand Down
5 changes: 2 additions & 3 deletions sdk/src/metrics/meter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -367,9 +367,8 @@ std::unique_ptr<AsyncWritableMetricStorage> Meter::RegisterAsyncMetricStorage(
{
view_instr_desc.description_ = view.GetDescription();
}
auto storage = std::shared_ptr<AsyncMetricStorage>(
new AsyncMetricStorage(view_instr_desc, view.GetAggregationType(),
&view.GetAttributesProcessor(), view.GetAggregationConfig()));
auto storage = std::shared_ptr<AsyncMetricStorage>(new AsyncMetricStorage(
view_instr_desc, view.GetAggregationType(), view.GetAggregationConfig()));
storage_registry_[instrument_descriptor.name_] = storage;
static_cast<AsyncMultiMetricStorage *>(storages.get())->AddStorage(storage);
return true;
Expand Down
42 changes: 23 additions & 19 deletions sdk/src/metrics/state/temporal_metric_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ namespace metrics
{

TemporalMetricStorage::TemporalMetricStorage(InstrumentDescriptor instrument_descriptor,
AggregationType aggregation_type,
const AggregationConfig *aggregation_config)
: instrument_descriptor_(instrument_descriptor), aggregation_config_(aggregation_config)
: instrument_descriptor_(instrument_descriptor),
aggregation_type_(aggregation_type),
aggregation_config_(aggregation_config)
{}

bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector,
Expand Down Expand Up @@ -64,9 +67,10 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector,
}
else
{
merged_metrics->Set(attributes, DefaultAggregation::CreateAggregation(
instrument_descriptor_, aggregation_config_)
->Merge(aggregation));
merged_metrics->Set(attributes,
DefaultAggregation::CreateAggregation(
aggregation_type_, instrument_descriptor_, aggregation_config_)
->Merge(aggregation));
}
return true;
});
Expand All @@ -88,21 +92,21 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector,
if (aggregation_temporarily == AggregationTemporality::kCumulative)
{
// merge current delta to previous cumulative
last_aggr_hashmap->GetAllEnteries([&merged_metrics, this](const MetricAttributes &attributes,
Aggregation &aggregation) {
auto agg = merged_metrics->Get(attributes);
if (agg)
{
merged_metrics->Set(attributes, agg->Merge(aggregation));
}
else
{
auto def_agg =
DefaultAggregation::CreateAggregation(instrument_descriptor_, aggregation_config_);
merged_metrics->Set(attributes, def_agg->Merge(aggregation));
}
return true;
});
last_aggr_hashmap->GetAllEnteries(
[&merged_metrics, this](const MetricAttributes &attributes, Aggregation &aggregation) {
auto agg = merged_metrics->Get(attributes);
if (agg)
{
merged_metrics->Set(attributes, agg->Merge(aggregation));
}
else
{
auto def_agg = DefaultAggregation::CreateAggregation(
aggregation_type_, instrument_descriptor_, aggregation_config_);
merged_metrics->Set(attributes, def_agg->Merge(aggregation));
}
return true;
});
}
last_reported_metrics_[collector] =
LastReportedMetrics{std::move(merged_metrics), collection_ts};
Expand Down
2 changes: 2 additions & 0 deletions sdk/test/metrics/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ foreach(
meter_test
view_registry_test
aggregation_test
sum_aggregation_test
histogram_aggregation_test
attributes_processor_test
attributes_hashmap_test
histogram_test
Expand Down
12 changes: 4 additions & 8 deletions sdk/test/metrics/async_metric_storage_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,8 @@ TEST_P(WritableMetricStorageTestFixture, TestAggregation)
std::vector<std::shared_ptr<CollectorHandle>> collectors;
collectors.push_back(collector);

std::unique_ptr<AttributesProcessor> default_attributes_processor{
new DefaultAttributesProcessor{}};
opentelemetry::sdk::metrics::AsyncMetricStorage storage(
instr_desc, AggregationType::kSum, default_attributes_processor.get(), nullptr);
opentelemetry::sdk::metrics::AsyncMetricStorage storage(instr_desc, AggregationType::kSum,
nullptr);
int64_t get_count1 = 20;
int64_t put_count1 = 10;
std::unordered_map<MetricAttributes, int64_t, AttributeHashGenerator> measurements1 = {
Expand Down Expand Up @@ -157,10 +155,8 @@ TEST_P(WritableMetricStorageTestObservableGaugeFixture, TestAggregation)
std::vector<std::shared_ptr<CollectorHandle>> collectors;
collectors.push_back(collector);

std::unique_ptr<AttributesProcessor> default_attributes_processor{
new DefaultAttributesProcessor{}};
opentelemetry::sdk::metrics::AsyncMetricStorage storage(
instr_desc, AggregationType::kLastValue, default_attributes_processor.get(), nullptr);
opentelemetry::sdk::metrics::AsyncMetricStorage storage(instr_desc, AggregationType::kLastValue,
nullptr);
int64_t freq_cpu0 = 3;
int64_t freq_cpu1 = 5;
std::unordered_map<MetricAttributes, int64_t, AttributeHashGenerator> measurements1 = {
Expand Down
158 changes: 158 additions & 0 deletions sdk/test/metrics/histogram_aggregation_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#include "opentelemetry/common/macros.h"
#include "opentelemetry/sdk/metrics/data/point_data.h"
#include "opentelemetry/sdk/metrics/meter.h"
#include "opentelemetry/sdk/metrics/meter_context.h"
#include "opentelemetry/sdk/metrics/meter_provider.h"
#include "opentelemetry/sdk/metrics/metric_reader.h"
#include "opentelemetry/sdk/metrics/push_metric_exporter.h"

#include <gtest/gtest.h>

using namespace opentelemetry;
using namespace opentelemetry::sdk::instrumentationscope;
using namespace opentelemetry::sdk::metrics;

class MockMetricExporter : public PushMetricExporter
{
public:
MockMetricExporter() = default;
opentelemetry::sdk::common::ExportResult Export(
const ResourceMetrics & /* records */) noexcept override
{
return opentelemetry::sdk::common::ExportResult::kSuccess;
}

AggregationTemporality GetAggregationTemporality(
InstrumentType /* instrument_type */) const noexcept override
{
return AggregationTemporality::kCumulative;
}

bool ForceFlush(std::chrono::microseconds /* timeout */) noexcept override { return true; }

bool Shutdown(std::chrono::microseconds /* timeout */) noexcept override { return true; }
};

class MockMetricReader : public MetricReader
{
public:
MockMetricReader(std::unique_ptr<PushMetricExporter> exporter) : exporter_(std::move(exporter)) {}
AggregationTemporality GetAggregationTemporality(
InstrumentType instrument_type) const noexcept override
{
return exporter_->GetAggregationTemporality(instrument_type);
}
virtual bool OnForceFlush(std::chrono::microseconds /* timeout */) noexcept override
{
return true;
}
virtual bool OnShutDown(std::chrono::microseconds /* timeout */) noexcept override
{
return true;
}
virtual void OnInitialized() noexcept override {}

private:
std::unique_ptr<PushMetricExporter> exporter_;
};

TEST(HistogramInstrumentToHistogramAggregation, Double)
{
#if OPENTELEMETRY_HAVE_WORKING_REGEX
lalitb marked this conversation as resolved.
Show resolved Hide resolved
MeterProvider mp;
auto m = mp.GetMeter("meter1", "version1", "schema1");

std::unique_ptr<MockMetricExporter> exporter(new MockMetricExporter());
std::shared_ptr<MetricReader> reader{new MockMetricReader(std::move(exporter))};
mp.AddMetricReader(reader);

auto h = m->CreateDoubleHistogram("histogram1", "histogram1_description", "histogram1_unit");

h->Record(5, {});
h->Record(10, {});
h->Record(15, {});
h->Record(20, {});
h->Record(25, {});
h->Record(30, {});
h->Record(35, {});
h->Record(40, {});
h->Record(45, {});
h->Record(50, {});
h->Record(1e6, {});

std::vector<HistogramPointData> actuals;
reader->Collect([&](ResourceMetrics &rm) {
for (const ScopeMetrics &smd : rm.scope_metric_data_)
{
for (const MetricData &md : smd.metric_data_)
{
for (const PointDataAttributes &dp : md.point_data_attr_)
{
actuals.push_back(opentelemetry::nostd::get<HistogramPointData>(dp.point_data));
}
}
}
return true;
});

ASSERT_EQ(1, actuals.size());
const auto &actual = actuals.at(0);
ASSERT_EQ(1000275.0, opentelemetry::nostd::get<double>(actual.sum_));
ASSERT_EQ(11, actual.count_);
#endif
}

TEST(CounterToHistogram, Double)
{
#if OPENTELEMETRY_HAVE_WORKING_REGEX
MeterProvider mp;
auto m = mp.GetMeter("meter1", "version1", "schema1");

std::unique_ptr<MockMetricExporter> exporter(new MockMetricExporter());
std::shared_ptr<MetricReader> reader{new MockMetricReader(std::move(exporter))};
mp.AddMetricReader(reader);

std::unique_ptr<View> view{new View("view1", "view1_description", AggregationType::kHistogram)};
std::unique_ptr<InstrumentSelector> instrument_selector{
new InstrumentSelector(InstrumentType::kCounter, "counter1")};
std::unique_ptr<MeterSelector> meter_selector{new MeterSelector("meter1", "version1", "schema1")};
mp.AddView(std::move(instrument_selector), std::move(meter_selector), std::move(view));

auto h = m->CreateDoubleCounter("counter1", "counter1_description", "counter1_unit");

h->Add(5, {});
h->Add(10, {});
h->Add(15, {});
h->Add(20, {});
h->Add(25, {});
h->Add(30, {});
h->Add(35, {});
h->Add(40, {});
h->Add(45, {});
h->Add(50, {});
h->Add(1e6, {});

std::vector<HistogramPointData> actuals;
reader->Collect([&](ResourceMetrics &rm) {
for (const ScopeMetrics &smd : rm.scope_metric_data_)
{
for (const MetricData &md : smd.metric_data_)
{
for (const PointDataAttributes &dp : md.point_data_attr_)
{
actuals.push_back(opentelemetry::nostd::get<HistogramPointData>(dp.point_data));
}
}
}
return true;
});

ASSERT_EQ(1, actuals.size());
const auto &actual = actuals.at(0);
ASSERT_EQ(1000275.0, opentelemetry::nostd::get<double>(actual.sum_));
ASSERT_EQ(11, actual.count_);
#endif
}
Loading