Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/interval] Update config structure for interval processor #34926

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions processor/intervalprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,17 @@ The following metric types will *not* be aggregated, and will instead be passed,

The following settings can be optionally configured:

* `interval`: The interval in which the processor should export the aggregated metrics. Default: 60s
* `gauge_pass_through`: Whether gauges should pass through as they are to the next component or be aggregated. Default: false
* `summary_pass_through`: Whether summaries should pass through as they are to the next component or be aggregated. Default: false
```yaml
intervalprocessor:
# The interval in which the processor should export the aggregated metrics.
[ interval: <duration> | default = 60s ]

pass_through:
# Whether gauges should be aggregated or passed through to the next component as they are
[ gauge: <bool> | default = false ]
# Whether summaries should be aggregated or passed through to the next component as they are
[ summary: <boo>l | default = false ]
```

## Example of metric flows

Expand Down
14 changes: 10 additions & 4 deletions processor/intervalprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,18 @@ var _ component.Config = (*Config)(nil)
type Config struct {
// Interval is the time interval at which the processor will aggregate metrics.
Interval time.Duration `mapstructure:"interval"`
// GaugePassThrough is a flag that determines whether gauge metrics should be passed through
// PassThrough is a configuration that determines whether gauge and summary metrics should be passed through
// as they are or aggregated.
GaugePassThrough bool `mapstructure:"gauge_pass_through"`
// SummaryPassThrough is a flag that determines whether summary metrics should be passed through
PassThrough PassThrough `mapstructure:"pass_through"`
}

type PassThrough struct {
// Gauge is a flag that determines whether gauge metrics should be passed through
// as they are or aggregated.
Gauge bool `mapstructure:"gauge"`
// Summary is a flag that determines whether summary metrics should be passed through
// as they are or aggregated.
SummaryPassThrough bool `mapstructure:"summary_pass_through"`
Summary bool `mapstructure:"summary"`
}

// Validate checks whether the input configuration has all of the required fields for the processor.
Expand Down
8 changes: 5 additions & 3 deletions processor/intervalprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ func NewFactory() processor.Factory {

func createDefaultConfig() component.Config {
return &Config{
Interval: 60 * time.Second,
GaugePassThrough: false,
SummaryPassThrough: false,
Interval: 60 * time.Second,
PassThrough: PassThrough{
Gauge: false,
Summary: false,
},
}
}

Expand Down
14 changes: 5 additions & 9 deletions processor/intervalprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ type Processor struct {
expHistogramLookup map[identity.Stream]pmetric.ExponentialHistogramDataPoint
summaryLookup map[identity.Stream]pmetric.SummaryDataPoint

exportInterval time.Duration
gaugePassThrough bool
summaryPassThrough bool
config *Config

nextConsumer consumer.Metrics
}
Expand All @@ -64,16 +62,14 @@ func newProcessor(config *Config, log *zap.Logger, nextConsumer consumer.Metrics
expHistogramLookup: map[identity.Stream]pmetric.ExponentialHistogramDataPoint{},
summaryLookup: map[identity.Stream]pmetric.SummaryDataPoint{},

exportInterval: config.Interval,
gaugePassThrough: config.GaugePassThrough,
summaryPassThrough: config.SummaryPassThrough,
config: config,

nextConsumer: nextConsumer,
}
}

func (p *Processor) Start(_ context.Context, _ component.Host) error {
exportTicker := time.NewTicker(p.exportInterval)
exportTicker := time.NewTicker(p.config.Interval)
go func() {
for {
select {
Expand Down Expand Up @@ -109,15 +105,15 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro
sm.Metrics().RemoveIf(func(m pmetric.Metric) bool {
switch m.Type() {
case pmetric.MetricTypeSummary:
if p.summaryPassThrough {
if p.config.PassThrough.Summary {
return false
}

mClone, metricID := p.getOrCloneMetric(rm, sm, m)
aggregateDataPoints(m.Summary().DataPoints(), mClone.Summary().DataPoints(), metricID, p.summaryLookup)
return true
case pmetric.MetricTypeGauge:
if p.gaugePassThrough {
if p.config.PassThrough.Gauge {
return false
}

Expand Down
2 changes: 1 addition & 1 deletion processor/intervalprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestAggregation(t *testing.T) {

var config *Config
for _, tc := range testCases {
config = &Config{Interval: time.Second, GaugePassThrough: tc.passThrough, SummaryPassThrough: tc.passThrough}
config = &Config{Interval: time.Second, PassThrough: PassThrough{Gauge: tc.passThrough, Summary: tc.passThrough}}

t.Run(tc.name, func(t *testing.T) {
// next stores the results of the filter metric processor
Expand Down
Loading