Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/deltatocumulative]: expire stale series #31337

Merged
merged 23 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
*: forward to main
  • Loading branch information
sh0rez committed Feb 28, 2024
commit c7a21d0c04fb1b5906825e7509cfa639c4adbedb
19 changes: 8 additions & 11 deletions processor/deltatocumulativeprocessor/internal/delta/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,22 +48,19 @@ type Accumulator[D data.Point[D]] struct {
// Aggregate implements delta-to-cumulative aggregation as per spec:
// https://opentelemetry.io/docs/specs/otel/metrics/data-model/#sums-delta-to-cumulative
func (a *Accumulator[D]) Aggregate(ctx context.Context, id streams.Ident, dp D) (D, error) {
// make the accumulator start with the current sample, discarding any
// earlier data. return after use
reset := func() (D, error) {
aggr, ok := a.dps.Load(id)

// new series: initialize with current sample
if !ok {
clone := dp.Clone()
a.dps.Store(id, clone)
return clone, nil
}

aggr, ok := a.dps.Load(id)

// new series: reset
if !ok {
return reset()
}
// belongs to older series: drop
if dp.StartTimestamp() < aggr.StartTimestamp() {
// drop bad samples
switch {
case dp.StartTimestamp() < aggr.StartTimestamp():
// belongs to older series
return aggr, ErrOlderStart{Start: aggr.StartTimestamp(), Sample: dp.StartTimestamp()}
case dp.Timestamp() <= aggr.Timestamp():
// out of order
Expand Down
33 changes: 23 additions & 10 deletions processor/deltatocumulativeprocessor/internal/delta/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,32 +115,45 @@ func TestTimes(t *testing.T) {
defer cancel()

acc := delta.Numbers(ctx, delta.Options{MaxStale: 0})
id, data := random.Sum().Stream()
id, base := random.Sum().Stream()
point := func(start, last pcommon.Timestamp) data.Number {
dp := base.Clone()
dp.SetStartTimestamp(start)
dp.SetTimestamp(last)
return dp
}

// first sample: its the first ever, so take it as-is
{
dp := point(1000, 1000)
res, err := acc.Aggregate(id, dp)
res, err := acc.Aggregate(ctx, id, dp)

require.NoError(t, err)
require.Equal(t, time(1000), res.StartTimestamp())
require.Equal(t, time(1000), res.Timestamp())
}

r1, err := acc.Aggregate(ctx, id, first)
require.NoError(t, err)
require.Equal(t, start, r1.StartTimestamp())
require.Equal(t, ts1, r1.Timestamp())
// second sample: its subsequent, so keep original startTime, but update lastSeen
{
dp := point(1000, 1100)
res, err := acc.Aggregate(ctx, id, dp)

require.NoError(t, err)
require.Equal(t, time(1000), res.StartTimestamp())
require.Equal(t, time(1100), res.Timestamp())
}

r2, err := acc.Aggregate(ctx, id, second)
require.NoError(t, err)
require.Equal(t, start, r2.StartTimestamp())
require.Equal(t, ts2, r2.Timestamp())
// third sample: its subsequent, but has a more recent startTime, which is
// PERMITTED by the spec.
// still keep original startTime, but update lastSeen.
{
dp := point(1100, 1200)
res, err := acc.Aggregate(ctx, id, dp)

require.NoError(t, err)
require.Equal(t, time(1000), res.StartTimestamp())
require.Equal(t, time(1200), res.Timestamp())
}
}

func TestErrs(t *testing.T) {
Expand Down
Loading
You are viewing a condensed version of this merge commit. You can view the full changes here.