From 773aec217df3f8b153a3d5a1584638e4965f122a Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Wed, 19 Jun 2024 06:13:04 -0400 Subject: [PATCH] Fix timestamp handling for the lastvalue aggregation (#5517) Fixes https://github.com/open-telemetry/opentelemetry-go/issues/5510 From https://opentelemetry.io/docs/specs/otel/metrics/sdk/#metricreader: > The ending timestamp (i.e. TimeUnixNano) MUST always be equal to time the metric data point took effect, which is equal to when [MetricReader.Collect](https://opentelemetry.io/docs/specs/otel/metrics/sdk/#collect) was invoked. These rules apply to all metrics, not just those whose [point kinds](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#point-kinds) includes an aggregation temporality field. Before https://github.com/open-telemetry/opentelemetry-go/pull/5305, we used the measurement time as the timestamp, but it didn't matter because the collection time is always the same as the measurement time for asynchronous instruments. We didn't catch the issue when we implemented synchronous instruments. This PR changes the (end) timestamp handling for the Last Value aggregation to match the (end) timestamp handling for sum and histogram aggregations. As described in the spec above, we take the timestamp when computing the aggregation during Collect. --- CHANGELOG.md | 1 + sdk/metric/internal/aggregate/lastvalue.go | 28 +++++---- .../internal/aggregate/lastvalue_test.go | 60 +++++++++---------- 3 files changed, 46 insertions(+), 43 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index eb2253d2f8a..204226a4ac1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Fix counting number of dropped attributes of `Record` in `go.opentelemetry.io/otel/sdk/log`. (#5464) - Fix panic in baggage creation when a member contains 0x80 char in key or value. (#5494) - Correct comments for the priority of the `WithEndpoint` and `WithEndpointURL` options and their coresponding environment variables in in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc`. (#5508) +- Fix stale timestamps reported by the lastvalue aggregation. (#5517) ## [1.27.0/0.49.0/0.3.0] 2024-05-21 diff --git a/sdk/metric/internal/aggregate/lastvalue.go b/sdk/metric/internal/aggregate/lastvalue.go index 8f406dd2bcb..3b65e761e86 100644 --- a/sdk/metric/internal/aggregate/lastvalue.go +++ b/sdk/metric/internal/aggregate/lastvalue.go @@ -15,10 +15,9 @@ import ( // datapoint is timestamped measurement data. type datapoint[N int64 | float64] struct { - attrs attribute.Set - timestamp time.Time - value N - res exemplar.Reservoir + attrs attribute.Set + value N + res exemplar.Reservoir } func newLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir) *lastValue[N] { @@ -53,7 +52,6 @@ func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute. } d.attrs = attr - d.timestamp = t d.value = value d.res.Offer(ctx, t, exemplar.NewValue(value), droppedAttr) @@ -61,6 +59,7 @@ func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute. } func (s *lastValue[N]) delta(dest *metricdata.Aggregation) int { + t := now() // Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of // the DataPoints is missed (better luck next time). gData, _ := (*dest).(metricdata.Gauge[N]) @@ -68,11 +67,11 @@ func (s *lastValue[N]) delta(dest *metricdata.Aggregation) int { s.Lock() defer s.Unlock() - n := s.copyDpts(&gData.DataPoints) + n := s.copyDpts(&gData.DataPoints, t) // Do not report stale values. clear(s.values) // Update start time for delta temporality. - s.start = now() + s.start = t *dest = gData @@ -80,6 +79,7 @@ func (s *lastValue[N]) delta(dest *metricdata.Aggregation) int { } func (s *lastValue[N]) cumulative(dest *metricdata.Aggregation) int { + t := now() // Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of // the DataPoints is missed (better luck next time). gData, _ := (*dest).(metricdata.Gauge[N]) @@ -87,7 +87,7 @@ func (s *lastValue[N]) cumulative(dest *metricdata.Aggregation) int { s.Lock() defer s.Unlock() - n := s.copyDpts(&gData.DataPoints) + n := s.copyDpts(&gData.DataPoints, t) // TODO (#3006): This will use an unbounded amount of memory if there // are unbounded number of attribute sets being aggregated. Attribute // sets that become "stale" need to be forgotten so this will not @@ -99,7 +99,7 @@ func (s *lastValue[N]) cumulative(dest *metricdata.Aggregation) int { // copyDpts copies the datapoints held by s into dest. The number of datapoints // copied is returned. -func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N]) int { +func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N], t time.Time) int { n := len(s.values) *dest = reset(*dest, n, n) @@ -107,7 +107,7 @@ func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N]) int { for _, v := range s.values { (*dest)[i].Attributes = v.attrs (*dest)[i].StartTime = s.start - (*dest)[i].Time = v.timestamp + (*dest)[i].Time = t (*dest)[i].Value = v.value collectExemplars(&(*dest)[i].Exemplars, v.res.Collect) i++ @@ -127,6 +127,7 @@ type precomputedLastValue[N int64 | float64] struct { } func (s *precomputedLastValue[N]) delta(dest *metricdata.Aggregation) int { + t := now() // Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of // the DataPoints is missed (better luck next time). gData, _ := (*dest).(metricdata.Gauge[N]) @@ -134,11 +135,11 @@ func (s *precomputedLastValue[N]) delta(dest *metricdata.Aggregation) int { s.Lock() defer s.Unlock() - n := s.copyDpts(&gData.DataPoints) + n := s.copyDpts(&gData.DataPoints, t) // Do not report stale values. clear(s.values) // Update start time for delta temporality. - s.start = now() + s.start = t *dest = gData @@ -146,6 +147,7 @@ func (s *precomputedLastValue[N]) delta(dest *metricdata.Aggregation) int { } func (s *precomputedLastValue[N]) cumulative(dest *metricdata.Aggregation) int { + t := now() // Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of // the DataPoints is missed (better luck next time). gData, _ := (*dest).(metricdata.Gauge[N]) @@ -153,7 +155,7 @@ func (s *precomputedLastValue[N]) cumulative(dest *metricdata.Aggregation) int { s.Lock() defer s.Unlock() - n := s.copyDpts(&gData.DataPoints) + n := s.copyDpts(&gData.DataPoints, t) // Do not report stale values. clear(s.values) *dest = gData diff --git a/sdk/metric/internal/aggregate/lastvalue_test.go b/sdk/metric/internal/aggregate/lastvalue_test.go index 8504e3b192e..1e4ca21c96a 100644 --- a/sdk/metric/internal/aggregate/lastvalue_test.go +++ b/sdk/metric/internal/aggregate/lastvalue_test.go @@ -61,13 +61,13 @@ func testDeltaLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(1), - Time: y2kPlus(5), + Time: y2kPlus(7), Value: 2, }, { Attributes: fltrBob, StartTime: y2kPlus(1), - Time: y2kPlus(6), + Time: y2kPlus(7), Value: -10, }, }, @@ -89,13 +89,13 @@ func testDeltaLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(8), - Time: y2kPlus(9), + Time: y2kPlus(11), Value: 10, }, { Attributes: fltrBob, StartTime: y2kPlus(8), - Time: y2kPlus(10), + Time: y2kPlus(11), Value: 3, }, }, @@ -116,19 +116,19 @@ func testDeltaLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(11), - Time: y2kPlus(12), + Time: y2kPlus(16), Value: 1, }, { Attributes: fltrBob, StartTime: y2kPlus(11), - Time: y2kPlus(13), + Time: y2kPlus(16), Value: 1, }, { Attributes: overflowSet, StartTime: y2kPlus(11), - Time: y2kPlus(15), + Time: y2kPlus(16), Value: 1, }, }, @@ -165,13 +165,13 @@ func testCumulativeLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(0), - Time: y2kPlus(4), + Time: y2kPlus(7), Value: 2, }, { Attributes: fltrBob, StartTime: y2kPlus(0), - Time: y2kPlus(5), + Time: y2kPlus(7), Value: -10, }, }, @@ -187,13 +187,13 @@ func testCumulativeLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(0), - Time: y2kPlus(4), + Time: y2kPlus(8), Value: 2, }, { Attributes: fltrBob, StartTime: y2kPlus(0), - Time: y2kPlus(5), + Time: y2kPlus(8), Value: -10, }, }, @@ -211,13 +211,13 @@ func testCumulativeLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(0), - Time: y2kPlus(6), + Time: y2kPlus(11), Value: 10, }, { Attributes: fltrBob, StartTime: y2kPlus(0), - Time: y2kPlus(7), + Time: y2kPlus(11), Value: 3, }, }, @@ -238,19 +238,19 @@ func testCumulativeLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(0), - Time: y2kPlus(8), + Time: y2kPlus(16), Value: 1, }, { Attributes: fltrBob, StartTime: y2kPlus(0), - Time: y2kPlus(9), + Time: y2kPlus(16), Value: 1, }, { Attributes: overflowSet, StartTime: y2kPlus(0), - Time: y2kPlus(11), + Time: y2kPlus(16), Value: 1, }, }, @@ -287,13 +287,13 @@ func testDeltaPrecomputedLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(1), - Time: y2kPlus(5), + Time: y2kPlus(7), Value: 2, }, { Attributes: fltrBob, StartTime: y2kPlus(1), - Time: y2kPlus(6), + Time: y2kPlus(7), Value: -10, }, }, @@ -315,13 +315,13 @@ func testDeltaPrecomputedLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(8), - Time: y2kPlus(9), + Time: y2kPlus(11), Value: 10, }, { Attributes: fltrBob, StartTime: y2kPlus(8), - Time: y2kPlus(10), + Time: y2kPlus(11), Value: 3, }, }, @@ -342,19 +342,19 @@ func testDeltaPrecomputedLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(11), - Time: y2kPlus(12), + Time: y2kPlus(16), Value: 1, }, { Attributes: fltrBob, StartTime: y2kPlus(11), - Time: y2kPlus(13), + Time: y2kPlus(16), Value: 1, }, { Attributes: overflowSet, StartTime: y2kPlus(11), - Time: y2kPlus(15), + Time: y2kPlus(16), Value: 1, }, }, @@ -391,13 +391,13 @@ func testCumulativePrecomputedLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(0), - Time: y2kPlus(4), + Time: y2kPlus(7), Value: 2, }, { Attributes: fltrBob, StartTime: y2kPlus(0), - Time: y2kPlus(5), + Time: y2kPlus(7), Value: -10, }, }, @@ -419,13 +419,13 @@ func testCumulativePrecomputedLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(0), - Time: y2kPlus(6), + Time: y2kPlus(11), Value: 10, }, { Attributes: fltrBob, StartTime: y2kPlus(0), - Time: y2kPlus(7), + Time: y2kPlus(11), Value: 3, }, }, @@ -446,19 +446,19 @@ func testCumulativePrecomputedLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(0), - Time: y2kPlus(8), + Time: y2kPlus(16), Value: 1, }, { Attributes: fltrBob, StartTime: y2kPlus(0), - Time: y2kPlus(9), + Time: y2kPlus(16), Value: 1, }, { Attributes: overflowSet, StartTime: y2kPlus(0), - Time: y2kPlus(11), + Time: y2kPlus(16), Value: 1, }, },