Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/prometheusexporter] accumulate delta temporality histograms #20530

Closed
11 changes: 11 additions & 0 deletions .chloggen/promexp-delta.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: prometheusexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Accumulate histograms with delta temporality

# One or more tracking issues related to the change
issues: [4968]
98 changes: 79 additions & 19 deletions exporter/prometheusexporter/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (a *lastValueAccumulator) addMetric(metric pmetric.Metric, il pcommon.Instr
case pmetric.MetricTypeSum:
return a.accumulateSum(metric, il, resourceAttrs, now)
case pmetric.MetricTypeHistogram:
return a.accumulateDoubleHistogram(metric, il, resourceAttrs, now)
return a.accumulateHistogram(metric, il, resourceAttrs, now)
case pmetric.MetricTypeSummary:
return a.accumulateSummary(metric, il, resourceAttrs, now)
default:
Expand Down Expand Up @@ -172,19 +172,19 @@ func (a *lastValueAccumulator) accumulateGauge(metric pmetric.Metric, il pcommon
}

func (a *lastValueAccumulator) accumulateSum(metric pmetric.Metric, il pcommon.InstrumentationScope, resourceAttrs pcommon.Map, now time.Time) (n int) {
doubleSum := metric.Sum()
sum := metric.Sum()

// Drop metrics with unspecified aggregations
if doubleSum.AggregationTemporality() == pmetric.AggregationTemporalityUnspecified {
if sum.AggregationTemporality() == pmetric.AggregationTemporalityUnspecified {
return
}

// Drop non-monotonic and non-cumulative metrics
if doubleSum.AggregationTemporality() == pmetric.AggregationTemporalityDelta && !doubleSum.IsMonotonic() {
if sum.AggregationTemporality() == pmetric.AggregationTemporalityDelta && !sum.IsMonotonic() {
return
}

dps := doubleSum.DataPoints()
dps := sum.DataPoints()
for i := 0; i < dps.Len(); i++ {
ip := dps.At(i)

Expand Down Expand Up @@ -212,7 +212,7 @@ func (a *lastValueAccumulator) accumulateSum(metric pmetric.Metric, il pcommon.I
}

// Delta-to-Cumulative
if doubleSum.AggregationTemporality() == pmetric.AggregationTemporalityDelta && ip.StartTimestamp() == mv.value.Sum().DataPoints().At(0).Timestamp() {
if sum.AggregationTemporality() == pmetric.AggregationTemporalityDelta && ip.StartTimestamp() == mv.value.Sum().DataPoints().At(0).Timestamp() {
ip.SetStartTimestamp(mv.value.Sum().DataPoints().At(0).StartTimestamp())
switch ip.ValueType() {
case pmetric.NumberDataPointValueTypeInt:
Expand All @@ -229,18 +229,15 @@ func (a *lastValueAccumulator) accumulateSum(metric pmetric.Metric, il pcommon.I
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, scope: il, updated: now})
n++
}

return
}

func (a *lastValueAccumulator) accumulateDoubleHistogram(metric pmetric.Metric, il pcommon.InstrumentationScope, resourceAttrs pcommon.Map, now time.Time) (n int) {
doubleHistogram := metric.Histogram()
func (a *lastValueAccumulator) accumulateHistogram(metric pmetric.Metric, il pcommon.InstrumentationScope, resourceAttrs pcommon.Map, now time.Time) (n int) {
histogram := metric.Histogram()

// Drop metrics with non-cumulative aggregations
if doubleHistogram.AggregationTemporality() != pmetric.AggregationTemporalityCumulative {
return
}
dps := histogram.DataPoints()

dps := doubleHistogram.DataPoints()
for i := 0; i < dps.Len(); i++ {
ip := dps.At(i)

Expand All @@ -254,20 +251,30 @@ func (a *lastValueAccumulator) accumulateDoubleHistogram(metric pmetric.Metric,
if !ok {
m := copyMetricMetadata(metric)
ip.CopyTo(m.SetEmptyHistogram().DataPoints().AppendEmpty())
m.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, scope: il, updated: now})
n++
continue
}
mv := v.(*accumulatedValue)

if ip.Timestamp().AsTime().Before(mv.value.Histogram().DataPoints().At(0).Timestamp().AsTime()) {
// only keep datapoint with latest timestamp
m := copyMetricMetadata(metric)
m.SetEmptyHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)

switch histogram.AggregationTemporality() {
case pmetric.AggregationTemporalityDelta:
accumulateHistogramValues(mv.value.Histogram().DataPoints().At(0), ip, m.Histogram().DataPoints().AppendEmpty())
case pmetric.AggregationTemporalityCumulative:
if ip.Timestamp().AsTime().Before(mv.value.Histogram().DataPoints().At(0).Timestamp().AsTime()) {
// only keep datapoint with latest timestamp
continue
}

ip.CopyTo(m.Histogram().DataPoints().AppendEmpty())
default:
// unsupported temporality
continue
}

m := copyMetricMetadata(metric)
ip.CopyTo(m.SetEmptyHistogram().DataPoints().AppendEmpty())
m.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, scope: il, updated: now})
n++
}
Expand Down Expand Up @@ -327,3 +334,56 @@ func copyMetricMetadata(metric pmetric.Metric) pmetric.Metric {

return m
}

// accumulateHistogramValues merges two histogram data points (prev and current)
// into dest, producing a single cumulative point.
// The merged start timestamp is the earlier of the two start timestamps, and the
// merged timestamp/attributes are taken from whichever point has the later
// Timestamp().
//
// NOTE(review): when one point's Timestamp() is older but its StartTimestamp()
// is newer (overlapping intervals), dest combines a StartTimestamp from one
// point with a Timestamp from the other — likely a single-writer-principle
// violation that should at least be logged; confirm desired handling.
func accumulateHistogramValues(prev, current, dest pmetric.HistogramDataPoint) {
// Earliest start timestamp wins.
if current.StartTimestamp().AsTime().Before(prev.StartTimestamp().AsTime()) {
dest.SetStartTimestamp(current.StartTimestamp())
} else {
dest.SetStartTimestamp(prev.StartTimestamp())
}

// Order the two points by Timestamp(); "newer" drives dest's metadata.
older := prev
newer := current
if current.Timestamp().AsTime().Before(prev.Timestamp().AsTime()) {
older = current
newer = prev
}

newer.Attributes().CopyTo(dest.Attributes())
dest.SetTimestamp(newer.Timestamp())
Comment on lines +339 to +353
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if the current.Timestamp() is older but current.StartTimestamp() is newer? Or the inverse? I'm not sure it's correct to allow these values to be taken from different data points.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean something like this:

|---prev.StartTimestamp() ------------------------------------ prev.Timestamp(). ---|
           |---current.StartTimestamp() ---- current.Timestamp(). ---|

TBH I'm not sure. We had a similar case for Sums and would consider it a single-write principle violation ( https://opentelemetry.io/docs/reference/specification/metrics/data-model/#sums-detecting-alignment-issues)

With the current implementation, we would take the newer one based on the Timestamp() comparison. Though here are the options we could consider:

If the incoming delta time interval has significant overlap with the previous time interval, we assume a violation of the single-writer principle and can be handled with one of the following options:

  • Simply report the inconsistencies in time intervals, as the error condition could be caused by a misconfiguration.
  • Eliminate the overlap / deduplicate on the receiver side.
  • Correct the inconsistent time intervals by differentiating the given Resource and Attribute set used from overlapping time.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need any help here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @bitomaxsp, thanks for joining.

The above concern from @Aneurysm9 is what we have left (so far). I don't think I addressed it well. We are discussing what to do with the case where:

current.Timestamp() is older but current.StartTimestamp() is newer? Or the inverse?

As I understand it, this could happen due to multiple writers or bugs in the clients.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean something like this:

|---prev.StartTimestamp() ------------------------------------ prev.Timestamp(). ---|
           |---current.StartTimestamp() ---- current.Timestamp(). ---|

In this case I think the current implementation would result in dest taking StartTimestamp from prev and Timestamp from current without any indication that it had combined them.

TBH I'm not sure. We had a similar case for Sums and would consider it a single-write principle violation ( https://opentelemetry.io/docs/reference/specification/metrics/data-model/#sums-detecting-alignment-issues)

With the current implementation, we would take the newer one based on the Timestamp() comparison. Though here are the options we could consider:

If the incoming delta time interval has significant overlap with the previous time interval, we assume a violation of the single-writer principle and can be handled with one of the following options:

  • Simply report the inconsistencies in time intervals, as the error condition could be caused by a misconfiguration.
  • Eliminate the overlap / deduplicate on the receiver side.
  • Correct the inconsistent time intervals by differentiating the given Resource and Attribute set used from overlapping time.

I'm not really sure what the right thing to do here is, but I would suggest at least logging a warning if current.StartTimestamp().Before(prev.Timestamp()). I think this situation would indicate an inconsistency in the production of the metrics and shouldn't be silently accepted.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logging a warning seems reasonable.
But also, shouldn't we drop points that don't fit in the time interval (in addition to printing a warning)?

I have a chance to test it a bit. I put to production and it crashes.

panic: runtime error: index out of range [35] with length 35
...

Thanks for testing this out! It must be that this one is missing out the test cases for some parts. I will take a look at your logs and see what I can do.

Also going to add the warning log as Aneurysm9 commented above

Let me know, i can test it again when you fix it.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comments below why it's crashing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, my memory came back a little bit here with the 3 cases:

Case 1: Delta-to-cumulative

|prev.StartTimestamp() - prev.Timestamp()|
                                         |current.StartTimestamp() - current.Timestamp()|
|dest.StartTimestamp() ------------------------------------------------ dest.Timestamp()|

We take the accumulated sum of them only if the current.StartTimestamp() == prev.StartTimestamp()

Case 2: Reset

|prev.StartTimestamp() - prev.Timestamp()|
                                                   |current.StartTimestamp() - current.Timestamp()|
                                                   |dest.StartTimestamp() ------- dest.Timestamp()|

Since the current.StartTimestamp() is after the prev.Timestamp() - we understand it as the delta has been reset.

Case 3: Anything else - for example:

|prev.StartTimestamp() --------------------- prev.Timestamp()|
     |current.StartTimestamp() - current.Timestamp()|
|dest.StartTimestamp() --------------------- dest.Timestamp()|

We only keep the latest Timestamp. Which means whatever ended with latest Timestamp() will be kept. Which for this case we would log the warning message out.

Reference the Sum approach: https://opentelemetry.io/docs/reference/specification/metrics/data-model/#sums-delta-to-cumulative

Upon receiving future Delta points, we do the following:
- If the next point aligns with the expected next-time window (see [detecting delta restarts](https://opentelemetry.io/docs/reference/specification/metrics/data-model/#sums-detecting-alignment-issues))
  - Update the “last seen” time to align with the time of the current point.
  - Add the current value to the cumulative counter
  - Output a new cumulative point with the original start time and current last seen time and count.
- If the current point precedes the start time, then drop this point. Note: there are algorithms which can deal with late arriving points.
- If the next point does NOT align with the expected next-time window, then reset the counter following the same steps performed as if the current point was the first point seen.

@bitomaxsp

I went through original Lev's (#9006) PR. I see that this PR doesn't include Sum accumulation. Did you forget it or removed intentionally? I can help there if needed

I did the delta-to-cumulative for the Sum at the same time Lev made the #9006 as I was in need of using it for the Sum/Counter. So for this PR I left that part out as it has been done.


// Merge the two data points only when their bucket layouts are identical.
// BUG FIX: a histogram with N explicit bounds carries N+1 bucket counts, so
// the bound-comparison loop must be limited by ExplicitBounds().Len();
// iterating over BucketCounts().Len() indexed one past the end of the bounds
// slice and panicked with "index out of range [N] with length N" (as reported
// in testing). We also require equal BucketCounts lengths before the
// element-wise sum below, so malformed points cannot panic either.
match := older.ExplicitBounds().Len() == newer.ExplicitBounds().Len() &&
	older.BucketCounts().Len() == newer.BucketCounts().Len()
if match {
	for i := 0; i < newer.ExplicitBounds().Len(); i++ {
		if older.ExplicitBounds().At(i) != newer.ExplicitBounds().At(i) {
			match = false
			break
		}
	}
}

if match {
	// Same bucket layout: accumulate total count, sum, and per-bucket counts.
	dest.SetCount(newer.Count() + older.Count())
	if newer.HasSum() && older.HasSum() {
		dest.SetSum(newer.Sum() + older.Sum())
	}

	counts := make([]uint64, newer.BucketCounts().Len())
	for i := 0; i < newer.BucketCounts().Len(); i++ {
		counts[i] = newer.BucketCounts().At(i) + older.BucketCounts().At(i)
	}
	dest.BucketCounts().FromRaw(counts)
} else {
	// Bucket layouts differ: treat it as a reset and keep only the newer point.
	dest.SetCount(newer.Count())
	if newer.HasSum() {
		dest.SetSum(newer.Sum())
	}

	dest.BucketCounts().FromRaw(newer.BucketCounts().AsRaw())
}

dest.ExplicitBounds().FromRaw(newer.ExplicitBounds().AsRaw())
}
139 changes: 103 additions & 36 deletions exporter/prometheusexporter/accumulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,46 +26,113 @@ import (
"go.uber.org/zap"
)

func TestAccumulateDeltaAggregation(t *testing.T) {
tests := []struct {
name string
fillMetric func(time.Time, pmetric.Metric)
}{
{
name: "Histogram",
fillMetric: func(ts time.Time, metric pmetric.Metric) {
metric.SetName("test_metric")
metric.SetEmptyHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
metric.SetDescription("test description")
dp := metric.Histogram().DataPoints().AppendEmpty()
dp.BucketCounts().FromRaw([]uint64{5, 2})
dp.SetCount(7)
dp.ExplicitBounds().FromRaw([]float64{3.5, 10.0})
dp.SetSum(42.42)
dp.Attributes().PutStr("label_1", "1")
dp.Attributes().PutStr("label_2", "2")
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
},
},
func TestAccumulateHistogram(t *testing.T) {
appendHistogram := func(ts time.Time, count uint64, sum float64, counts []uint64, bounds []float64, metrics pmetric.MetricSlice) {
metric := metrics.AppendEmpty()
metric.SetName("test_metric")
metric.SetEmptyHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
metric.SetDescription("test description")
dp := metric.Histogram().DataPoints().AppendEmpty()
dp.ExplicitBounds().FromRaw(bounds)
dp.BucketCounts().FromRaw(counts)
dp.SetCount(count)
dp.SetSum(sum)
dp.Attributes().PutStr("label_1", "1")
dp.Attributes().PutStr("label_2", "2")
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
resourceMetrics := pmetric.NewResourceMetrics()
ilm := resourceMetrics.ScopeMetrics().AppendEmpty()
ilm.Scope().SetName("test")
tt.fillMetric(time.Now(), ilm.Metrics().AppendEmpty())
ts1 := time.Now().Add(-4 * time.Second)
ts2 := time.Now().Add(-3 * time.Second)
ts3 := time.Now().Add(-2 * time.Second)
ts4 := time.Now().Add(-1 * time.Second)

a := newAccumulator(zap.NewNop(), 1*time.Hour).(*lastValueAccumulator)
n := a.Accumulate(resourceMetrics)
require.Equal(t, 0, n)
a := newAccumulator(zap.NewNop(), 1*time.Hour).(*lastValueAccumulator)

signature := timeseriesSignature(ilm.Scope().Name(), ilm.Metrics().At(0), pcommon.NewMap(), pcommon.NewMap())
v, ok := a.registeredMetrics.Load(signature)
require.False(t, ok)
require.Nil(t, v)
})
}
resourceMetrics1 := pmetric.NewResourceMetrics()
ilm1 := resourceMetrics1.ScopeMetrics().AppendEmpty()
ilm1.Scope().SetName("test")
appendHistogram(ts3, 5, 2.5, []uint64{1, 3, 1, 0}, []float64{0.1, 0.5, 1, 10}, ilm1.Metrics())
appendHistogram(ts2, 4, 8.3, []uint64{1, 1, 2, 0}, []float64{0.1, 0.5, 1, 10}, ilm1.Metrics())

m3 := ilm1.Metrics().At(0).Histogram().DataPoints().At(0)
m2 := ilm1.Metrics().At(1).Histogram().DataPoints().At(0)
signature := timeseriesSignature(ilm1.Scope().Name(), ilm1.Metrics().At(0), m2.Attributes(), pcommon.NewMap())

// different buckets from the past
resourceMetrics2 := pmetric.NewResourceMetrics()
ilm2 := resourceMetrics2.ScopeMetrics().AppendEmpty()
ilm2.Scope().SetName("test")
appendHistogram(ts1, 7, 5, []uint64{3, 1, 1, 0}, []float64{0.1, 0.2, 1, 10}, ilm2.Metrics())

// add extra buckets
resourceMetrics3 := pmetric.NewResourceMetrics()
ilm3 := resourceMetrics3.ScopeMetrics().AppendEmpty()
ilm3.Scope().SetName("test")
appendHistogram(ts4, 7, 5, []uint64{3, 1, 1, 0, 0}, []float64{0.1, 0.2, 1, 10, 15}, ilm3.Metrics())

m4 := ilm3.Metrics().At(0).Histogram().DataPoints().At(0)

t.Run("Accumulate", func(t *testing.T) {
n := a.Accumulate(resourceMetrics1)
require.Equal(t, 2, n)

m, ok := a.registeredMetrics.Load(signature)
v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0)
require.True(t, ok)

require.Equal(t, m3.Sum()+m2.Sum(), v.Sum())
require.Equal(t, m3.Count()+m2.Count(), v.Count())

for i := 0; i < v.BucketCounts().Len(); i++ {
require.Equal(t, m3.BucketCounts().At(i)+m2.BucketCounts().At(i), v.BucketCounts().At(i))
}

for i := 0; i < v.ExplicitBounds().Len(); i++ {
require.Equal(t, m3.ExplicitBounds().At(i), v.ExplicitBounds().At(i))
}
})
t.Run("ResetBuckets/Ignore", func(t *testing.T) {
// should ignore metric from the past
n := a.Accumulate(resourceMetrics2)

require.Equal(t, 1, n)

m, ok := a.registeredMetrics.Load(signature)
v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0)
require.True(t, ok)

require.Equal(t, m3.Sum()+m2.Sum(), v.Sum())
require.Equal(t, m3.Count()+m2.Count(), v.Count())

for i := 0; i < v.BucketCounts().Len(); i++ {
require.Equal(t, m3.BucketCounts().At(i)+m2.BucketCounts().At(i), v.BucketCounts().At(i))
}

for i := 0; i < v.ExplicitBounds().Len(); i++ {
require.Equal(t, m3.ExplicitBounds().At(i), v.ExplicitBounds().At(i))
}
})
t.Run("ResetBuckets/Perform", func(t *testing.T) {
// should reset when different buckets arrive
n := a.Accumulate(resourceMetrics3)
require.Equal(t, 1, n)

m, ok := a.registeredMetrics.Load(signature)
v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0)
require.True(t, ok)

require.Equal(t, m4.Sum(), v.Sum())
require.Equal(t, m4.Count(), v.Count())

for i := 0; i < v.BucketCounts().Len(); i++ {
require.Equal(t, m4.BucketCounts().At(i), v.BucketCounts().At(i))
}

for i := 0; i < v.ExplicitBounds().Len(); i++ {
require.Equal(t, m4.ExplicitBounds().At(i), v.ExplicitBounds().At(i))
}
})
}

func TestAccumulateMetrics(t *testing.T) {
Expand Down