Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #1588 - Observable Gauge does not reflect updated values, and send the old value always #1641

Merged
merged 6 commits into from
Oct 2, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions examples/common/metrics_foo_library/foo_library.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,16 @@ class MeasurementFetcher
if (nostd::holds_alternative<
nostd::shared_ptr<opentelemetry::metrics::ObserverResultT<double>>>(observer_result))
{
double val = (rand() % 700) + 1.1;
double random_incr = (rand() % 5) + 1.1;
value_ += random_incr;
nostd::get<nostd::shared_ptr<opentelemetry::metrics::ObserverResultT<double>>>(
observer_result)
->Observe(val /*, labelkv */);
->Observe(value_ /*, labelkv */);
}
}
static double value_;
};
double MeasurementFetcher::value_ = 0.0;
} // namespace

void foo_library::counter_example(const std::string &name)
Expand Down
2 changes: 1 addition & 1 deletion examples/metrics_simple/metrics_ostream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void initMetrics(const std::string &name)
std::unique_ptr<metric_sdk::MeterSelector> observable_meter_selector{
new metric_sdk::MeterSelector(name, version, schema)};
std::unique_ptr<metric_sdk::View> observable_sum_view{
new metric_sdk::View{name, "description", metric_sdk::AggregationType::kSum}};
new metric_sdk::View{name, "test_description", metric_sdk::AggregationType::kSum}};
p->AddView(std::move(observable_instrument_selector), std::move(observable_meter_selector),
std::move(observable_sum_view));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
void Record(const std::unordered_map<MetricAttributes, T, AttributeHashGenerator> &measurements,
opentelemetry::common::SystemTimestamp /* observation_time */) noexcept
{
// process the read measurements - aggregate and store in hashmap
// Async counter always record monotonically increasing values, and the
// exporter/reader can request either for delta or cumulative value.
// So we convert the async counter value to delta before passing it to temporal storage.
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(hashmap_lock_);
for (auto &measurement : measurements)
{
Expand All @@ -53,13 +55,14 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
if (prev)
{
auto delta = prev->Diff(*aggr);
cumulative_hash_map_->Set(measurement.first,
DefaultAggregation::CloneAggregation(
aggregation_type_, instrument_descriptor_, *delta));
// store received value in cumulative map, and the diff in delta map (to pass it to temporal
// storage)
cumulative_hash_map_->Set(measurement.first, std::move(aggr));
delta_hash_map_->Set(measurement.first, std::move(delta));
}
else
{
// store received value in cumulative and delta map.
cumulative_hash_map_->Set(
measurement.first,
DefaultAggregation::CloneAggregation(aggregation_type_, instrument_descriptor_, *aggr));
Expand Down
1 change: 0 additions & 1 deletion sdk/src/metrics/meter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,6 @@ std::unique_ptr<AsyncWritableMetricStorage> Meter::RegisterAsyncMetricStorage(
std::vector<MetricData> Meter::Collect(CollectorHandle *collector,
opentelemetry::common::SystemTimestamp collect_ts) noexcept
{

observable_registry_->Observe(collect_ts);
std::vector<MetricData> metric_data_list;
auto ctx = meter_context_.lock();
Expand Down
1 change: 1 addition & 0 deletions sdk/src/metrics/state/temporal_metric_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector,
opentelemetry::common::SystemTimestamp last_collection_ts = sdk_start_ts;
AggregationTemporality aggregation_temporarily =
collector->GetAggregationTemporality(instrument_descriptor_.type_);

if (delta_metrics->Size())
{
for (auto &col : collectors)
Expand Down
58 changes: 48 additions & 10 deletions sdk/test/metrics/async_metric_storage_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,12 @@ TEST_P(WritableMetricStorageTestFixture, TestAggregation)
opentelemetry::sdk::metrics::AsyncMetricStorage storage(
instr_desc, AggregationType::kSum, default_attributes_processor.get(),
std::shared_ptr<opentelemetry::sdk::metrics::AggregationConfig>{});
long get_count = 20l;
long put_count = 10l;
size_t attribute_count = 2;
std::unordered_map<MetricAttributes, long, AttributeHashGenerator> measurements = {
{{{"RequestType", "GET"}}, get_count}, {{{"RequestType", "PUT"}}, put_count}};
storage.RecordLong(measurements,
long get_count1 = 20l;
long put_count1 = 10l;
size_t attribute_count = 2;
std::unordered_map<MetricAttributes, long, AttributeHashGenerator> measurements1 = {
{{{"RequestType", "GET"}}, get_count1}, {{{"RequestType", "PUT"}}, put_count1}};
storage.RecordLong(measurements1,
opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now()));

storage.Collect(collector.get(), collectors, sdk_start_ts, collection_ts,
Expand All @@ -123,20 +123,58 @@ TEST_P(WritableMetricStorageTestFixture, TestAggregation)
if (opentelemetry::nostd::get<std::string>(
data_attr.attributes.find("RequestType")->second) == "GET")
{
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), get_count);
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), get_count1);
}
else if (opentelemetry::nostd::get<std::string>(
data_attr.attributes.find("RequestType")->second) == "PUT")
{
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), put_count);
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), put_count1);
}
}
return true;
});
// subsequent recording after collection shouldn't fail
storage.RecordLong(measurements,
// monotonic increasing values;
long get_count2 = 50l;
long put_count2 = 70l;

std::unordered_map<MetricAttributes, long, AttributeHashGenerator> measurements2 = {
{{{"RequestType", "GET"}}, get_count2}, {{{"RequestType", "PUT"}}, put_count2}};
storage.RecordLong(measurements2,
opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now()));
EXPECT_EQ(MeasurementFetcher::number_of_attributes, attribute_count);
storage.Collect(
collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData data) {
for (auto data_attr : data.point_data_attr_)
{
auto data = opentelemetry::nostd::get<SumPointData>(data_attr.point_data);
if (opentelemetry::nostd::get<std::string>(
data_attr.attributes.find("RequestType")->second) == "GET")
{
if (temporality == AggregationTemporality::kCumulative)
{
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), get_count2);
}
else
{
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), get_count2 - get_count1);
}
}
else if (opentelemetry::nostd::get<std::string>(
data_attr.attributes.find("RequestType")->second) == "PUT")
{
if (temporality == AggregationTemporality::kCumulative)
{
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), put_count2);
}
else
{
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_),
put_count2 - put_count1); // 50 - 30
lalitb marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
return true;
});
}

INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestLong,
Expand Down