diff --git a/CHANGELOG.md b/CHANGELOG.md index 15731d8c22b..64d34115a39 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm The `AttributeKeys` fields allows users to specify an allow-list of attributes allowed to be recorded for a view. This change is made to ensure compatibility with the OpenTelemetry specification. (#4288) - If an attribute set is omitted from an async callback, the previous value will no longer be exported. (#4290) +- Allow the explicit bucket histogram aggregation to be used for the up-down counter, observable counter, observable up-down counter, and observable gauge in the `go.opentelemetry.io/otel/sdk/metric` package. (#4332) ### Fixed diff --git a/sdk/metric/internal/aggregate/aggregate.go b/sdk/metric/internal/aggregate/aggregate.go index f386c337135..2c501f45f13 100644 --- a/sdk/metric/internal/aggregate/aggregate.go +++ b/sdk/metric/internal/aggregate/aggregate.go @@ -109,13 +109,13 @@ func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) { // ExplicitBucketHistogram returns a histogram aggregate function input and // output. -func (b Builder[N]) ExplicitBucketHistogram(cfg aggregation.ExplicitBucketHistogram) (Measure[N], ComputeAggregation) { +func (b Builder[N]) ExplicitBucketHistogram(cfg aggregation.ExplicitBucketHistogram, noSum bool) (Measure[N], ComputeAggregation) { var h aggregator[N] switch b.Temporality { case metricdata.DeltaTemporality: - h = newDeltaHistogram[N](cfg) + h = newDeltaHistogram[N](cfg, noSum) default: - h = newCumulativeHistogram[N](cfg) + h = newCumulativeHistogram[N](cfg, noSum) } return b.input(h), func(dest *metricdata.Aggregation) int { // TODO (#4220): optimize memory reuse here. diff --git a/sdk/metric/internal/aggregate/histogram.go b/sdk/metric/internal/aggregate/histogram.go index a7a7780c307..0683ff2eb23 100644 --- a/sdk/metric/internal/aggregate/histogram.go +++ b/sdk/metric/internal/aggregate/histogram.go @@ -27,7 +27,7 @@ import ( type buckets[N int64 | float64] struct { counts []uint64 count uint64 - sum N + total N min, max N } @@ -36,10 +36,11 @@ func newBuckets[N int64 | float64](n int) *buckets[N] { return &buckets[N]{counts: make([]uint64, n)} } +func (b *buckets[N]) sum(value N) { b.total += value } + func (b *buckets[N]) bin(idx int, value N) { b.counts[idx]++ b.count++ - b.sum += value if value < b.min { b.min = value } else if value > b.max { @@ -50,13 +51,14 @@ func (b *buckets[N]) bin(idx int, value N) { // histValues summarizes a set of measurements as an histValues with // explicitly defined buckets. type histValues[N int64 | float64] struct { + noSum bool bounds []float64 values map[attribute.Set]*buckets[N] valuesMu sync.Mutex } -func newHistValues[N int64 | float64](bounds []float64) *histValues[N] { +func newHistValues[N int64 | float64](bounds []float64, noSum bool) *histValues[N] { // The responsibility of keeping all buckets correctly associated with the // passed boundaries is ultimately this type's responsibility. Make a copy // here so we can always guarantee this. Or, in the case of failure, have @@ -65,6 +67,7 @@ func newHistValues[N int64 | float64](bounds []float64) *histValues[N] { copy(b, bounds) sort.Float64s(b) return &histValues[N]{ + noSum: noSum, bounds: b, values: make(map[attribute.Set]*buckets[N]), } @@ -98,6 +101,9 @@ func (s *histValues[N]) Aggregate(value N, attr attribute.Set) { s.values[attr] = b } b.bin(idx, value) + if !s.noSum { + b.sum(value) + } } // newDeltaHistogram returns an Aggregator that summarizes a set of @@ -107,9 +113,9 @@ func (s *histValues[N]) Aggregate(value N, attr attribute.Set) { // Each aggregation cycle is treated independently. When the returned // Aggregator's Aggregations method is called it will reset all histogram // counts to zero. -func newDeltaHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram) aggregator[N] { +func newDeltaHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram, noSum bool) aggregator[N] { return &deltaHistogram[N]{ - histValues: newHistValues[N](cfg.Boundaries), + histValues: newHistValues[N](cfg.Boundaries, noSum), noMinMax: cfg.NoMinMax, start: now(), } @@ -148,7 +154,9 @@ func (s *deltaHistogram[N]) Aggregation() metricdata.Aggregation { Count: b.count, Bounds: bounds, BucketCounts: b.counts, - Sum: b.sum, + } + if !s.noSum { + hdp.Sum = b.total } if !s.noMinMax { hdp.Min = metricdata.NewExtrema(b.min) @@ -170,9 +178,9 @@ func (s *deltaHistogram[N]) Aggregation() metricdata.Aggregation { // Each aggregation cycle builds from the previous, the histogram counts are // the bucketed counts of all values aggregated since the returned Aggregator // was created. -func newCumulativeHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram) aggregator[N] { +func newCumulativeHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram, noSum bool) aggregator[N] { return &cumulativeHistogram[N]{ - histValues: newHistValues[N](cfg.Boundaries), + histValues: newHistValues[N](cfg.Boundaries, noSum), noMinMax: cfg.NoMinMax, start: now(), } @@ -219,7 +227,9 @@ func (s *cumulativeHistogram[N]) Aggregation() metricdata.Aggregation { Count: b.count, Bounds: bounds, BucketCounts: counts, - Sum: b.sum, + } + if !s.noSum { + hdp.Sum = b.total } if !s.noMinMax { hdp.Min = metricdata.NewExtrema(b.min) diff --git a/sdk/metric/internal/aggregate/histogram_test.go b/sdk/metric/internal/aggregate/histogram_test.go index 3656be9e988..8c75562198d 100644 --- a/sdk/metric/internal/aggregate/histogram_test.go +++ b/sdk/metric/internal/aggregate/histogram_test.go @@ -49,10 +49,25 @@ func testHistogram[N int64 | float64](t *testing.T) { } incr := monoIncr[N]() - eFunc := deltaHistExpecter[N](incr) - t.Run("Delta", tester.Run(newDeltaHistogram[N](histConf), incr, eFunc)) + eFunc := deltaHistSummedExpecter[N](incr) + t.Run("Delta/Summed", tester.Run(newDeltaHistogram[N](histConf, false), incr, eFunc)) + eFunc = deltaHistExpecter[N](incr) + t.Run("Delta/NoSum", tester.Run(newDeltaHistogram[N](histConf, true), incr, eFunc)) + eFunc = cumuHistSummedExpecter[N](incr) + t.Run("Cumulative/Summed", tester.Run(newCumulativeHistogram[N](histConf, false), incr, eFunc)) eFunc = cumuHistExpecter[N](incr) - t.Run("Cumulative", tester.Run(newCumulativeHistogram[N](histConf), incr, eFunc)) + t.Run("Cumulative/NoSum", tester.Run(newCumulativeHistogram[N](histConf, true), incr, eFunc)) +} + +func deltaHistSummedExpecter[N int64 | float64](incr setMap[N]) expectFunc { + h := metricdata.Histogram[N]{Temporality: metricdata.DeltaTemporality} + return func(m int) metricdata.Aggregation { + h.DataPoints = make([]metricdata.HistogramDataPoint[N], 0, len(incr)) + for a, v := range incr { + h.DataPoints = append(h.DataPoints, hPointSummed[N](a, v, uint64(m))) + } + return h + } } func deltaHistExpecter[N int64 | float64](incr setMap[N]) expectFunc { @@ -66,6 +81,19 @@ func deltaHistExpecter[N int64 | float64](incr setMap[N]) expectFunc { } } +func cumuHistSummedExpecter[N int64 | float64](incr setMap[N]) expectFunc { + var cycle int + h := metricdata.Histogram[N]{Temporality: metricdata.CumulativeTemporality} + return func(m int) metricdata.Aggregation { + cycle++ + h.DataPoints = make([]metricdata.HistogramDataPoint[N], 0, len(incr)) + for a, v := range incr { + h.DataPoints = append(h.DataPoints, hPointSummed[N](a, v, uint64(cycle*m))) + } + return h + } +} + func cumuHistExpecter[N int64 | float64](incr setMap[N]) expectFunc { var cycle int h := metricdata.Histogram[N]{Temporality: metricdata.CumulativeTemporality} @@ -79,6 +107,25 @@ func cumuHistExpecter[N int64 | float64](incr setMap[N]) expectFunc { } } +// hPointSummed returns an HistogramDataPoint that started and ended now with +// multi number of measurements values v. It includes a min and max (set to v). +func hPointSummed[N int64 | float64](a attribute.Set, v N, multi uint64) metricdata.HistogramDataPoint[N] { + idx := sort.SearchFloat64s(bounds, float64(v)) + counts := make([]uint64, len(bounds)+1) + counts[idx] += multi + return metricdata.HistogramDataPoint[N]{ + Attributes: a, + StartTime: now(), + Time: now(), + Count: multi, + Bounds: bounds, + BucketCounts: counts, + Min: metricdata.NewExtrema(v), + Max: metricdata.NewExtrema(v), + Sum: v * N(multi), + } +} + // hPoint returns an HistogramDataPoint that started and ended now with multi // number of measurements values v. It includes a min and max (set to v). func hPoint[N int64 | float64](a attribute.Set, v N, multi uint64) metricdata.HistogramDataPoint[N] { @@ -94,7 +141,6 @@ func hPoint[N int64 | float64](a attribute.Set, v N, multi uint64) metricdata.Hi BucketCounts: counts, Min: metricdata.NewExtrema(v), Max: metricdata.NewExtrema(v), - Sum: v * N(multi), } } @@ -106,28 +152,50 @@ func TestBucketsBin(t *testing.T) { func testBucketsBin[N int64 | float64]() func(t *testing.T) { return func(t *testing.T) { b := newBuckets[N](3) - assertB := func(counts []uint64, count uint64, sum, min, max N) { + assertB := func(counts []uint64, count uint64, min, max N) { + t.Helper() assert.Equal(t, counts, b.counts) assert.Equal(t, count, b.count) - assert.Equal(t, sum, b.sum) assert.Equal(t, min, b.min) assert.Equal(t, max, b.max) } - assertB([]uint64{0, 0, 0}, 0, 0, 0, 0) + assertB([]uint64{0, 0, 0}, 0, 0, 0) b.bin(1, 2) - assertB([]uint64{0, 1, 0}, 1, 2, 0, 2) + assertB([]uint64{0, 1, 0}, 1, 0, 2) b.bin(0, -1) - assertB([]uint64{1, 1, 0}, 2, 1, -1, 2) + assertB([]uint64{1, 1, 0}, 2, -1, 2) + } +} + +func TestBucketsSum(t *testing.T) { + t.Run("Int64", testBucketsSum[int64]()) + t.Run("Float64", testBucketsSum[float64]()) +} + +func testBucketsSum[N int64 | float64]() func(t *testing.T) { + return func(t *testing.T) { + b := newBuckets[N](3) + + var want N + assert.Equal(t, want, b.total) + + b.sum(2) + want = 2 + assert.Equal(t, want, b.total) + + b.sum(-1) + want = 1 + assert.Equal(t, want, b.total) } } -func testHistImmutableBounds[N int64 | float64](newA func(aggregation.ExplicitBucketHistogram) aggregator[N], getBounds func(aggregator[N]) []float64) func(t *testing.T) { +func testHistImmutableBounds[N int64 | float64](newA func(aggregation.ExplicitBucketHistogram, bool) aggregator[N], getBounds func(aggregator[N]) []float64) func(t *testing.T) { b := []float64{0, 1, 2} cpB := make([]float64, len(b)) copy(cpB, b) - a := newA(aggregation.ExplicitBucketHistogram{Boundaries: b}) + a := newA(aggregation.ExplicitBucketHistogram{Boundaries: b}, false) return func(t *testing.T) { require.Equal(t, cpB, getBounds(a)) @@ -160,7 +228,7 @@ func TestHistogramImmutableBounds(t *testing.T) { } func TestCumulativeHistogramImutableCounts(t *testing.T) { - a := newCumulativeHistogram[int64](histConf) + a := newCumulativeHistogram[int64](histConf, false) a.Aggregate(5, alice) hdp := a.Aggregation().(metricdata.Histogram[int64]).DataPoints[0] @@ -176,12 +244,12 @@ func TestCumulativeHistogramImutableCounts(t *testing.T) { func TestDeltaHistogramReset(t *testing.T) { t.Cleanup(mockTime(now)) - a := newDeltaHistogram[int64](histConf) + a := newDeltaHistogram[int64](histConf, false) assert.Nil(t, a.Aggregation()) a.Aggregate(1, alice) expect := metricdata.Histogram[int64]{Temporality: metricdata.DeltaTemporality} - expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPoint[int64](alice, 1, 1)} + expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPointSummed[int64](alice, 1, 1)} metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation()) // The attr set should be forgotten once Aggregations is called. @@ -190,15 +258,15 @@ func TestDeltaHistogramReset(t *testing.T) { // Aggregating another set should not affect the original (alice). a.Aggregate(1, bob) - expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPoint[int64](bob, 1, 1)} + expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPointSummed[int64](bob, 1, 1)} metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation()) } func TestEmptyHistogramNilAggregation(t *testing.T) { - assert.Nil(t, newCumulativeHistogram[int64](histConf).Aggregation()) - assert.Nil(t, newCumulativeHistogram[float64](histConf).Aggregation()) - assert.Nil(t, newDeltaHistogram[int64](histConf).Aggregation()) - assert.Nil(t, newDeltaHistogram[float64](histConf).Aggregation()) + assert.Nil(t, newCumulativeHistogram[int64](histConf, false).Aggregation()) + assert.Nil(t, newCumulativeHistogram[float64](histConf, false).Aggregation()) + assert.Nil(t, newDeltaHistogram[int64](histConf, false).Aggregation()) + assert.Nil(t, newDeltaHistogram[float64](histConf, false).Aggregation()) } func BenchmarkHistogram(b *testing.B) { @@ -207,8 +275,8 @@ func BenchmarkHistogram(b *testing.B) { } func benchmarkHistogram[N int64 | float64](b *testing.B) { - factory := func() aggregator[N] { return newDeltaHistogram[N](histConf) } + factory := func() aggregator[N] { return newDeltaHistogram[N](histConf, false) } b.Run("Delta", benchmarkAggregator(factory)) - factory = func() aggregator[N] { return newCumulativeHistogram[N](histConf) } + factory = func() aggregator[N] { return newCumulativeHistogram[N](histConf, false) } b.Run("Cumulative", benchmarkAggregator(factory)) } diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 05fe29ce4f9..5989e0c9575 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -412,7 +412,15 @@ func (i *inserter[N]) aggregateFunc(b aggregate.Builder[N], agg aggregation.Aggr meas, comp = b.Sum(false) } case aggregation.ExplicitBucketHistogram: - meas, comp = b.ExplicitBucketHistogram(a) + var noSum bool + switch kind { + case InstrumentKindUpDownCounter, InstrumentKindObservableUpDownCounter, InstrumentKindObservableGauge: + // The sum should not be collected for any instrument that can make + // negative measurements: + // https://github.com/open-telemetry/opentelemetry-specification/blob/v1.21.0/specification/metrics/sdk.md#histogram-aggregations + noSum = true + } + meas, comp = b.ExplicitBucketHistogram(a, noSum) default: err = errUnknownAggregation } @@ -426,22 +434,27 @@ func (i *inserter[N]) aggregateFunc(b aggregate.Builder[N], agg aggregation.Aggr // | Instrument Kind | Drop | LastValue | Sum | Histogram | Exponential Histogram | // |--------------------------|------|-----------|-----|-----------|-----------------------| // | Counter | ✓ | | ✓ | ✓ | ✓ | -// | UpDownCounter | ✓ | | ✓ | | | +// | UpDownCounter | ✓ | | ✓ | ✓ | | // | Histogram | ✓ | | ✓ | ✓ | ✓ | -// | Observable Counter | ✓ | | ✓ | | | -// | Observable UpDownCounter | ✓ | | ✓ | | | -// | Observable Gauge | ✓ | ✓ | | | |. +// | Observable Counter | ✓ | | ✓ | ✓ | | +// | Observable UpDownCounter | ✓ | | ✓ | ✓ | | +// | Observable Gauge | ✓ | ✓ | | ✓ | |. func isAggregatorCompatible(kind InstrumentKind, agg aggregation.Aggregation) error { switch agg.(type) { case aggregation.Default: return nil case aggregation.ExplicitBucketHistogram: - if kind == InstrumentKindCounter || kind == InstrumentKindHistogram { + switch kind { + case InstrumentKindCounter, + InstrumentKindUpDownCounter, + InstrumentKindHistogram, + InstrumentKindObservableCounter, + InstrumentKindObservableUpDownCounter, + InstrumentKindObservableGauge: return nil + default: + return errIncompatibleAggregation } - // TODO: review need for aggregation check after - // https://github.com/open-telemetry/opentelemetry-specification/issues/2710 - return errIncompatibleAggregation case aggregation.Sum: switch kind { case InstrumentKindObservableCounter, InstrumentKindObservableUpDownCounter, InstrumentKindCounter, InstrumentKindHistogram, InstrumentKindUpDownCounter: diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index 3e1c86dc654..c10bbc36803 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -498,7 +498,7 @@ func TestPipelineRegistryResource(t *testing.T) { } func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) { - testRdrHistogram := NewManualReader(WithAggregationSelector(func(ik InstrumentKind) aggregation.Aggregation { return aggregation.ExplicitBucketHistogram{} })) + testRdrHistogram := NewManualReader(WithAggregationSelector(func(ik InstrumentKind) aggregation.Aggregation { return aggregation.Sum{} })) readers := []Reader{testRdrHistogram} views := []View{defaultView} @@ -643,7 +643,6 @@ func TestIsAggregatorCompatible(t *testing.T) { name: "SyncUpDownCounter and ExplicitBucketHistogram", kind: InstrumentKindUpDownCounter, agg: aggregation.ExplicitBucketHistogram{}, - want: errIncompatibleAggregation, }, { name: "SyncHistogram and Drop", @@ -686,7 +685,6 @@ func TestIsAggregatorCompatible(t *testing.T) { name: "ObservableCounter and ExplicitBucketHistogram", kind: InstrumentKindObservableCounter, agg: aggregation.ExplicitBucketHistogram{}, - want: errIncompatibleAggregation, }, { name: "ObservableUpDownCounter and Drop", @@ -708,7 +706,6 @@ func TestIsAggregatorCompatible(t *testing.T) { name: "ObservableUpDownCounter and ExplicitBucketHistogram", kind: InstrumentKindObservableUpDownCounter, agg: aggregation.ExplicitBucketHistogram{}, - want: errIncompatibleAggregation, }, { name: "ObservableGauge and Drop", @@ -730,7 +727,6 @@ func TestIsAggregatorCompatible(t *testing.T) { name: "ObservableGauge and ExplicitBucketHistogram", kind: InstrumentKindObservableGauge, agg: aggregation.ExplicitBucketHistogram{}, - want: errIncompatibleAggregation, }, { name: "unknown kind with Sum should error",