Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(agent): Add option to skip re-running processors after aggregators #14882

Merged
merged 1 commit into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (a *Agent) Run(ctx context.Context) error {
var au *aggregatorUnit
if len(a.Config.Aggregators) != 0 {
aggC := next
if len(a.Config.AggProcessors) != 0 {
if len(a.Config.AggProcessors) != 0 && !a.Config.Agent.SkipProcessorsAfterAggregators {
aggC, apu, err = a.startProcessors(next, a.Config.AggProcessors)
if err != nil {
return err
Expand Down Expand Up @@ -1013,7 +1013,7 @@ func (a *Agent) runTest(ctx context.Context, wait time.Duration, outputC chan<-
var au *aggregatorUnit
if len(a.Config.Aggregators) != 0 {
procC := next
if len(a.Config.AggProcessors) != 0 {
if len(a.Config.AggProcessors) != 0 && !a.Config.Agent.SkipProcessorsAfterAggregators {
procC, apu, err = a.startProcessors(next, a.Config.AggProcessors)
if err != nil {
return err
Expand Down Expand Up @@ -1112,7 +1112,7 @@ func (a *Agent) runOnce(ctx context.Context, wait time.Duration) error {
var au *aggregatorUnit
if len(a.Config.Aggregators) != 0 {
procC := next
if len(a.Config.AggProcessors) != 0 {
if len(a.Config.AggProcessors) != 0 && !a.Config.Agent.SkipProcessorsAfterAggregators {
procC, apu, err = a.startProcessors(next, a.Config.AggProcessors)
if err != nil {
return err
Expand Down
4 changes: 1 addition & 3 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,7 @@ func TestCases(t *testing.T) {
// Process expected metrics and compare with resulting metrics
options := []cmp.Option{
testutil.IgnoreTags("host"),
}
if expected[0].Time().IsZero() {
options = append(options, testutil.IgnoreTime())
testutil.IgnoreTime(),
}
testutil.RequireMetricsEqual(t, expected, actual, options...)
})
Expand Down
2 changes: 2 additions & 0 deletions agent/testcases/aggregators-rerun-processors/expected.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
metric value=420
metric value_min=4200,value_max=4200
1 change: 1 addition & 0 deletions agent/testcases/aggregators-rerun-processors/input.influx
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
metric value=42.0
22 changes: 22 additions & 0 deletions agent/testcases/aggregators-rerun-processors/telegraf.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Test for not skipping processors after running aggregators
[agent]
omit_hostname = true
skip_processors_after_aggregators = false

[[inputs.file]]
files = ["testcases/aggregators-rerun-processors/input.influx"]
data_format = "influx"

[[processors.starlark]]
source = '''
def apply(metric):
for k, v in metric.fields.items():
if type(v) == "float":
metric.fields[k] = v * 10
return metric
'''

[[aggregators.minmax]]
period = "1s"
drop_original = false

2 changes: 2 additions & 0 deletions agent/testcases/aggregators-skip-processors/expected.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
metric value=420
metric value_min=420,value_max=420
1 change: 1 addition & 0 deletions agent/testcases/aggregators-skip-processors/input.influx
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
metric value=42.0
22 changes: 22 additions & 0 deletions agent/testcases/aggregators-skip-processors/telegraf.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Test for skipping processors after running aggregators
[agent]
omit_hostname = true
skip_processors_after_aggregators = true

[[inputs.file]]
files = ["testcases/aggregators-skip-processors/input.influx"]
data_format = "influx"

[[processors.starlark]]
source = '''
def apply(metric):
for k, v in metric.fields.items():
if type(v) == "float":
metric.fields[k] = v * 10
return metric
'''

[[aggregators.minmax]]
period = "1s"
drop_original = false

5 changes: 5 additions & 0 deletions cmd/telegraf/agent.conf
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,8 @@
## stateful plugins on termination of Telegraf. If the file exists on start,
## the state in the file will be restored for the plugins.
# statefile = ""

## Flag to skip running processors after aggregators
## By default, processors are run a second time after aggregators. Changing
## this setting to true will skip the second run of processors.
# skip_processors_after_aggregators = false
5 changes: 5 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,11 @@ type AgentConfig struct {
// Flag to always keep tags explicitly defined in the global tags section
// and ensure those tags always pass filtering.
AlwaysIncludeGlobalTags bool `toml:"always_include_global_tags"`

// Flag to skip running processors after aggregators
// By default, processors are run a second time after aggregators. Changing
// this setting to true will skip the second run of processors.
SkipProcessorsAfterAggregators bool `toml:"skip_processors_after_aggregators"`
}

// InputNames returns a list of strings of the configured inputs.
Expand Down
Loading