Skip to content

Commit

Permalink
feat: aggregator histogram add histogram expiration
Browse files Browse the repository at this point in the history
  • Loading branch information
opengamer29 committed Jan 28, 2022
1 parent 10ef264 commit 49c4484
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 4 deletions.
40 changes: 36 additions & 4 deletions plugins/aggregators/histogram/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package histogram
import (
"sort"
"strconv"
"time"

"github.com/influxdata/telegraf"
telegrafConfig "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/aggregators"
)

Expand All @@ -22,14 +24,17 @@ const bucketNegInf = "-Inf"

// HistogramAggregator is aggregator with histogram configs and particular histograms for defined metrics
type HistogramAggregator struct {
Configs []config `toml:"config"`
ResetBuckets bool `toml:"reset"`
Cumulative bool `toml:"cumulative"`
Configs []config `toml:"config"`
ResetBuckets bool `toml:"reset"`
Cumulative bool `toml:"cumulative"`
ExpirationInterval telegrafConfig.Duration `toml:"expiration_interval"`

buckets bucketsByMetrics
cache map[uint64]metricHistogramCollection
}

type timeFunc func() time.Time

// config is the config, which contains name, field of metric and histogram buckets.
type config struct {
Metric string `toml:"measurement_name"`
Expand All @@ -51,6 +56,7 @@ type metricHistogramCollection struct {
histogramCollection map[string]counts
name string
tags map[string]string
expireTime time.Time
}

// counts is the number of hits in the bucket
Expand All @@ -63,6 +69,8 @@ type groupedByCountFields struct {
fieldsWithCount map[string]int64
}

var timeNow timeFunc

// NewHistogramAggregator creates new histogram aggregator
func NewHistogramAggregator() *HistogramAggregator {
h := &HistogramAggregator{
Expand Down Expand Up @@ -90,6 +98,11 @@ var sampleConfig = `
## Defaults to true.
cumulative = true
## Expiration interval for each histogram. The histogram will be expired if
## there are no changes in any buckets for this time interval. 0 == no expiration.
## Defaults to 0.
expiration_interval = "5m"
## Example config that aggregates all fields of the metric.
# [[aggregators.histogram.config]]
# ## Right borders of buckets (with +Inf implicitly added).
Expand Down Expand Up @@ -119,6 +132,12 @@ func (h *HistogramAggregator) Description() string {

// Add adds new hit to the buckets
func (h *HistogramAggregator) Add(in telegraf.Metric) {
addTime := time.Time{}

if h.ExpirationInterval != 0 {
addTime = timeNow()
}

bucketsByField := make(map[string][]float64)
for field := range in.Fields() {
buckets := h.getBuckets(in.Name(), field)
Expand Down Expand Up @@ -151,6 +170,9 @@ func (h *HistogramAggregator) Add(in telegraf.Metric) {
index := sort.SearchFloat64s(buckets, value)
agr.histogramCollection[field][index]++
}
if !addTime.IsZero() {
agr.expireTime = addTime.Add(time.Duration(h.ExpirationInterval))
}
}
}

Expand All @@ -160,8 +182,17 @@ func (h *HistogramAggregator) Add(in telegraf.Metric) {
// Push returns histogram values for metrics
func (h *HistogramAggregator) Push(acc telegraf.Accumulator) {
metricsWithGroupedFields := []groupedByCountFields{}
now := time.Time{}

for _, aggregate := range h.cache {
if h.ExpirationInterval != 0 {
now = timeNow()
}

for id, aggregate := range h.cache {
if !now.IsZero() && now.After(aggregate.expireTime) {
delete(h.cache, id)
continue
}
for field, counts := range aggregate.histogramCollection {
h.groupFieldsByBuckets(&metricsWithGroupedFields, aggregate.name, field, copyTags(aggregate.tags), counts)
}
Expand Down Expand Up @@ -345,4 +376,5 @@ func init() {
aggregators.Add("histogram", func() telegraf.Aggregator {
return NewHistogramAggregator()
})
timeNow = time.Now
}
37 changes: 37 additions & 0 deletions plugins/aggregators/histogram/histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf"
telegrafConfig "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
)
Expand All @@ -17,10 +18,15 @@ type tags map[string]string

// NewTestHistogram creates new test histogram aggregation with specified config
func NewTestHistogram(cfg []config, reset bool, cumulative bool) telegraf.Aggregator {
return NewTestHistogramWithExpirationInterval(cfg, reset, cumulative, 0)
}

func NewTestHistogramWithExpirationInterval(cfg []config, reset bool, cumulative bool, expirationInterval telegrafConfig.Duration) telegraf.Aggregator {
htm := NewHistogramAggregator()
htm.Configs = cfg
htm.ResetBuckets = reset
htm.Cumulative = cumulative
htm.ExpirationInterval = expirationInterval

return htm
}
Expand Down Expand Up @@ -244,6 +250,37 @@ func TestWrongBucketsOrder(t *testing.T) {
histogram.Add(firstMetric2)
}

// TestHistogram tests two metrics getting added and metric expiration
func TestHistogramMetricExpiration(t *testing.T) {
currentTime := time.Unix(10, 0)
timeNow = func() time.Time {
return currentTime
}
defer func() {
timeNow = time.Now
}()

var cfg []config
cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}})
cfg = append(cfg, config{Metric: "second_metric_name", Buckets: []float64{0.0, 4.0, 10.0, 23.0, 30.0}})
histogram := NewTestHistogramWithExpirationInterval(cfg, false, true, telegrafConfig.Duration(30))

acc := &testutil.Accumulator{}

histogram.Add(firstMetric1)
currentTime = time.Unix(41, 0)
histogram.Add(secondMetric)
histogram.Push(acc)

require.Len(t, acc.Metrics, 6, "Incorrect number of metrics")
assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketRightTag: "0"})
assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketRightTag: "4"})
assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketRightTag: "10"})
assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketRightTag: "23"})
assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketRightTag: "30"})
assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(1), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketRightTag: bucketPosInf})
}

// assertContainsTaggedField is help functions to test histogram data
func assertContainsTaggedField(t *testing.T, acc *testutil.Accumulator, metricName string, fields map[string]interface{}, tags map[string]string) {
acc.Lock()
Expand Down

0 comments on commit 49c4484

Please sign in to comment.