Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add merge aggregator #6410

Merged
merged 1 commit into from
Sep 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugins/aggregators/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/aggregators/basicstats"
_ "github.com/influxdata/telegraf/plugins/aggregators/final"
_ "github.com/influxdata/telegraf/plugins/aggregators/histogram"
_ "github.com/influxdata/telegraf/plugins/aggregators/merge"
_ "github.com/influxdata/telegraf/plugins/aggregators/minmax"
_ "github.com/influxdata/telegraf/plugins/aggregators/valuecounter"
)
23 changes: 23 additions & 0 deletions plugins/aggregators/merge/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Merge Aggregator

Merge metrics together into a metric with multiple fields into the most memory
and network transfer efficient form.

Use this plugin when fields are split over multiple metrics, with the same
measurement, tag set and timestamp. By merging into a single metric they can
be handled more efficiently by the output.

### Configuration

```toml
[[aggregators.merge]]
# no configuration
```

### Example

```diff
- cpu,host=localhost usage_time=42 1567562620000000000
- cpu,host=localhost idle_time=42 1567562620000000000
+ cpu,host=localhost idle_time=42,usage_time=42 1567562620000000000
```
62 changes: 62 additions & 0 deletions plugins/aggregators/merge/merge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package seriesgrouper

import (
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/aggregators"
)

const (
description = "Merge metrics into multifield metrics by series key"
sampleConfig = ""
)

type Merge struct {
grouper *metric.SeriesGrouper
log telegraf.Logger
}

func (a *Merge) Init() error {
a.grouper = metric.NewSeriesGrouper()
return nil
}

func (a *Merge) Description() string {
return description
}

func (a *Merge) SampleConfig() string {
return sampleConfig
}

func (a *Merge) Add(m telegraf.Metric) {
tags := m.Tags()
for _, field := range m.FieldList() {
err := a.grouper.Add(m.Name(), tags, m.Time(), field.Key, field.Value)
if err != nil {
a.log.Errorf("Error adding metric: %v", err)
}
}
}

func (a *Merge) Push(acc telegraf.Accumulator) {
// Always use nanosecond precision to avoid rounding metrics that were
// produced at a precision higher than the agent default.
acc.SetPrecision(time.Nanosecond)

for _, m := range a.grouper.Metrics() {
acc.AddMetric(m)
}
}

func (a *Merge) Reset() {
a.grouper = metric.NewSeriesGrouper()
}

func init() {
aggregators.Add("merge", func() telegraf.Aggregator {
return &Merge{}
})
}
186 changes: 186 additions & 0 deletions plugins/aggregators/merge/merge_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package seriesgrouper

import (
"testing"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)

func TestSimple(t *testing.T) {
plugin := &Merge{}

err := plugin.Init()
require.NoError(t, err)

plugin.Add(
testutil.MustMetric(
"cpu",
map[string]string{
"cpu": "cpu0",
},
map[string]interface{}{
"time_idle": 42,
},
time.Unix(0, 0),
),
)
require.NoError(t, err)

plugin.Add(
testutil.MustMetric(
"cpu",
map[string]string{
"cpu": "cpu0",
},
map[string]interface{}{
"time_guest": 42,
},
time.Unix(0, 0),
),
)
require.NoError(t, err)

var acc testutil.Accumulator
plugin.Push(&acc)

expected := []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{
"cpu": "cpu0",
},
map[string]interface{}{
"time_idle": 42,
"time_guest": 42,
},
time.Unix(0, 0),
),
}

testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
}

func TestNanosecondPrecision(t *testing.T) {
plugin := &Merge{}

err := plugin.Init()
require.NoError(t, err)

plugin.Add(
testutil.MustMetric(
"cpu",
map[string]string{
"cpu": "cpu0",
},
map[string]interface{}{
"time_idle": 42,
},
time.Unix(0, 1),
),
)
require.NoError(t, err)

plugin.Add(
testutil.MustMetric(
"cpu",
map[string]string{
"cpu": "cpu0",
},
map[string]interface{}{
"time_guest": 42,
},
time.Unix(0, 1),
),
)
require.NoError(t, err)

var acc testutil.Accumulator
acc.SetPrecision(time.Second)
plugin.Push(&acc)

expected := []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{
"cpu": "cpu0",
},
map[string]interface{}{
"time_idle": 42,
"time_guest": 42,
},
time.Unix(0, 1),
),
}

testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
}

func TestReset(t *testing.T) {
plugin := &Merge{}

err := plugin.Init()
require.NoError(t, err)

plugin.Add(
testutil.MustMetric(
"cpu",
map[string]string{
"cpu": "cpu0",
},
map[string]interface{}{
"time_idle": 42,
},
time.Unix(0, 0),
),
)
require.NoError(t, err)

var acc testutil.Accumulator
plugin.Push(&acc)

plugin.Reset()

plugin.Add(
testutil.MustMetric(
"cpu",
map[string]string{
"cpu": "cpu0",
},
map[string]interface{}{
"time_guest": 42,
},
time.Unix(0, 0),
),
)
require.NoError(t, err)

plugin.Push(&acc)

expected := []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{
"cpu": "cpu0",
},
map[string]interface{}{
"time_idle": 42,
},
time.Unix(0, 0),
),
testutil.MustMetric(
"cpu",
map[string]string{
"cpu": "cpu0",
},
map[string]interface{}{
"time_guest": 42,
},
time.Unix(0, 0),
),
}

testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
}