Skip to content

Commit

Permalink
Fix timestamp handling for the lastvalue aggregation (#5517)
Browse files Browse the repository at this point in the history
Fixes #5510

From https://opentelemetry.io/docs/specs/otel/metrics/sdk/#metricreader:

> The ending timestamp (i.e. TimeUnixNano) MUST always be equal to time
the metric data point took effect, which is equal to when
[MetricReader.Collect](https://opentelemetry.io/docs/specs/otel/metrics/sdk/#collect)
was invoked. These rules apply to all metrics, not just those whose
[point
kinds](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#point-kinds)
includes an aggregation temporality field.

Before #5305, we
used the measurement time as the timestamp, but it didn't matter because
the collection time is always the same as the measurement time for
asynchronous instruments. We didn't catch the issue when we implemented
synchronous instruments.

This PR changes the (end) timestamp handling for the Last Value
aggregation to match the (end) timestamp handling for sum and histogram
aggregations. As described in the spec above, we take the timestamp when
computing the aggregation during Collect.
  • Loading branch information
dashpole authored Jun 19, 2024
1 parent ffe855d commit 773aec2
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 43 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Fix counting number of dropped attributes of `Record` in `go.opentelemetry.io/otel/sdk/log`. (#5464)
- Fix panic in baggage creation when a member contains 0x80 char in key or value. (#5494)
- Correct comments for the priority of the `WithEndpoint` and `WithEndpointURL` options and their corresponding environment variables in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc`. (#5508)
- Fix stale timestamps reported by the lastvalue aggregation. (#5517)

## [1.27.0/0.49.0/0.3.0] 2024-05-21

Expand Down
28 changes: 15 additions & 13 deletions sdk/metric/internal/aggregate/lastvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@ import (

// datapoint is timestamped measurement data.
type datapoint[N int64 | float64] struct {
attrs attribute.Set
timestamp time.Time
value N
res exemplar.Reservoir
attrs attribute.Set
value N
res exemplar.Reservoir
}

func newLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir) *lastValue[N] {
Expand Down Expand Up @@ -53,41 +52,42 @@ func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.
}

d.attrs = attr
d.timestamp = t
d.value = value
d.res.Offer(ctx, t, exemplar.NewValue(value), droppedAttr)

s.values[attr.Equivalent()] = d
}

func (s *lastValue[N]) delta(dest *metricdata.Aggregation) int {
t := now()
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
// the DataPoints is missed (better luck next time).
gData, _ := (*dest).(metricdata.Gauge[N])

s.Lock()
defer s.Unlock()

n := s.copyDpts(&gData.DataPoints)
n := s.copyDpts(&gData.DataPoints, t)
// Do not report stale values.
clear(s.values)
// Update start time for delta temporality.
s.start = now()
s.start = t

*dest = gData

return n
}

func (s *lastValue[N]) cumulative(dest *metricdata.Aggregation) int {
t := now()
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
// the DataPoints is missed (better luck next time).
gData, _ := (*dest).(metricdata.Gauge[N])

s.Lock()
defer s.Unlock()

n := s.copyDpts(&gData.DataPoints)
n := s.copyDpts(&gData.DataPoints, t)
// TODO (#3006): This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
Expand All @@ -99,15 +99,15 @@ func (s *lastValue[N]) cumulative(dest *metricdata.Aggregation) int {

// copyDpts copies the datapoints held by s into dest. The number of datapoints
// copied is returned.
func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N]) int {
func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N], t time.Time) int {
n := len(s.values)
*dest = reset(*dest, n, n)

var i int
for _, v := range s.values {
(*dest)[i].Attributes = v.attrs
(*dest)[i].StartTime = s.start
(*dest)[i].Time = v.timestamp
(*dest)[i].Time = t
(*dest)[i].Value = v.value
collectExemplars(&(*dest)[i].Exemplars, v.res.Collect)
i++
Expand All @@ -127,33 +127,35 @@ type precomputedLastValue[N int64 | float64] struct {
}

func (s *precomputedLastValue[N]) delta(dest *metricdata.Aggregation) int {
t := now()
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
// the DataPoints is missed (better luck next time).
gData, _ := (*dest).(metricdata.Gauge[N])

s.Lock()
defer s.Unlock()

n := s.copyDpts(&gData.DataPoints)
n := s.copyDpts(&gData.DataPoints, t)
// Do not report stale values.
clear(s.values)
// Update start time for delta temporality.
s.start = now()
s.start = t

*dest = gData

return n
}

func (s *precomputedLastValue[N]) cumulative(dest *metricdata.Aggregation) int {
t := now()
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
// the DataPoints is missed (better luck next time).
gData, _ := (*dest).(metricdata.Gauge[N])

s.Lock()
defer s.Unlock()

n := s.copyDpts(&gData.DataPoints)
n := s.copyDpts(&gData.DataPoints, t)
// Do not report stale values.
clear(s.values)
*dest = gData
Expand Down
60 changes: 30 additions & 30 deletions sdk/metric/internal/aggregate/lastvalue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@ func testDeltaLastValue[N int64 | float64]() func(*testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(1),
Time: y2kPlus(5),
Time: y2kPlus(7),
Value: 2,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(1),
Time: y2kPlus(6),
Time: y2kPlus(7),
Value: -10,
},
},
Expand All @@ -89,13 +89,13 @@ func testDeltaLastValue[N int64 | float64]() func(*testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(8),
Time: y2kPlus(9),
Time: y2kPlus(11),
Value: 10,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(8),
Time: y2kPlus(10),
Time: y2kPlus(11),
Value: 3,
},
},
Expand All @@ -116,19 +116,19 @@ func testDeltaLastValue[N int64 | float64]() func(*testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(11),
Time: y2kPlus(12),
Time: y2kPlus(16),
Value: 1,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(11),
Time: y2kPlus(13),
Time: y2kPlus(16),
Value: 1,
},
{
Attributes: overflowSet,
StartTime: y2kPlus(11),
Time: y2kPlus(15),
Time: y2kPlus(16),
Value: 1,
},
},
Expand Down Expand Up @@ -165,13 +165,13 @@ func testCumulativeLastValue[N int64 | float64]() func(*testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(0),
Time: y2kPlus(4),
Time: y2kPlus(7),
Value: 2,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(0),
Time: y2kPlus(5),
Time: y2kPlus(7),
Value: -10,
},
},
Expand All @@ -187,13 +187,13 @@ func testCumulativeLastValue[N int64 | float64]() func(*testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(0),
Time: y2kPlus(4),
Time: y2kPlus(8),
Value: 2,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(0),
Time: y2kPlus(5),
Time: y2kPlus(8),
Value: -10,
},
},
Expand All @@ -211,13 +211,13 @@ func testCumulativeLastValue[N int64 | float64]() func(*testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(0),
Time: y2kPlus(6),
Time: y2kPlus(11),
Value: 10,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(0),
Time: y2kPlus(7),
Time: y2kPlus(11),
Value: 3,
},
},
Expand All @@ -238,19 +238,19 @@ func testCumulativeLastValue[N int64 | float64]() func(*testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(0),
Time: y2kPlus(8),
Time: y2kPlus(16),
Value: 1,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(0),
Time: y2kPlus(9),
Time: y2kPlus(16),
Value: 1,
},
{
Attributes: overflowSet,
StartTime: y2kPlus(0),
Time: y2kPlus(11),
Time: y2kPlus(16),
Value: 1,
},
},
Expand Down Expand Up @@ -287,13 +287,13 @@ func testDeltaPrecomputedLastValue[N int64 | float64]() func(*testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(1),
Time: y2kPlus(5),
Time: y2kPlus(7),
Value: 2,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(1),
Time: y2kPlus(6),
Time: y2kPlus(7),
Value: -10,
},
},
Expand All @@ -315,13 +315,13 @@ func testDeltaPrecomputedLastValue[N int64 | float64]() func(*testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(8),
Time: y2kPlus(9),
Time: y2kPlus(11),
Value: 10,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(8),
Time: y2kPlus(10),
Time: y2kPlus(11),
Value: 3,
},
},
Expand All @@ -342,19 +342,19 @@ func testDeltaPrecomputedLastValue[N int64 | float64]() func(*testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(11),
Time: y2kPlus(12),
Time: y2kPlus(16),
Value: 1,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(11),
Time: y2kPlus(13),
Time: y2kPlus(16),
Value: 1,
},
{
Attributes: overflowSet,
StartTime: y2kPlus(11),
Time: y2kPlus(15),
Time: y2kPlus(16),
Value: 1,
},
},
Expand Down Expand Up @@ -391,13 +391,13 @@ func testCumulativePrecomputedLastValue[N int64 | float64]() func(*testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(0),
Time: y2kPlus(4),
Time: y2kPlus(7),
Value: 2,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(0),
Time: y2kPlus(5),
Time: y2kPlus(7),
Value: -10,
},
},
Expand All @@ -419,13 +419,13 @@ func testCumulativePrecomputedLastValue[N int64 | float64]() func(*testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(0),
Time: y2kPlus(6),
Time: y2kPlus(11),
Value: 10,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(0),
Time: y2kPlus(7),
Time: y2kPlus(11),
Value: 3,
},
},
Expand All @@ -446,19 +446,19 @@ func testCumulativePrecomputedLastValue[N int64 | float64]() func(*testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(0),
Time: y2kPlus(8),
Time: y2kPlus(16),
Value: 1,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(0),
Time: y2kPlus(9),
Time: y2kPlus(16),
Value: 1,
},
{
Attributes: overflowSet,
StartTime: y2kPlus(0),
Time: y2kPlus(11),
Time: y2kPlus(16),
Value: 1,
},
},
Expand Down

0 comments on commit 773aec2

Please sign in to comment.