Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Grace period for metrics late for aggregation #6049

Merged
merged 2 commits into from
Jul 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,10 @@ Parameters that can be used with any aggregator plugin:
how long for aggregators to wait before receiving metrics from input
plugins, in the case that aggregators are flushing and inputs are gathering
on the same interval.
- **grace**: The duration when the metrics will still be aggregated
by the plugin, even though they're outside of the aggregation period. This
is needed in a situation when the agent is expected to receive late metrics
and it's acceptable to roll them up into next aggregation period.
- **drop_original**: If true, the original metric will be dropped by the
aggregator and will not get sent to the output plugins.
- **name_override**: Override the base name of the measurement. (Default is
Expand Down
14 changes: 14 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1025,6 +1025,7 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err
Name: name,
Delay: time.Millisecond * 100,
Period: time.Second * 30,
Grace: time.Second * 0,
}

if node, ok := tbl.Fields["period"]; ok {
Expand Down Expand Up @@ -1053,6 +1054,18 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err
}
}

if node, ok := tbl.Fields["grace"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}

conf.Grace = dur
}
}
}
if node, ok := tbl.Fields["drop_original"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if b, ok := kv.Value.(*ast.Boolean); ok {
Expand Down Expand Up @@ -1100,6 +1113,7 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err

delete(tbl.Fields, "period")
delete(tbl.Fields, "delay")
delete(tbl.Fields, "grace")
delete(tbl.Fields, "drop_original")
delete(tbl.Fields, "name_prefix")
delete(tbl.Fields, "name_suffix")
Expand Down
7 changes: 4 additions & 3 deletions internal/models/running_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type AggregatorConfig struct {
DropOriginal bool
Period time.Duration
Delay time.Duration
Grace time.Duration

NameOverride string
MeasurementPrefix string
Expand Down Expand Up @@ -135,9 +136,9 @@ func (r *RunningAggregator) Add(m telegraf.Metric) bool {
r.Lock()
defer r.Unlock()

if m.Time().Before(r.periodStart) || m.Time().After(r.periodEnd.Add(r.Config.Delay)) {
log.Printf("D! [%s] metric is outside aggregation window; discarding. %s: m: %s e: %s",
r.Name(), m.Time(), r.periodStart, r.periodEnd)
if m.Time().Before(r.periodStart.Add(-r.Config.Grace)) || m.Time().After(r.periodEnd.Add(r.Config.Delay)) {
log.Printf("D! [%s] metric is outside aggregation window; discarding. %s: m: %s e: %s g: %s",
r.Name(), m.Time(), r.periodStart, r.periodEnd, r.Config.Grace)
r.MetricsDropped.Incr(1)
return r.Config.DropOriginal
}
Expand Down
62 changes: 62 additions & 0 deletions internal/models/running_aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,68 @@ func TestAddMetricsOutsideCurrentPeriod(t *testing.T) {
require.Equal(t, int64(101), acc.Metrics[0].Fields["sum"])
}

func TestAddMetricsOutsideCurrentPeriodWithGrace(t *testing.T) {
a := &TestAggregator{}
ra := NewRunningAggregator(a, &AggregatorConfig{
Name: "TestRunningAggregator",
Filter: Filter{
NamePass: []string{"*"},
},
Period: time.Millisecond * 1500,
Grace: time.Millisecond * 500,
})
require.NoError(t, ra.Config.Filter.Compile())
acc := testutil.Accumulator{}
now := time.Now()
ra.UpdateWindow(now, now.Add(ra.Config.Period))

m := testutil.MustMetric("RITest",
map[string]string{},
map[string]interface{}{
"value": int64(101),
},
now.Add(-time.Hour),
telegraf.Untyped,
)
require.False(t, ra.Add(m))

// metric before current period (late)
m = testutil.MustMetric("RITest",
map[string]string{},
map[string]interface{}{
"value": int64(100),
},
now.Add(-time.Millisecond*1000),
telegraf.Untyped,
)
require.False(t, ra.Add(m))

// metric before current period, but within grace period (late)
m = testutil.MustMetric("RITest",
map[string]string{},
map[string]interface{}{
"value": int64(102),
},
now.Add(-time.Millisecond*200),
telegraf.Untyped,
)
require.False(t, ra.Add(m))

// "now" metric
m = testutil.MustMetric("RITest",
map[string]string{},
map[string]interface{}{
"value": int64(101),
},
time.Now().Add(time.Millisecond*50),
telegraf.Untyped)
require.False(t, ra.Add(m))

ra.Push(&acc)
require.Equal(t, 1, len(acc.Metrics))
require.Equal(t, int64(203), acc.Metrics[0].Fields["sum"])
}

func TestAddAndPushOnePeriod(t *testing.T) {
a := &TestAggregator{}
ra := NewRunningAggregator(a, &AggregatorConfig{
Expand Down