[processor/deltatocumulative]: drop samples of streams exceeding limit #33286

Merged
30 changes: 30 additions & 0 deletions .chloggen/deltatocumulative-drop-at-limit.yaml
@@ -0,0 +1,30 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: deltatocumulative

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: properly drop samples when at limit

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33285]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
  Fixes a segfault in the limiting behavior, where streams exceeding the limit still had their samples processed.
  Because those streams were not tracked, this led to a nil-pointer dereference.


# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
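
Aside: a minimal, hypothetical sketch of the failure mode described in the subtext (the tracked map and Add method are illustrative stand-ins, not the processor's actual identifiers):

    state := tracked[id] // nil: the stream was rejected at the limit and never stored
    state.Add(dp)        // but its samples were still processed -> nil-pointer dereference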
20 changes: 20 additions & 0 deletions processor/deltatocumulativeprocessor/internal/metrics/data.go
@@ -4,6 +4,8 @@
package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics"

import (
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"
)

@@ -28,6 +30,12 @@ func (s Sum) Ident() Ident {
return (*Metric)(&s).Ident()
}

func (s Sum) Filter(expr func(data.Number) bool) {
s.Sum().DataPoints().RemoveIf(func(dp pmetric.NumberDataPoint) bool {
return !expr(data.Number{NumberDataPoint: dp})
})
}

type Histogram Metric

func (s Histogram) At(i int) data.Histogram {
@@ -43,6 +51,12 @@ func (s Histogram) Ident() Ident {
return (*Metric)(&s).Ident()
}

func (s Histogram) Filter(expr func(data.Histogram) bool) {
s.Histogram().DataPoints().RemoveIf(func(dp pmetric.HistogramDataPoint) bool {
return !expr(data.Histogram{HistogramDataPoint: dp})
})
}

type ExpHistogram Metric

func (s ExpHistogram) At(i int) data.ExpHistogram {
@@ -57,3 +71,9 @@ func (s ExpHistogram) Len() int {
func (s ExpHistogram) Ident() Ident {
return (*Metric)(&s).Ident()
}

func (s ExpHistogram) Filter(expr func(data.ExpHistogram) bool) {
s.ExponentialHistogram().DataPoints().RemoveIf(func(dp pmetric.ExponentialHistogramDataPoint) bool {
return !expr(data.ExpHistogram{DataPoint: dp})
})
}
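
Aside: all three Filter helpers above share one pattern — pdata's RemoveIf deletes datapoints where its predicate returns true, so the keep-predicate expr is negated. A self-contained sketch of the same pattern against the public pdata API (values and counts are illustrative):

    package main

    import (
    	"fmt"

    	"go.opentelemetry.io/collector/pdata/pmetric"
    )

    func main() {
    	m := pmetric.NewMetric()
    	sum := m.SetEmptySum()
    	for i := 0; i < 5; i++ {
    		sum.DataPoints().AppendEmpty().SetIntValue(int64(i))
    	}

    	// keep only even values: RemoveIf removes where the keep-predicate fails
    	keep := func(dp pmetric.NumberDataPoint) bool { return dp.IntValue()%2 == 0 }
    	sum.DataPoints().RemoveIf(func(dp pmetric.NumberDataPoint) bool {
    		return !keep(dp)
    	})

    	fmt.Println(sum.DataPoints().Len()) // prints 3
    }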
28 changes: 21 additions & 7 deletions processor/deltatocumulativeprocessor/internal/streams/data.go
@@ -27,20 +27,34 @@ func Samples[D data.Point[D]](m metrics.Data[D]) Seq[D] {
}
}

// Aggregate each point and replace it by the result
func Aggregate[D data.Point[D]](m metrics.Data[D], aggr Aggregator[D]) error {
type filterable[D data.Point[D]] interface {
metrics.Data[D]
Filter(func(D) bool)
}

// Apply does dps[i] = fn(dps[i]) for each item in dps.
// If fn returns [streams.Drop], the datapoint is removed from dps instead.
// If fn returns any other error, the datapoint is also removed and the error is collected and eventually returned
func Apply[P data.Point[P], List filterable[P]](dps List, fn func(Ident, P) (P, error)) error {
var errs error

// for id, dp := range Samples(m)
Samples(m)(func(id Ident, dp D) bool {
next, err := aggr.Aggregate(id, dp)
mid := dps.Ident()
dps.Filter(func(dp P) bool {
id := identity.OfStream(mid, dp)
next, err := fn(id, dp)
if err != nil {
errs = errors.Join(errs, Error(id, err))
return true
if !errors.Is(err, Drop) {
errs = errors.Join(errs, err)
}
return false
}

next.CopyTo(dp)
return true
})

return errs
}

// Drop signals the current item (stream or datapoint) is to be dropped
var Drop = errors.New("stream dropped") //nolint:revive // Drop is a good name for a signal, see fs.SkipAll
Contributor commented:

Perhaps ErrDrop? If I recall correctly it's more idiomatic to prefix error constants with Err, but I'm not sure if that's enforced here.

@sh0rez (Member, Author) replied on Jun 9, 2024:

It's not technically an error, more a signal to another algorithm. A standard-library equivalent would be fs.SkipAll, which is also not prefixed with Err.
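
Aside: a minimal sketch of the sentinel in use, mirroring the fs.SkipAll analogy above (overLimit and aggregate are hypothetical stand-ins):

    err := streams.Apply(dps, func(id streams.Ident, dp data.Number) (data.Number, error) {
    	if overLimit(id) { // hypothetical limit check
    		return dp, streams.Drop // not a failure: Apply removes this datapoint from dps
    	}
    	return aggregate(id, dp) // hypothetical aggregation
    })
    // err collects only real failures; Drop is swallowed by Apply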

@@ -4,6 +4,7 @@
package streams_test

import (
"math/rand"
"testing"

"github.com/stretchr/testify/require"
@@ -23,7 +24,7 @@ func BenchmarkSamples(b *testing.B) {
dps := generate(b.N)
b.ResetTimer()

streams.Samples[data.Number](dps)(func(id streams.Ident, dp data.Number) bool {
streams.Samples(dps)(func(id streams.Ident, dp data.Number) bool {
rdp = dp
rid = id
return true
@@ -55,22 +56,6 @@
})
}

func TestSample(t *testing.T) {
const total = 1000
dps := generate(total)

// check that all samples are visited
seen := 0
streams.Samples[data.Number](dps)(func(id streams.Ident, dp data.Number) bool {
require.Equal(t, dps.id, id)
require.Equal(t, dps.dps[seen], dp)
seen++
return true
})

require.Equal(t, total, seen)
}

func TestAggregate(t *testing.T) {
const total = 1000
dps := generate(total)
@@ -82,7 +67,7 @@ func TestAggregate(t *testing.T) {
return dp, nil
})

err := streams.Aggregate(dps, inv)
err := streams.Apply(dps, inv.Aggregate)
require.NoError(t, err)

// check that all samples are inverted
@@ -91,15 +76,34 @@ func TestAggregate(t *testing.T) {
}
}

func generate(n int) Data {
func TestDrop(t *testing.T) {
const total = 1000
dps := generate(total)

var want []data.Number
maybe := aggr(func(_ streams.Ident, dp data.Number) (data.Number, error) {
if rand.Intn(2) == 1 {
want = append(want, dp)
return dp, nil
}
return dp, streams.Drop
})

err := streams.Apply(dps, maybe.Aggregate)
require.NoError(t, err)

require.Equal(t, want, dps.dps)
}

func generate(n int) *Data {
id, ndp := random.Sum().Stream()
dps := Data{id: id, dps: make([]data.Number, n)}
for i := range dps.dps {
dp := ndp.Clone()
dp.SetIntValue(int64(i))
dps.dps[i] = dp
}
return dps
return &dps
}

type Data struct {
@@ -119,6 +123,16 @@ func (l Data) Ident() metrics.Ident {
return l.id.Metric()
}

func (l *Data) Filter(expr func(data.Number) bool) {
var next []data.Number
for _, dp := range l.dps {
if expr(dp) {
next = append(next, dp)
}
}
l.dps = next
}

type aggr func(streams.Ident, data.Number) (data.Number, error)

func (a aggr) Aggregate(id streams.Ident, dp data.Number) (data.Number, error) {
@@ -29,6 +29,7 @@ func TestFaults(t *testing.T) {
Pre func(Map, identity.Stream, data.Number) error
Bad func(Map, identity.Stream, data.Number) error
Err error
Want error
}

sum := random.Sum()
@@ -87,7 +88,8 @@
dp.SetTimestamp(ts(20))
return dps.Store(id, dp)
},
Err: streams.ErrLimit(1),
Err: streams.ErrLimit(1),
Want: streams.Drop, // we can't ignore being at limit, we need to drop the entire stream for this request
},
{
Name: "evict",
@@ -130,7 +132,7 @@
require.Equal(t, c.Err, err)

err = c.Bad(onf, id, dp.Clone())
require.NoError(t, err)
require.Equal(t, c.Want, err)
})
}
}
@@ -174,6 +174,8 @@ func (f Faults[T]) Store(id streams.Ident, v T) error {
inc(f.dps.dropped, reason("out-of-order"))
case errors.As(err, &limit):
inc(f.dps.dropped, reason("stream-limit"))
// no space to store stream, drop it instead of failing silently
return streams.Drop
case errors.As(err, &evict):
inc(f.streams.evicted)
case errors.As(err, &gap):
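
Aside: read together with Apply above, the control flow at the limit is now roughly the following (a condensed sketch, not the exact call chain):

    err := dps.Store(id, dp) // Faults.Store: increments the stream-limit drop counter,
                             // then returns streams.Drop instead of nil
    // back in streams.Apply: errors.Is(err, streams.Drop) is true, so the datapoint
    // is filtered out rather than recorded as a failure or processed further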
4 changes: 2 additions & 2 deletions processor/deltatocumulativeprocessor/processor.go
@@ -138,7 +138,7 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
case pmetric.MetricTypeSum:
sum := m.Sum()
if sum.AggregationTemporality() == pmetric.AggregationTemporalityDelta {
err := streams.Aggregate(metrics.Sum(m), p.sums.aggr)
err := streams.Apply(metrics.Sum(m), p.sums.aggr.Aggregate)
errs = errors.Join(errs, err)
sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
@@ -147,7 +147,7 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
case pmetric.MetricTypeExponentialHistogram:
expo := m.ExponentialHistogram()
if expo.AggregationTemporality() == pmetric.AggregationTemporalityDelta {
err := streams.Aggregate(metrics.ExpHistogram(m), p.expo.aggr)
err := streams.Apply(metrics.ExpHistogram(m), p.expo.aggr.Aggregate)
errs = errors.Join(errs, err)
expo.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
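
Aside: for end users, the visible effect is at the processor's stream limit — once the limit is reached, new streams are now dropped (and counted as dropped) instead of crashing the collector. A sketch of the relevant collector config (the max_streams field name is taken from the processor's documentation; treat it as illustrative):

    processors:
      deltatocumulative:
        max_streams: 1000 # streams beyond this limit are dropped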