Skip to content

Commit

Permalink
Fix Observable Counters/UpDownCounters (open-telemetry#2298)
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb authored Sep 18, 2023
1 parent 049f7e8 commit 0563a71
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,16 @@ class DefaultAggregation
const opentelemetry::sdk::metrics::InstrumentDescriptor &instrument_descriptor,
const AggregationConfig *aggregation_config)
{
switch (instrument_descriptor.type_)
bool is_monotonic = true;
auto aggr_type = GetDefaultAggregationType(instrument_descriptor.type_, is_monotonic);
switch (aggr_type)
{
case InstrumentType::kCounter:
case InstrumentType::kObservableCounter:
case AggregationType::kSum:
return (instrument_descriptor.value_type_ == InstrumentValueType::kLong)
? std::move(std::unique_ptr<Aggregation>(new LongSumAggregation(true)))
? std::move(std::unique_ptr<Aggregation>(new LongSumAggregation(is_monotonic)))
: std::move(std::unique_ptr<Aggregation>(new DoubleSumAggregation(true)));
case InstrumentType::kUpDownCounter:
case InstrumentType::kObservableUpDownCounter:
return (instrument_descriptor.value_type_ == InstrumentValueType::kLong)
? std::move(std::unique_ptr<Aggregation>(new LongSumAggregation(false)))
: std::move(std::unique_ptr<Aggregation>(new DoubleSumAggregation(false)));
break;
case InstrumentType::kHistogram: {
case AggregationType::kHistogram: {
if (instrument_descriptor.value_type_ == InstrumentValueType::kLong)
{
return (std::unique_ptr<Aggregation>(new LongHistogramAggregation(aggregation_config)));
Expand All @@ -53,7 +49,7 @@ class DefaultAggregation

break;
}
case InstrumentType::kObservableGauge:
case AggregationType::kLastValue:
return (instrument_descriptor.value_type_ == InstrumentValueType::kLong)
? std::move(std::unique_ptr<Aggregation>(new LongLastValueAggregation()))
: std::move(std::unique_ptr<Aggregation>(new DoubleLastValueAggregation()));
Expand Down Expand Up @@ -121,6 +117,11 @@ class DefaultAggregation
const Aggregation &to_copy)
{
const PointType point_data = to_copy.ToPoint();
bool is_monotonic = true;
if (aggregation_type == AggregationType::kDefault)
{
aggregation_type = GetDefaultAggregationType(instrument_descriptor.type_, is_monotonic);
}
switch (aggregation_type)
{
case AggregationType::kDrop:
Expand Down Expand Up @@ -159,7 +160,29 @@ class DefaultAggregation
new DoubleSumAggregation(nostd::get<SumPointData>(point_data)));
}
default:
return DefaultAggregation::CreateAggregation(instrument_descriptor, nullptr);
return nullptr; // won't reach here
}
}

static AggregationType GetDefaultAggregationType(InstrumentType instrument_type,
bool &is_monotonic)
{
is_monotonic = false;
switch (instrument_type)
{
case InstrumentType::kCounter:
case InstrumentType::kObservableCounter:
is_monotonic = true;
return AggregationType::kSum;
case InstrumentType::kUpDownCounter:
case InstrumentType::kObservableUpDownCounter:
return AggregationType::kSum;
case InstrumentType::kHistogram:
return AggregationType::kHistogram;
case InstrumentType::kObservableGauge:
return AggregationType::kLastValue;
default:
return AggregationType::kDrop;
}
}
};
Expand Down
95 changes: 95 additions & 0 deletions sdk/test/metrics/async_metric_storage_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ using M = std::map<std::string, std::string>;
class WritableMetricStorageTestFixture : public ::testing::TestWithParam<AggregationTemporality>
{};

class WritableMetricStorageTestUpDownFixture
: public ::testing::TestWithParam<AggregationTemporality>
{};

class WritableMetricStorageTestObservableGaugeFixture
: public ::testing::TestWithParam<AggregationTemporality>
{};
Expand Down Expand Up @@ -124,6 +128,97 @@ INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestLong,
::testing::Values(AggregationTemporality::kCumulative,
AggregationTemporality::kDelta));

TEST_P(WritableMetricStorageTestUpDownFixture, TestAggregation)
{
AggregationTemporality temporality = GetParam();

InstrumentDescriptor instr_desc = {"name", "desc", "1unit",
InstrumentType::kObservableUpDownCounter,
InstrumentValueType::kLong};

auto sdk_start_ts = std::chrono::system_clock::now();
// Some computation here
auto collection_ts = std::chrono::system_clock::now() + std::chrono::seconds(5);

std::shared_ptr<CollectorHandle> collector(new MockCollectorHandle(temporality));
std::vector<std::shared_ptr<CollectorHandle>> collectors;
collectors.push_back(collector);

opentelemetry::sdk::metrics::AsyncMetricStorage storage(instr_desc, AggregationType::kDefault,
nullptr);
int64_t get_count1 = 20;
int64_t put_count1 = 10;
std::unordered_map<MetricAttributes, int64_t, AttributeHashGenerator> measurements1 = {
{{{"RequestType", "GET"}}, get_count1}, {{{"RequestType", "PUT"}}, put_count1}};
storage.RecordLong(measurements1,
opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now()));

storage.Collect(
collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData &metric_data) {
for (const auto &data_attr : metric_data.point_data_attr_)
{
const auto &data = opentelemetry::nostd::get<SumPointData>(data_attr.point_data);
if (opentelemetry::nostd::get<std::string>(
data_attr.attributes.find("RequestType")->second) == "GET")
{
EXPECT_EQ(opentelemetry::nostd::get<int64_t>(data.value_), get_count1);
}
else if (opentelemetry::nostd::get<std::string>(
data_attr.attributes.find("RequestType")->second) == "PUT")
{
EXPECT_EQ(opentelemetry::nostd::get<int64_t>(data.value_), put_count1);
}
}
return true;
});
// subsequent recording after collection shouldn't fail
// monotonic increasing values;
int64_t get_count2 = -50;
int64_t put_count2 = -70;

std::unordered_map<MetricAttributes, int64_t, AttributeHashGenerator> measurements2 = {
{{{"RequestType", "GET"}}, get_count2}, {{{"RequestType", "PUT"}}, put_count2}};
storage.RecordLong(measurements2,
opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now()));
storage.Collect(
collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData &metric_data) {
for (const auto &data_attr : metric_data.point_data_attr_)
{
const auto &data = opentelemetry::nostd::get<SumPointData>(data_attr.point_data);
if (opentelemetry::nostd::get<std::string>(
data_attr.attributes.find("RequestType")->second) == "GET")
{
if (temporality == AggregationTemporality::kCumulative)
{
EXPECT_EQ(opentelemetry::nostd::get<int64_t>(data.value_), get_count2);
}
else
{
EXPECT_EQ(opentelemetry::nostd::get<int64_t>(data.value_), get_count2 - get_count1);
}
}
else if (opentelemetry::nostd::get<std::string>(
data_attr.attributes.find("RequestType")->second) == "PUT")
{
if (temporality == AggregationTemporality::kCumulative)
{
EXPECT_EQ(opentelemetry::nostd::get<int64_t>(data.value_), put_count2);
}
else
{
EXPECT_EQ(opentelemetry::nostd::get<int64_t>(data.value_), put_count2 - put_count1);
}
}
}
return true;
});
}

INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestUpDownLong,
WritableMetricStorageTestUpDownFixture,
::testing::Values(AggregationTemporality::kCumulative,
AggregationTemporality::kDelta));

TEST_P(WritableMetricStorageTestObservableGaugeFixture, TestAggregation)
{
AggregationTemporality temporality = GetParam();
Expand Down

0 comments on commit 0563a71

Please sign in to comment.