
[processor/interval] Support Gauges and Summaries #34805

Merged · 9 commits · Aug 29, 2024
27 changes: 27 additions & 0 deletions .chloggen/intervalprocessor_gauge_summary.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: processor/interval

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for gauge and summary metrics.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34803]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: Only the last value of a gauge or summary metric is reported in the interval processor, instead of all values.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
8 changes: 6 additions & 2 deletions processor/intervalprocessor/README.md
@@ -19,19 +19,23 @@ The interval processor (`intervalprocessor`) aggregates metrics and periodically
* Monotonically increasing, cumulative sums
* Monotonically increasing, cumulative histograms
* Monotonically increasing, cumulative exponential histograms
* Gauges
* Summaries

The following metric types will *not* be aggregated, and will instead be passed, unchanged, to the next component in the pipeline:

* All delta metrics
* Non-monotonically increasing sums
* Gauges
* Summaries

While sending only the last data point for cumulative metrics doesn't represent data loss, whether the same holds for Gauges and Summaries is debatable. In a push-based model, an application can tell its instrumentation library to push metrics in important situations, and the `intervalprocessor` might discard such a data point if it was not the last one in a given interval. For this reason, the `Gauge` and `Summary` metric types can be configured to be passed through unchanged, copying the behavior of delta metrics.

## Configuration

The following settings can be optionally configured:

* `interval`: The interval in which the processor should export the aggregated metrics. Default: 60s
* `gauge_pass_through`: Whether gauges should be passed through unchanged to the next component instead of being aggregated. Default: false
* `summary_pass_through`: Whether summaries should be passed through unchanged to the next component instead of being aggregated. Default: false
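For example, a minimal collector configuration sketch using the settings documented above, keeping the default interval but passing both metric types through unchanged:

```yaml
processors:
  interval:
    interval: 60s
    gauge_pass_through: true
    summary_pass_through: true
```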
Contributor:

Should we add one for non-monotonic sums? IMO they're no different than gauges.

Member Author @ArthurSens (Aug 23, 2024):

I'd be ok with that, but I remember @sh0rez telling me that non-monotonic sums are a bit trickier. Do you have any thoughts?

But in any case, I'd be happy to do so in a separate PR. I'm already getting lost with my commits 😬

Contributor:

Fine by me

Member:

nit:

```yaml
ignore:
  gauges: true
  summaries: true
```

feels more elegant, but no strong opinion at all

Contributor:

I like @sh0rez's idea. Though I would go for `pass_through` instead of `ignore`, to be a bit more explicit.


## Example of metric flows

8 changes: 7 additions & 1 deletion processor/intervalprocessor/config.go
@@ -18,8 +18,14 @@ var _ component.Config = (*Config)(nil)

// Config defines the configuration for the processor.
type Config struct {
// Interval is the time
// Interval is the time interval at which the processor will aggregate metrics.
Interval time.Duration `mapstructure:"interval"`
// GaugePassThrough is a flag that determines whether gauge metrics should be passed through
// as they are or aggregated.
GaugePassThrough bool `mapstructure:"gauge_pass_through"`
// SummaryPassThrough is a flag that determines whether summary metrics should be passed through
// as they are or aggregated.
SummaryPassThrough bool `mapstructure:"summary_pass_through"`
}

// Validate checks whether the input configuration has all of the required fields for the processor.
4 changes: 3 additions & 1 deletion processor/intervalprocessor/factory.go
@@ -25,7 +25,9 @@ func NewFactory() processor.Factory {

func createDefaultConfig() component.Config {
return &Config{
Interval: 60 * time.Second,
Interval: 60 * time.Second,
GaugePassThrough: false,
SummaryPassThrough: false,
}
}

3 changes: 0 additions & 3 deletions processor/intervalprocessor/internal/metrics/metrics.go
@@ -5,7 +5,6 @@ package metrics // import "github.com/open-telemetry/opentelemetry-collector-con

import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)

type DataPointSlice[DP DataPoint[DP]] interface {
@@ -15,8 +14,6 @@ type DataPointSlice[DP DataPoint[DP]] interface {
}

type DataPoint[Self any] interface {
pmetric.NumberDataPoint | pmetric.HistogramDataPoint | pmetric.ExponentialHistogramDataPoint

Timestamp() pcommon.Timestamp
Attributes() pcommon.Map
CopyTo(dest Self)
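The constraint change above (dropping the `pmetric` type union from `DataPoint`) is what lets summary data points satisfy the same generic interface as number and histogram points. A toy, stdlib-only sketch of the pattern, where the `point` type and `latest` helper are hypothetical stand-ins for the real pdata types and aggregation code:

```go
package main

import "fmt"

// DataPoint mirrors the loosened generic constraint: a plain method set
// (no type union), so any data point type implementing these methods --
// including summary data points -- can be handled generically.
type DataPoint[Self any] interface {
	Timestamp() uint64
	CopyTo(dest Self)
}

// point is a toy data point satisfying DataPoint[*point].
type point struct {
	ts  uint64
	val float64
}

func (p *point) Timestamp() uint64  { return p.ts }
func (p *point) CopyTo(dest *point) { *dest = *p }

// latest returns the data point with the greatest timestamp,
// which is the "last value" the interval processor keeps.
func latest[DP DataPoint[DP]](dps []DP) DP {
	best := dps[0]
	for _, dp := range dps[1:] {
		if dp.Timestamp() > best.Timestamp() {
			best = dp
		}
	}
	return best
}

func main() {
	dps := []*point{{ts: 50, val: 345}, {ts: 20, val: 258}, {ts: 80, val: 178}}
	fmt.Println(latest(dps).val) // 178
}
```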
29 changes: 25 additions & 4 deletions processor/intervalprocessor/processor.go
@@ -36,8 +36,11 @@ type Processor struct {
numberLookup map[identity.Stream]pmetric.NumberDataPoint
histogramLookup map[identity.Stream]pmetric.HistogramDataPoint
expHistogramLookup map[identity.Stream]pmetric.ExponentialHistogramDataPoint
summaryLookup map[identity.Stream]pmetric.SummaryDataPoint

exportInterval time.Duration
exportInterval time.Duration
gaugePassThrough bool
summaryPassThrough bool

nextConsumer consumer.Metrics
}
@@ -59,8 +62,11 @@ func newProcessor(config *Config, log *zap.Logger, nextConsumer consumer.Metrics
numberLookup: map[identity.Stream]pmetric.NumberDataPoint{},
histogramLookup: map[identity.Stream]pmetric.HistogramDataPoint{},
expHistogramLookup: map[identity.Stream]pmetric.ExponentialHistogramDataPoint{},
summaryLookup: map[identity.Stream]pmetric.SummaryDataPoint{},

exportInterval: config.Interval,
exportInterval: config.Interval,
gaugePassThrough: config.GaugePassThrough,
summaryPassThrough: config.SummaryPassThrough,

nextConsumer: nextConsumer,
}
@@ -102,8 +108,22 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro
rm.ScopeMetrics().RemoveIf(func(sm pmetric.ScopeMetrics) bool {
sm.Metrics().RemoveIf(func(m pmetric.Metric) bool {
switch m.Type() {
case pmetric.MetricTypeGauge, pmetric.MetricTypeSummary:
return false
case pmetric.MetricTypeSummary:
if p.summaryPassThrough {
return false
}

mClone, metricID := p.getOrCloneMetric(rm, sm, m)
aggregateDataPoints(m.Summary().DataPoints(), mClone.Summary().DataPoints(), metricID, p.summaryLookup)
return true
case pmetric.MetricTypeGauge:
if p.gaugePassThrough {
return false
}

mClone, metricID := p.getOrCloneMetric(rm, sm, m)
aggregateDataPoints(m.Gauge().DataPoints(), mClone.Gauge().DataPoints(), metricID, p.numberLookup)
return true
case pmetric.MetricTypeSum:
// Check if we care about this value
sum := m.Sum()
@@ -202,6 +222,7 @@ func (p *Processor) exportMetrics() {
clear(p.numberLookup)
clear(p.histogramLookup)
clear(p.expHistogramLookup)
clear(p.summaryLookup)

return out
}()
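For gauges and summaries, aggregation reduces to keeping the last-seen data point per stream identity within the interval, then exporting and clearing that state. A minimal, stdlib-only sketch of that last-value semantics — the `Stream` and `DataPoint` types here are simplified stand-ins for the collector's `identity.Stream` and pdata data points, not the actual implementation:

```go
package main

import "fmt"

// Stream is a simplified stand-in for identity.Stream.
type Stream string

// DataPoint is a simplified stand-in for a pmetric data point:
// just a timestamp and a value.
type DataPoint struct {
	TimeUnixNano uint64
	Value        float64
}

// lastValueLookup keeps only the latest data point per stream,
// analogous to the processor's numberLookup / summaryLookup maps.
type lastValueLookup map[Stream]DataPoint

// consume stores dp unless an already-seen data point for the same
// stream carries a newer timestamp.
func (l lastValueLookup) consume(id Stream, dp DataPoint) {
	if existing, ok := l[id]; ok && existing.TimeUnixNano > dp.TimeUnixNano {
		return
	}
	l[id] = dp
}

// export drains the lookup, returning the last value per stream and
// clearing the map for the next interval.
func (l lastValueLookup) export() map[Stream]DataPoint {
	out := make(map[Stream]DataPoint, len(l))
	for id, dp := range l {
		out[id] = dp
	}
	clear(l)
	return out
}

func main() {
	lookup := lastValueLookup{}
	// Same shape as the gauges_are_aggregated testdata: three points,
	// only the one at t=80 is exported.
	lookup.consume("test.gauge|aaa=bbb", DataPoint{TimeUnixNano: 50, Value: 345})
	lookup.consume("test.gauge|aaa=bbb", DataPoint{TimeUnixNano: 20, Value: 258})
	lookup.consume("test.gauge|aaa=bbb", DataPoint{TimeUnixNano: 80, Value: 178})

	out := lookup.export()
	fmt.Println(len(out), out["test.gauge|aaa=bbb"].Value) // 1 178
	fmt.Println(len(lookup))                               // 0
}
```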
32 changes: 18 additions & 14 deletions processor/intervalprocessor/processor_test.go
@@ -21,26 +21,29 @@ import (
func TestAggregation(t *testing.T) {
t.Parallel()

testCases := []string{
"basic_aggregation",
"non_monotonic_sums_are_passed_through",
"summaries_are_passed_through",
"histograms_are_aggregated",
"exp_histograms_are_aggregated",
"all_delta_metrics_are_passed_through",
testCases := []struct {
name string
passThrough bool
}{
{name: "basic_aggregation"},
{name: "histograms_are_aggregated"},
{name: "exp_histograms_are_aggregated"},
{name: "gauges_are_aggregated"},
{name: "summaries_are_aggregated"},
{name: "all_delta_metrics_are_passed_through"}, // Deltas are passed through even when aggregation is enabled
{name: "non_monotonic_sums_are_passed_through"}, // Non-monotonic sums are passed through even when aggregation is enabled
{name: "gauges_passes_through", passThrough: true},
{name: "summaries_passes_through", passThrough: true},
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

config := &Config{Interval: time.Second}

var config *Config
for _, tc := range testCases {
testName := tc

t.Run(testName, func(t *testing.T) {
t.Parallel()
config = &Config{Interval: time.Second, GaugePassThrough: tc.passThrough, SummaryPassThrough: tc.passThrough}

t.Run(tc.name, func(t *testing.T) {
// next stores the results of the filter metric processor
next := &consumertest.MetricsSink{}

@@ -53,7 +56,7 @@ func TestAggregation(t *testing.T) {
)
require.NoError(t, err)

dir := filepath.Join("testdata", testName)
dir := filepath.Join("testdata", tc.name)

md, err := golden.ReadMetrics(filepath.Join(dir, "input.yaml"))
require.NoError(t, err)
@@ -75,6 +78,7 @@ func TestAggregation(t *testing.T) {
require.Empty(t, processor.numberLookup)
require.Empty(t, processor.histogramLookup)
require.Empty(t, processor.expHistogramLookup)
require.Empty(t, processor.summaryLookup)

// Exporting again should return nothing
processor.exportMetrics()
Original file line number Diff line number Diff line change
@@ -31,9 +31,11 @@ resourceMetrics:
- key: aaa
value:
stringValue: bbb
# From the interval processor's point of view, only the last datapoint should be passed through.
- timeUnixNano: 80
asDouble: 178
attributes:
- key: aaa
value:
stringValue: bbb

Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
resourceMetrics:
- schemaUrl: https://test-res-schema.com/schema
resource:
attributes:
- key: asdf
value:
stringValue: foo
scopeMetrics:
- schemaUrl: https://test-scope-schema.com/schema
scope:
name: MyTestInstrument
version: "1.2.3"
attributes:
- key: foo
value:
stringValue: bar
metrics:
- name: test.gauge
gauge:
aggregationTemporality: 2
dataPoints:
- timeUnixNano: 80
asDouble: 178
attributes:
- key: aaa
value:
stringValue: bbb

Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
resourceMetrics:
- schemaUrl: https://test-res-schema.com/schema
resource:
attributes:
- key: asdf
value:
stringValue: foo
scopeMetrics:
- schemaUrl: https://test-scope-schema.com/schema
scope:
name: MyTestInstrument
version: "1.2.3"
attributes:
- key: foo
value:
stringValue: bar
metrics:
- name: test.gauge
gauge:
aggregationTemporality: 2
dataPoints:
- timeUnixNano: 50
asDouble: 345
attributes:
- key: aaa
value:
stringValue: bbb
- timeUnixNano: 20
asDouble: 258
attributes:
- key: aaa
value:
stringValue: bbb
# From the interval processor's point of view, only the last datapoint should be passed through.
- timeUnixNano: 80
asDouble: 178
attributes:
- key: aaa
value:
stringValue: bbb

Original file line number Diff line number Diff line change
@@ -37,3 +37,4 @@ resourceMetrics:
- key: aaa
value:
stringValue: bbb

Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
resourceMetrics: []

Original file line number Diff line number Diff line change
@@ -46,6 +46,7 @@ resourceMetrics:
- key: aaa
value:
stringValue: bbb
# Only the last summary datapoint should pass through
- timeUnixNano: 80
quantileValues:
- quantile: 0.25
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
resourceMetrics: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
resourceMetrics:
- schemaUrl: https://test-res-schema.com/schema
resource:
attributes:
- key: asdf
value:
stringValue: foo
scopeMetrics:
- schemaUrl: https://test-scope-schema.com/schema
scope:
name: MyTestInstrument
version: "1.2.3"
attributes:
- key: foo
value:
stringValue: bar
metrics:
- name: summary.test
summary:
dataPoints:
- timeUnixNano: 80
quantileValues:
- quantile: 0.25
value: 80
- quantile: 0.5
value: 35
- quantile: 0.75
value: 90
- quantile: 0.95
value: 15
attributes:
- key: aaa
value:
stringValue: bbb

This file was deleted.
