From f21b3a009a621b0ca8721cda1f52dfbfa7016e78 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Sun, 2 Oct 2022 14:38:50 -0700 Subject: [PATCH] Fix #1588 - Observable Gauge does not reflect updated values, and send the old value always (#1641) --- .../common/metrics_foo_library/foo_library.cc | 7 ++- examples/metrics_simple/metrics_ostream.cc | 2 +- .../sdk/metrics/state/async_metric_storage.h | 11 ++-- sdk/src/metrics/meter.cc | 1 - .../metrics/state/temporal_metric_storage.cc | 1 + sdk/test/metrics/async_metric_storage_test.cc | 57 +++++++++++++++---- 6 files changed, 61 insertions(+), 18 deletions(-) diff --git a/examples/common/metrics_foo_library/foo_library.cc b/examples/common/metrics_foo_library/foo_library.cc index ec4fec736d..5fd32524d2 100644 --- a/examples/common/metrics_foo_library/foo_library.cc +++ b/examples/common/metrics_foo_library/foo_library.cc @@ -39,13 +39,16 @@ class MeasurementFetcher if (nostd::holds_alternative< nostd::shared_ptr>>(observer_result)) { - double val = (rand() % 700) + 1.1; + double random_incr = (rand() % 5) + 1.1; + value_ += random_incr; nostd::get>>( observer_result) - ->Observe(val /*, labelkv */); + ->Observe(value_ /*, labelkv */); } } + static double value_; }; +double MeasurementFetcher::value_ = 0.0; } // namespace void foo_library::counter_example(const std::string &name) diff --git a/examples/metrics_simple/metrics_ostream.cc b/examples/metrics_simple/metrics_ostream.cc index 669362830b..9ee1dc1c25 100644 --- a/examples/metrics_simple/metrics_ostream.cc +++ b/examples/metrics_simple/metrics_ostream.cc @@ -62,7 +62,7 @@ void initMetrics(const std::string &name) std::unique_ptr observable_meter_selector{ new metric_sdk::MeterSelector(name, version, schema)}; std::unique_ptr observable_sum_view{ - new metric_sdk::View{name, "description", metric_sdk::AggregationType::kSum}}; + new metric_sdk::View{name, "test_description", metric_sdk::AggregationType::kSum}}; p->AddView(std::move(observable_instrument_selector), std::move(observable_meter_selector), std::move(observable_sum_view)); diff --git a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h index 79731a80bc..db9c61741d 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h @@ -43,7 +43,9 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora void Record(const std::unordered_map &measurements, opentelemetry::common::SystemTimestamp /* observation_time */) noexcept { - // process the read measurements - aggregate and store in hashmap + // Async counter always record monotonically increasing values, and the + // exporter/reader can request either for delta or cumulative value. + // So we convert the async counter value to delta before passing it to temporal storage. std::lock_guard guard(hashmap_lock_); for (auto &measurement : measurements) { @@ -53,13 +55,14 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora if (prev) { auto delta = prev->Diff(*aggr); - cumulative_hash_map_->Set(measurement.first, - DefaultAggregation::CloneAggregation( - aggregation_type_, instrument_descriptor_, *delta)); + // store received value in cumulative map, and the diff in delta map (to pass it to temporal + // storage) + cumulative_hash_map_->Set(measurement.first, std::move(aggr)); delta_hash_map_->Set(measurement.first, std::move(delta)); } else { + // store received value in cumulative and delta map. cumulative_hash_map_->Set( measurement.first, DefaultAggregation::CloneAggregation(aggregation_type_, instrument_descriptor_, *aggr)); diff --git a/sdk/src/metrics/meter.cc b/sdk/src/metrics/meter.cc index 62f6eaf313..4d6595dd7f 100644 --- a/sdk/src/metrics/meter.cc +++ b/sdk/src/metrics/meter.cc @@ -293,7 +293,6 @@ std::unique_ptr Meter::RegisterAsyncMetricStorage( std::vector Meter::Collect(CollectorHandle *collector, opentelemetry::common::SystemTimestamp collect_ts) noexcept { - observable_registry_->Observe(collect_ts); std::vector metric_data_list; auto ctx = meter_context_.lock(); diff --git a/sdk/src/metrics/state/temporal_metric_storage.cc b/sdk/src/metrics/state/temporal_metric_storage.cc index 3c8f80695e..b8fd38d794 100644 --- a/sdk/src/metrics/state/temporal_metric_storage.cc +++ b/sdk/src/metrics/state/temporal_metric_storage.cc @@ -34,6 +34,7 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector, opentelemetry::common::SystemTimestamp last_collection_ts = sdk_start_ts; AggregationTemporality aggregation_temporarily = collector->GetAggregationTemporality(instrument_descriptor_.type_); + if (delta_metrics->Size()) { for (auto &col : collectors) diff --git a/sdk/test/metrics/async_metric_storage_test.cc b/sdk/test/metrics/async_metric_storage_test.cc index 02d2734ecf..681547f171 100644 --- a/sdk/test/metrics/async_metric_storage_test.cc +++ b/sdk/test/metrics/async_metric_storage_test.cc @@ -107,12 +107,12 @@ TEST_P(WritableMetricStorageTestFixture, TestAggregation) opentelemetry::sdk::metrics::AsyncMetricStorage storage( instr_desc, AggregationType::kSum, default_attributes_processor.get(), std::shared_ptr{}); - long get_count = 20l; - long put_count = 10l; - size_t attribute_count = 2; - std::unordered_map measurements = { - {{{"RequestType", "GET"}}, get_count}, {{{"RequestType", "PUT"}}, put_count}}; - storage.RecordLong(measurements, + long get_count1 = 20l; + long put_count1 = 10l; + size_t attribute_count = 2; + std::unordered_map measurements1 = { + {{{"RequestType", "GET"}}, get_count1}, {{{"RequestType", "PUT"}}, put_count1}}; + storage.RecordLong(measurements1, opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now())); storage.Collect(collector.get(), collectors, sdk_start_ts, collection_ts, @@ -123,20 +123,57 @@ TEST_P(WritableMetricStorageTestFixture, TestAggregation) if (opentelemetry::nostd::get( data_attr.attributes.find("RequestType")->second) == "GET") { - EXPECT_EQ(opentelemetry::nostd::get(data.value_), get_count); + EXPECT_EQ(opentelemetry::nostd::get(data.value_), get_count1); } else if (opentelemetry::nostd::get( data_attr.attributes.find("RequestType")->second) == "PUT") { - EXPECT_EQ(opentelemetry::nostd::get(data.value_), put_count); + EXPECT_EQ(opentelemetry::nostd::get(data.value_), put_count1); } } return true; }); // subsequent recording after collection shouldn't fail - storage.RecordLong(measurements, + // monotonic increasing values; + long get_count2 = 50l; + long put_count2 = 70l; + + std::unordered_map measurements2 = { + {{{"RequestType", "GET"}}, get_count2}, {{{"RequestType", "PUT"}}, put_count2}}; + storage.RecordLong(measurements2, opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now())); - EXPECT_EQ(MeasurementFetcher::number_of_attributes, attribute_count); + storage.Collect( + collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData data) { + for (auto data_attr : data.point_data_attr_) + { + auto data = opentelemetry::nostd::get(data_attr.point_data); + if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "GET") + { + if (temporality == AggregationTemporality::kCumulative) + { + EXPECT_EQ(opentelemetry::nostd::get(data.value_), get_count2); + } + else + { + EXPECT_EQ(opentelemetry::nostd::get(data.value_), get_count2 - get_count1); + } + } + else if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "PUT") + { + if (temporality == AggregationTemporality::kCumulative) + { + EXPECT_EQ(opentelemetry::nostd::get(data.value_), put_count2); + } + else + { + EXPECT_EQ(opentelemetry::nostd::get(data.value_), put_count2 - put_count1); + } + } + } + return true; + }); } INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestLong,