Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Histogram aggregator metric expiration #10520

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions plugins/aggregators/histogram/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package histogram
import (
"sort"
"strconv"
"time"

"github.com/influxdata/telegraf"
telegrafConfig "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/aggregators"
)

Expand All @@ -22,9 +24,10 @@ const bucketNegInf = "-Inf"

// HistogramAggregator is aggregator with histogram configs and particular histograms for defined metrics
type HistogramAggregator struct {
Configs []config `toml:"config"`
ResetBuckets bool `toml:"reset"`
Cumulative bool `toml:"cumulative"`
Configs []config `toml:"config"`
ResetBuckets bool `toml:"reset"`
Cumulative bool `toml:"cumulative"`
ExpirationInterval telegrafConfig.Duration `toml:"expiration_interval"`

buckets bucketsByMetrics
cache map[uint64]metricHistogramCollection
Expand All @@ -51,6 +54,7 @@ type metricHistogramCollection struct {
histogramCollection map[string]counts
name string
tags map[string]string
expireTime time.Time
}

// counts is the number of hits in the bucket
Expand All @@ -63,6 +67,8 @@ type groupedByCountFields struct {
fieldsWithCount map[string]int64
}

var timeNow = time.Now

// NewHistogramAggregator creates new histogram aggregator
func NewHistogramAggregator() *HistogramAggregator {
h := &HistogramAggregator{
Expand Down Expand Up @@ -90,6 +96,10 @@ var sampleConfig = `
## Defaults to true.
cumulative = true

## Expiration interval for each histogram. The histogram will be expired if
## there are no changes in any buckets for this time interval. 0 == no expiration.
# expiration_interval = "0m"

## Example config that aggregates all fields of the metric.
# [[aggregators.histogram.config]]
# ## Right borders of buckets (with +Inf implicitly added).
Expand Down Expand Up @@ -119,6 +129,8 @@ func (h *HistogramAggregator) Description() string {

// Add adds new hit to the buckets
func (h *HistogramAggregator) Add(in telegraf.Metric) {
addTime := timeNow()

bucketsByField := make(map[string][]float64)
for field := range in.Fields() {
buckets := h.getBuckets(in.Name(), field)
Expand Down Expand Up @@ -151,6 +163,9 @@ func (h *HistogramAggregator) Add(in telegraf.Metric) {
index := sort.SearchFloat64s(buckets, value)
agr.histogramCollection[field][index]++
}
if h.ExpirationInterval != 0 {
agr.expireTime = addTime.Add(time.Duration(h.ExpirationInterval))
}
}
}

Expand All @@ -160,8 +175,13 @@ func (h *HistogramAggregator) Add(in telegraf.Metric) {
// Push returns histogram values for metrics
func (h *HistogramAggregator) Push(acc telegraf.Accumulator) {
metricsWithGroupedFields := []groupedByCountFields{}
now := timeNow()

for _, aggregate := range h.cache {
for id, aggregate := range h.cache {
if h.ExpirationInterval != 0 && now.After(aggregate.expireTime) {
delete(h.cache, id)
continue
}
for field, counts := range aggregate.histogramCollection {
h.groupFieldsByBuckets(&metricsWithGroupedFields, aggregate.name, field, copyTags(aggregate.tags), counts)
}
Expand Down
37 changes: 37 additions & 0 deletions plugins/aggregators/histogram/histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf"
telegrafConfig "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
)
Expand All @@ -17,10 +18,15 @@ type tags map[string]string

// NewTestHistogram creates new test histogram aggregation with specified config
func NewTestHistogram(cfg []config, reset bool, cumulative bool) telegraf.Aggregator {
return NewTestHistogramWithExpirationInterval(cfg, reset, cumulative, 0)
}

func NewTestHistogramWithExpirationInterval(cfg []config, reset bool, cumulative bool, expirationInterval telegrafConfig.Duration) telegraf.Aggregator {
htm := NewHistogramAggregator()
htm.Configs = cfg
htm.ResetBuckets = reset
htm.Cumulative = cumulative
htm.ExpirationInterval = expirationInterval

return htm
}
Expand Down Expand Up @@ -244,6 +250,37 @@ func TestWrongBucketsOrder(t *testing.T) {
histogram.Add(firstMetric2)
}

// TestHistogram tests two metrics getting added and metric expiration
func TestHistogramMetricExpiration(t *testing.T) {
currentTime := time.Unix(10, 0)
timeNow = func() time.Time {
return currentTime
}
defer func() {
timeNow = time.Now
}()

var cfg []config
cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}})
cfg = append(cfg, config{Metric: "second_metric_name", Buckets: []float64{0.0, 4.0, 10.0, 23.0, 30.0}})
histogram := NewTestHistogramWithExpirationInterval(cfg, false, true, telegrafConfig.Duration(30))

acc := &testutil.Accumulator{}

histogram.Add(firstMetric1)
currentTime = time.Unix(41, 0)
histogram.Add(secondMetric)
histogram.Push(acc)

require.Len(t, acc.Metrics, 6, "Incorrect number of metrics")
assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketRightTag: "0"})
assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketRightTag: "4"})
assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketRightTag: "10"})
assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketRightTag: "23"})
assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketRightTag: "30"})
assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(1), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketRightTag: bucketPosInf})
}

// assertContainsTaggedField is help functions to test histogram data
func assertContainsTaggedField(t *testing.T, acc *testutil.Accumulator, metricName string, fields map[string]interface{}, tags map[string]string) {
acc.Lock()
Expand Down