Skip to content

Commit

Permalink
Add option to disable renaming
Browse files Browse the repository at this point in the history
  • Loading branch information
LarsStegman committed Apr 30, 2024
1 parent 1e5fdb6 commit 5da8d4e
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 4 deletions.
3 changes: 3 additions & 0 deletions plugins/aggregators/final/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## aggregator and will not get sent to the output plugins.
# drop_original = false

## If true, _final is added to every field name
# rename_fields = true

## The time that a series is not updated until considering it final. Ignored
## when output_strategy is "periodic".
# series_timeout = "5m"
Expand Down
14 changes: 11 additions & 3 deletions plugins/aggregators/final/final.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ var sampleConfig string
type Final struct {
OutputStrategy string `toml:"output_strategy"`
SeriesTimeout config.Duration `toml:"series_timeout"`
RenameFields bool `toml:"rename_fields"`

// The last metric for all series which are active
metricCache map[uint64]telegraf.Metric
Expand All @@ -25,6 +26,7 @@ type Final struct {
func NewFinal() *Final {
return &Final{
SeriesTimeout: config.Duration(5 * time.Minute),
RenameFields: true,
}
}

Expand Down Expand Up @@ -64,10 +66,16 @@ func (m *Final) Push(acc telegraf.Accumulator) {
// younger than that. So skip the output for this period.
continue
}
fields := map[string]interface{}{}
for _, field := range metric.FieldList() {
fields[field.Key+"_final"] = field.Value
var fields map[string]any
if m.RenameFields {
fields = map[string]interface{}{}
for _, field := range metric.FieldList() {
fields[field.Key+"_final"] = field.Value
}
} else {
fields = metric.Fields()
}

acc.AddFields(metric.Name(), fields, metric.Tags(), metric.Time())
delete(m.metricCache, id)
}
Expand Down
37 changes: 36 additions & 1 deletion plugins/aggregators/final/final_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func TestOutputStrategyTimeout(t *testing.T) {
final := &Final{
OutputStrategy: "timeout",
SeriesTimeout: config.Duration(30 * time.Second),
RenameFields: true,
}
require.NoError(t, final.Init())

Expand Down Expand Up @@ -215,6 +216,7 @@ func TestOutputStrategyPeriodic(t *testing.T) {
final := &Final{
OutputStrategy: "periodic",
SeriesTimeout: config.Duration(30 * time.Second),
RenameFields: true,
}
require.NoError(t, final.Init())

Expand Down Expand Up @@ -266,9 +268,42 @@ func TestOutputStrategyPeriodic(t *testing.T) {
metric.New(
"m",
tags,
map[string]interface{}{"a_final": int64(4)},
map[string]interface{}{
"a_final": 4,
},
now.Add(time.Second*-20),
),
}
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.SortMetrics())
}

func TestRenameFieldsFalse(t *testing.T) {
final := &Final{
OutputStrategy: "periodic",
SeriesTimeout: config.Duration(30 * time.Second),
RenameFields: false,
}

require.NoError(t, final.Init())

now := time.Now()
tags := map[string]string{"foo": "bar"}
m1 := metric.New("m",
tags,
map[string]any{"a": 3},
now.Add(time.Second*-90))

var acc testutil.Accumulator
final.Add(m1)
final.Push(&acc)
expected := []telegraf.Metric{
metric.New(
"m",
tags,
map[string]any{"a": 3},
now.Add(time.Second*-90),
),
}

testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.SortMetrics())
}
3 changes: 3 additions & 0 deletions plugins/aggregators/final/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
## aggregator and will not get sent to the output plugins.
# drop_original = false

## If true, _final is added to every field name
# rename_fields = true

## The time that a series is not updated until considering it final. Ignored
## when output_strategy is "periodic".
# series_timeout = "5m"
Expand Down

0 comments on commit 5da8d4e

Please sign in to comment.