Skip to content

Commit

Permalink
Add option to reset buckets on flush to histogram aggregator (influxd…
Browse files Browse the repository at this point in the history
  • Loading branch information
oplehto authored and Jean-Louis Dupond committed Apr 22, 2019
1 parent b498590 commit a2de71f
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 12 deletions.
9 changes: 7 additions & 2 deletions plugins/aggregators/histogram/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ Values added to a bucket are also added to the larger buckets in the
distribution. This creates a [cumulative histogram](https://en.wikipedia.org/wiki/Histogram#/media/File:Cumulative_vs_normal_histogram.svg).

Like other Telegraf aggregators, the metric is emitted every `period` seconds.
Bucket counts however are not reset between periods and will be non-strictly
increasing while Telegraf is running.
By default bucket counts are not reset between periods and will be non-strictly
increasing while Telegraf is running. This behavior can be changed by setting the
`reset` parameter to true.

#### Design

Expand All @@ -34,6 +35,10 @@ of the algorithm which is implemented in the Prometheus
## aggregator and will not get sent to the output plugins.
drop_original = false

## If true, the histogram will be reset on flush instead
## of accumulating the results.
reset = false

## Example config that aggregates all fields of the metric.
# [[aggregators.histogram.config]]
# ## The set of buckets.
Expand Down
19 changes: 15 additions & 4 deletions plugins/aggregators/histogram/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ const bucketInf = "+Inf"

// HistogramAggregator is aggregator with histogram configs and particular histograms for defined metrics
type HistogramAggregator struct {
Configs []config `toml:"config"`
Configs []config `toml:"config"`
ResetBuckets bool `toml:"reset"`

buckets bucketsByMetrics
cache map[uint64]metricHistogramCollection
Expand Down Expand Up @@ -72,6 +73,10 @@ var sampleConfig = `
## aggregator and will not get sent to the output plugins.
drop_original = false
## If true, the histogram will be reset on flush instead
## of accumulating the results.
reset = false
## Example config that aggregates all fields of the metric.
# [[aggregators.histogram.config]]
# ## The set of buckets.
Expand Down Expand Up @@ -201,9 +206,15 @@ func (h *HistogramAggregator) groupField(
)
}

// Reset does nothing, because we need to collect counts for a long time, otherwise if config parameter 'reset' has
// small value, we will get a histogram with a small amount of the distribution.
func (h *HistogramAggregator) Reset() {}
// Reset does nothing by default, because we typically need to collect counts for a long time.
// Otherwise if config parameter 'reset' has 'true' value, we will get a histogram
// with a small amount of the distribution. However in some use cases a reset is useful.
func (h *HistogramAggregator) Reset() {
if h.ResetBuckets {
h.resetCache()
h.buckets = make(bucketsByMetrics)
}
}

// resetCache resets cached counts(hits) in the buckets
func (h *HistogramAggregator) resetCache() {
Expand Down
37 changes: 31 additions & 6 deletions plugins/aggregators/histogram/histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
)

// NewTestHistogram creates new test histogram aggregation with specified config
func NewTestHistogram(cfg []config) telegraf.Aggregator {
htm := &HistogramAggregator{Configs: cfg}
func NewTestHistogram(cfg []config, reset bool) telegraf.Aggregator {
htm := &HistogramAggregator{Configs: cfg, ResetBuckets: reset}
htm.buckets = make(bucketsByMetrics)
htm.resetCache()

Expand Down Expand Up @@ -69,11 +69,12 @@ func BenchmarkApply(b *testing.B) {
func TestHistogramWithPeriodAndOneField(t *testing.T) {
var cfg []config
cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}})
histogram := NewTestHistogram(cfg)
histogram := NewTestHistogram(cfg, false)

acc := &testutil.Accumulator{}

histogram.Add(firstMetric1)
histogram.Reset()
histogram.Add(firstMetric2)
histogram.Push(acc)

Expand All @@ -88,12 +89,36 @@ func TestHistogramWithPeriodAndOneField(t *testing.T) {
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, bucketInf)
}

// TestHistogramWithPeriodAndOneField tests metrics for one period and for one field
func TestHistogramWithReset(t *testing.T) {
var cfg []config
cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}})
histogram := NewTestHistogram(cfg, true)

acc := &testutil.Accumulator{}

histogram.Add(firstMetric1)
histogram.Reset()
histogram.Add(firstMetric2)
histogram.Push(acc)

if len(acc.Metrics) != 6 {
assert.Fail(t, "Incorrect number of metrics")
}
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0)}, "0")
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0)}, "10")
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1)}, "20")
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1)}, "30")
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1)}, "40")
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1)}, bucketInf)
}

// TestHistogramWithPeriodAndAllFields tests two metrics for one period and for all fields
func TestHistogramWithPeriodAndAllFields(t *testing.T) {
var cfg []config
cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 15.5, 20.0, 30.0, 40.0}})
cfg = append(cfg, config{Metric: "second_metric_name", Buckets: []float64{0.0, 4.0, 10.0, 23.0, 30.0}})
histogram := NewTestHistogram(cfg)
histogram := NewTestHistogram(cfg, false)

acc := &testutil.Accumulator{}

Expand Down Expand Up @@ -127,7 +152,7 @@ func TestHistogramDifferentPeriodsAndAllFields(t *testing.T) {

var cfg []config
cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}})
histogram := NewTestHistogram(cfg)
histogram := NewTestHistogram(cfg, false)

acc := &testutil.Accumulator{}
histogram.Add(firstMetric1)
Expand Down Expand Up @@ -166,7 +191,7 @@ func TestWrongBucketsOrder(t *testing.T) {

var cfg []config
cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 90.0, 20.0, 30.0, 40.0}})
histogram := NewTestHistogram(cfg)
histogram := NewTestHistogram(cfg, false)
histogram.Add(firstMetric2)
}

Expand Down

0 comments on commit a2de71f

Please sign in to comment.