Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/prometheusexporter] accumulate delta temporality histograms #23790

Merged
merged 35 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
574b131
[exporter/prometheusexporter] accumulate delta temporality sums and h…
nabam Aug 18, 2022
2385f79
Merge branch 'main' into delta-prom
nabam Aug 18, 2022
ac88fb9
Update exporter/prometheusexporter/accumulator.go
nabam Oct 18, 2022
b96d0f5
Update exporter/prometheusexporter/accumulator.go
nabam Oct 18, 2022
f21cc65
Merge branch 'main' into chore/rebase-nabam-delta-prom
locmai Mar 30, 2023
3c69be1
fix tests
locmai Mar 30, 2023
ba8d644
add back aggregation change
locmai Mar 30, 2023
d9d5b95
set empty histogram
locmai Mar 30, 2023
4034315
demo
locmai Mar 30, 2023
3631465
Revert "demo"
locmai Mar 30, 2023
aa6d049
Revert "Revert "demo""
locmai Mar 30, 2023
11bb9a6
update aggTemporality
locmai Mar 30, 2023
6f805d3
update changelog
locmai Mar 31, 2023
7b5aa64
fix the tests by setting bounds and bucket counts
locmai Mar 31, 2023
1979cbf
fix per aneurysm9's comment
locmai Apr 4, 2023
e42fcea
handle delta-cumulative histogram interval misalignments
hkfgo Jun 25, 2023
504e3b5
handle delta-cumulative histogram interval misalignments
hkfgo Jun 26, 2023
98c0090
add test for misalignments
hkfgo Jun 26, 2023
b2e18ca
add code change and test to address out-of-bound error when comparing…
hkfgo Jun 26, 2023
c7d17bb
Merge pull request #1 from hkfgo/chore/rebase-nabam-delta-prom
locmai Jun 27, 2023
3344f60
handle delta-cumulative histogram interval misalignments
hkfgo Jun 27, 2023
a52f5af
handle delta-cumulative histogram interval misalignments
hkfgo Jun 27, 2023
107b53b
add test for misalignments
hkfgo Jun 27, 2023
ca4dde2
add code change and test to address out-of-bound error when comparing…
hkfgo Jun 27, 2023
fa39587
Merge branch 'main' into chore/rebase-nabam-delta-prom
hkfgo Jun 30, 2023
5705810
handle hasSum mismatch and do independent test setups
hkfgo Jul 6, 2023
eaa04e2
remove sum check
hkfgo Jul 11, 2023
231902d
Merge branch 'main' into chore/rebase-nabam-delta-prom
hkfgo Jul 17, 2023
42a7098
Merge branch 'main' into chore/rebase-nabam-delta-prom
hkfgo Aug 28, 2023
b6e66e3
merge master
hkfgo Aug 28, 2023
7d8b888
handle timestamp bug found during end-to-end testing
hkfgo Aug 29, 2023
e6f0cef
Merge branch 'main' into chore/rebase-nabam-delta-prom
Nov 1, 2023
5d6f49d
Merge branch 'main' into chore/rebase-nabam-delta-prom
hkfgo Dec 6, 2023
2c906ef
tidy up accumulator comments and logs
hkfgo Dec 6, 2023
a52102d
tidy up accumulator comments and logs
hkfgo Dec 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add test for misalignments
Signed-off-by: stephenchen <x.chen1016@gmail.com>
  • Loading branch information
hkfgo committed Jun 27, 2023
commit 107b53b9c9c4fb4b69bb0038593f549b3039307b
23 changes: 10 additions & 13 deletions exporter/prometheusexporter/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,14 +264,16 @@ func (a *lastValueAccumulator) accumulateHistogram(metric pmetric.Metric, il pco

switch histogram.AggregationTemporality() {
case pmetric.AggregationTemporalityDelta:
if ip.StartTimestamp().AsTime().Before(mv.value.Histogram().DataPoints().At(0).Timestamp().AsTime()) {
// only keep datapoint with latest timestamp
a.logger.Warn(zap.String("Dropping misaligned histogram datapoint for time series ", signature))
continue
}
// assuming an application restart and reset counter
if ip.StartTimestamp().AsTime().After(mv.value.Histogram().DataPoints().At(0).Timestamp().AsTime()) {
ip.CopyTo(m.Histogram().DataPoints().AppendEmpty())
if ip.StartTimestamp().AsTime() != mv.value.Histogram().DataPoints().At(0).StartTimestamp().AsTime() {
// treat misalgnment as restart and reset or violation of single-writer principle and drop
if ip.StartTimestamp().AsTime().After(mv.value.Histogram().DataPoints().At(0).Timestamp().AsTime()) {
ip.CopyTo(m.Histogram().DataPoints().AppendEmpty())
} else {
a.logger.With(
zap.String("time_series", signature),
).Warn("Dropped misaligned histogram datapoint")
continue
}
} else {
accumulateHistogramValues(mv.value.Histogram().DataPoints().At(0), ip, m.Histogram().DataPoints().AppendEmpty())
}
Expand Down Expand Up @@ -347,11 +349,6 @@ func copyMetricMetadata(metric pmetric.Metric) pmetric.Metric {
}

func accumulateHistogramValues(prev, current, dest pmetric.HistogramDataPoint) {
//if current.StartTimestamp().AsTime().Before(prev.StartTimestamp().AsTime()) {
// dest.SetStartTimestamp(current.StartTimestamp())
//} else {
// dest.SetStartTimestamp(prev.StartTimestamp())
//}
dest.SetStartTimestamp(prev.StartTimestamp())

older := prev
Expand Down
69 changes: 64 additions & 5 deletions exporter/prometheusexporter/accumulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

func TestAccumulateHistogram(t *testing.T) {
appendHistogram := func(ts time.Time, count uint64, sum float64, counts []uint64, bounds []float64, metrics pmetric.MetricSlice) {
appendHistogram := func(startTs time.Time, ts time.Time, count uint64, sum float64, counts []uint64, bounds []float64, metrics pmetric.MetricSlice) {
metric := metrics.AppendEmpty()
metric.SetName("test_metric")
metric.SetEmptyHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
Expand All @@ -40,20 +40,25 @@ func TestAccumulateHistogram(t *testing.T) {
dp.Attributes().PutStr("label_1", "1")
dp.Attributes().PutStr("label_2", "2")
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
dp.SetStartTimestamp(pcommon.NewTimestampFromTime(startTs))
}

startTs1 := time.Now().Add(-6 * time.Second)
startTs2 := time.Now().Add(-5 * time.Second)
startTs3 := time.Now()
ts1 := time.Now().Add(-4 * time.Second)
ts2 := time.Now().Add(-3 * time.Second)
ts3 := time.Now().Add(-2 * time.Second)
ts4 := time.Now().Add(-1 * time.Second)
ts5 := time.Now().Add(1 * time.Second)

a := newAccumulator(zap.NewNop(), 1*time.Hour).(*lastValueAccumulator)

resourceMetrics1 := pmetric.NewResourceMetrics()
ilm1 := resourceMetrics1.ScopeMetrics().AppendEmpty()
ilm1.Scope().SetName("test")
appendHistogram(ts3, 5, 2.5, []uint64{1, 3, 1, 0}, []float64{0.1, 0.5, 1, 10}, ilm1.Metrics())
appendHistogram(ts2, 4, 8.3, []uint64{1, 1, 2, 0}, []float64{0.1, 0.5, 1, 10}, ilm1.Metrics())
appendHistogram(startTs2, ts3, 5, 2.5, []uint64{1, 3, 1, 0}, []float64{0.1, 0.5, 1, 10}, ilm1.Metrics())
appendHistogram(startTs2, ts2, 4, 8.3, []uint64{1, 1, 2, 0}, []float64{0.1, 0.5, 1, 10}, ilm1.Metrics())

m3 := ilm1.Metrics().At(0).Histogram().DataPoints().At(0)
m2 := ilm1.Metrics().At(1).Histogram().DataPoints().At(0)
Expand All @@ -63,16 +68,30 @@ func TestAccumulateHistogram(t *testing.T) {
resourceMetrics2 := pmetric.NewResourceMetrics()
ilm2 := resourceMetrics2.ScopeMetrics().AppendEmpty()
ilm2.Scope().SetName("test")
appendHistogram(ts1, 7, 5, []uint64{3, 1, 1, 0}, []float64{0.1, 0.2, 1, 10}, ilm2.Metrics())
appendHistogram(startTs2, ts1, 7, 5, []uint64{3, 1, 1, 0}, []float64{0.1, 0.2, 1, 10}, ilm2.Metrics())

// add extra buckets
resourceMetrics3 := pmetric.NewResourceMetrics()
ilm3 := resourceMetrics3.ScopeMetrics().AppendEmpty()
ilm3.Scope().SetName("test")
appendHistogram(ts4, 7, 5, []uint64{3, 1, 1, 0, 0}, []float64{0.1, 0.2, 1, 10, 15}, ilm3.Metrics())
appendHistogram(startTs2, ts4, 7, 5, []uint64{3, 1, 1, 0, 0}, []float64{0.1, 0.2, 1, 10, 15}, ilm3.Metrics())

m4 := ilm3.Metrics().At(0).Histogram().DataPoints().At(0)

// misaligned start timestamp, drop
resourceMetrics4 := pmetric.NewResourceMetrics()
ilm4 := resourceMetrics4.ScopeMetrics().AppendEmpty()
ilm4.Scope().SetName("test")
appendHistogram(startTs1, ts5, 4, 8.3, []uint64{1, 1, 2, 0}, []float64{0.1, 0.5, 1, 10}, ilm4.Metrics())
appendHistogram(ts3, ts5, 4, 8.3, []uint64{1, 1, 2, 0}, []float64{0.1, 0.5, 1, 10}, ilm4.Metrics())

// misaligned start timestamp, treat as restart
resourceMetrics5 := pmetric.NewResourceMetrics()
ilm5 := resourceMetrics5.ScopeMetrics().AppendEmpty()
ilm5.Scope().SetName("test")
appendHistogram(startTs3, ts5, 4, 8.3, []uint64{1, 1, 2, 0}, []float64{0.1, 0.5, 1, 10}, ilm5.Metrics())
m5 := ilm5.Metrics().At(0).Histogram().DataPoints().At(0)

t.Run("Accumulate", func(t *testing.T) {
Aneurysm9 marked this conversation as resolved.
Show resolved Hide resolved
n := a.Accumulate(resourceMetrics1)
require.Equal(t, 2, n)
Expand Down Expand Up @@ -133,6 +152,46 @@ func TestAccumulateHistogram(t *testing.T) {
require.Equal(t, m4.ExplicitBounds().At(i), v.ExplicitBounds().At(i))
}
})
t.Run("MisalignedTimestamps/Drop", func(t *testing.T) {
// should reset when different buckets arrive
n := a.Accumulate(resourceMetrics4)
require.Equal(t, 0, n)

m, ok := a.registeredMetrics.Load(signature)
v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0)
require.True(t, ok)

require.Equal(t, m4.Sum(), v.Sum())
require.Equal(t, m4.Count(), v.Count())

for i := 0; i < v.BucketCounts().Len(); i++ {
require.Equal(t, m4.BucketCounts().At(i), v.BucketCounts().At(i))
}

for i := 0; i < v.ExplicitBounds().Len(); i++ {
require.Equal(t, m4.ExplicitBounds().At(i), v.ExplicitBounds().At(i))
}
})
t.Run("MisalignedTimestamps/Reset", func(t *testing.T) {
// reset when start timestamp skips ahead
n := a.Accumulate(resourceMetrics5)
require.Equal(t, 1, n)

m, ok := a.registeredMetrics.Load(signature)
v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0)
require.True(t, ok)

require.Equal(t, m5.Sum(), v.Sum())
require.Equal(t, m5.Count(), v.Count())

for i := 0; i < v.BucketCounts().Len(); i++ {
require.Equal(t, m5.BucketCounts().At(i), v.BucketCounts().At(i))
}

for i := 0; i < v.ExplicitBounds().Len(); i++ {
require.Equal(t, m5.ExplicitBounds().At(i), v.ExplicitBounds().At(i))
}
})
}

func TestAccumulateMetrics(t *testing.T) {
Expand Down