Add the histogram bucket bridge #3937

Merged
merged 16 commits on Aug 23, 2019
57 changes: 39 additions & 18 deletions pkg/aggregator/aggregator.go
@@ -79,20 +79,21 @@ var (
flushTimeStats = make(map[string]*Stats)
flushCountStats = make(map[string]*Stats)

aggregatorSeriesFlushed = expvar.Int{}
aggregatorSeriesFlushErrors = expvar.Int{}
aggregatorServiceCheckFlushErrors = expvar.Int{}
aggregatorServiceCheckFlushed = expvar.Int{}
aggregatorSketchesFlushErrors = expvar.Int{}
aggregatorSketchesFlushed = expvar.Int{}
aggregatorEventsFlushErrors = expvar.Int{}
aggregatorEventsFlushed = expvar.Int{}
aggregatorNumberOfFlush = expvar.Int{}
aggregatorDogstatsdMetricSample = expvar.Int{}
aggregatorChecksMetricSample = expvar.Int{}
aggregatorCheckHistogramBucketMetricSample = expvar.Int{}
aggregatorServiceCheck = expvar.Int{}
aggregatorEvent = expvar.Int{}
aggregatorHostnameUpdate = expvar.Int{}

// Hold series to be added to aggregated series on each flush
recurrentSeries metrics.Series
@@ -124,6 +125,7 @@ func init() {
aggregatorExpvars.Set("NumberOfFlush", &aggregatorNumberOfFlush)
aggregatorExpvars.Set("DogstatsdMetricSample", &aggregatorDogstatsdMetricSample)
aggregatorExpvars.Set("ChecksMetricSample", &aggregatorChecksMetricSample)
aggregatorExpvars.Set("ChecksHistogramBucketMetricSample", &aggregatorCheckHistogramBucketMetricSample)
aggregatorExpvars.Set("ServiceCheck", &aggregatorServiceCheck)
aggregatorExpvars.Set("Event", &aggregatorEvent)
aggregatorExpvars.Set("HostnameUpdate", &aggregatorHostnameUpdate)
@@ -161,7 +163,8 @@ type BufferedAggregator struct {
eventIn chan metrics.Event
serviceCheckIn chan metrics.ServiceCheck

checkMetricIn chan senderMetricSample
checkHistogramBucketIn chan senderHistogramBucket

sampler TimeSampler
checkSamplers map[check.ID]*CheckSampler
@@ -189,7 +192,8 @@ func NewBufferedAggregator(s serializer.MetricSerializer, hostname, agentName st
serviceCheckIn: make(chan metrics.ServiceCheck, 100), // TODO make buffer size configurable
eventIn: make(chan metrics.Event, 100), // TODO make buffer size configurable

checkMetricIn: make(chan senderMetricSample, 100), // TODO make buffer size configurable
checkHistogramBucketIn: make(chan senderHistogramBucket, 100), // TODO make buffer size configurable

sampler: *NewTimeSampler(bucketSize),
checkSamplers: make(map[check.ID]*CheckSampler),
@@ -232,7 +236,7 @@ func AddRecurrentSeries(newSerie *metrics.Serie) {
// IsInputQueueEmpty returns true if every input channel for the aggregator is
// empty. This is mainly useful for tests and benchmarks
func (agg *BufferedAggregator) IsInputQueueEmpty() bool {
if len(agg.checkMetricIn)+len(agg.serviceCheckIn)+len(agg.eventIn)+len(agg.checkHistogramBucketIn) == 0 {
return true
}
return false
@@ -310,6 +314,18 @@ func (agg *BufferedAggregator) handleSenderSample(ss senderMetricSample) {
}
}

func (agg *BufferedAggregator) handleSenderBucket(checkBucket senderHistogramBucket) {
agg.mu.Lock()
defer agg.mu.Unlock()

if checkSampler, ok := agg.checkSamplers[checkBucket.id]; ok {
checkBucket.bucket.Tags = deduplicateTags(checkBucket.bucket.Tags)
checkSampler.addBucket(checkBucket.bucket)
} else {
log.Debugf("CheckSampler with ID '%s' doesn't exist, can't handle histogram bucket", checkBucket.id)
}
}
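
senderHistogramBucket itself is not defined in this diff; from its usage above (checkBucket.id, checkBucket.bucket), its shape and the submission path into the channel are roughly the following (the helper and field names below are inferred/hypothetical, not confirmed by the PR):

	// Inferred shape; the real definition lives on the sender side and may differ.
	type senderHistogramBucket struct {
		id     check.ID
		bucket *metrics.HistogramBucket
	}

	// Hypothetical helper showing how a bucket reaches the channel read by run().
	func submitHistogramBucket(agg *BufferedAggregator, id check.ID, b *metrics.HistogramBucket) {
		agg.checkHistogramBucketIn <- senderHistogramBucket{id: id, bucket: b}
	}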

// addServiceCheck adds the service check to the slice of current service checks
func (agg *BufferedAggregator) addServiceCheck(sc metrics.ServiceCheck) {
if sc.Ts == 0 {
@@ -342,7 +358,9 @@ func (agg *BufferedAggregator) GetSeriesAndSketches() (metrics.Series, metrics.S
series, sketches := agg.sampler.flush(timeNowNano())

for _, checkSampler := range agg.checkSamplers {
s, sk := checkSampler.flush()
series = append(series, s...)
sketches = append(sketches, sk...)
}
agg.mu.Unlock()
return series, sketches
@@ -553,6 +571,9 @@ func (agg *BufferedAggregator) run() {
case checkMetric := <-agg.checkMetricIn:
aggregatorChecksMetricSample.Add(1)
agg.handleSenderSample(checkMetric)
case checkHistogramBucket := <-agg.checkHistogramBucketIn:
aggregatorCheckHistogramBucketMetricSample.Add(1)
agg.handleSenderBucket(checkHistogramBucket)

case metric := <-agg.metricIn:
aggregatorDogstatsdMetricSample.Add(1)
152 changes: 142 additions & 10 deletions pkg/aggregator/check_sampler.go
@@ -6,26 +6,41 @@
package aggregator

import (
"github.com/DataDog/datadog-agent/pkg/util/log"
"math"
"time"

"github.com/DataDog/datadog-agent/pkg/aggregator/ckey"
"github.com/DataDog/datadog-agent/pkg/metrics"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

const checksSourceTypeName = "System"

// CheckSampler aggregates metrics from one Check instance
type CheckSampler struct {
series []*metrics.Serie
sketches []metrics.SketchSeries
contextResolver *ContextResolver
metrics metrics.ContextMetrics
sketchMap sketchMap
lastBucketValue map[ckey.ContextKey]int
lastSeenBucket map[ckey.ContextKey]time.Time
bucketExpiry time.Duration
interpolationGranularity int
}

// newCheckSampler returns a newly initialized CheckSampler
func newCheckSampler() *CheckSampler {
return &CheckSampler{
series: make([]*metrics.Serie, 0),
sketches: make([]metrics.SketchSeries, 0),
contextResolver: newContextResolver(),
metrics: metrics.MakeContextMetrics(),
sketchMap: make(sketchMap),
lastBucketValue: make(map[ckey.ContextKey]int),
lastSeenBucket: make(map[ckey.ContextKey]time.Time),
bucketExpiry: 1 * time.Minute,
interpolationGranularity: 1000,
}
}

@@ -37,7 +52,91 @@ func (cs *CheckSampler) addSample(metricSample *metrics.MetricSample) {
}
}

func (cs *CheckSampler) newSketchSeries(ck ckey.ContextKey, points []metrics.SketchPoint) metrics.SketchSeries {
ctx := cs.contextResolver.contextsByKey[ck]
ss := metrics.SketchSeries{
Name: ctx.Name,
Tags: ctx.Tags,
Host: ctx.Host,
// Interval: TODO: investigate
[inline review on the Interval field]
mfpierre (author): @jbarciauskas I'm not sure how to handle this field in the context of checks?
jbarciauskas: Do checks run at a standard interval? I think it's only important for counts
mfpierre (author): @jbarciauskas they usually do (15 sec), but a custom check interval can be defined per check
Points: points,
ContextKey: ck,
}

return ss
}

func (cs *CheckSampler) addBucket(bucket *metrics.HistogramBucket) {
if bucket.Value < 0 {
log.Warnf("Negative bucket value %d for metric %s discarding", bucket.Value, bucket.Name)
return
}
if bucket.Value == 0 {
// noop
return
}
contextKey := cs.contextResolver.trackContext(bucket, bucket.Timestamp)

// if the bucket is monotonic and we already saw this bucket, we only send the delta
if bucket.Monotonic {
lastBucketValue, bucketFound := cs.lastBucketValue[contextKey]
rawValue := bucket.Value
if bucketFound {
cs.lastSeenBucket[contextKey] = time.Now()
bucket.Value = rawValue - lastBucketValue
}
cs.lastBucketValue[contextKey] = rawValue
cs.lastSeenBucket[contextKey] = time.Now()
}

if bucket.Value < 0 {
log.Warnf("Negative bucket delta %d for metric %s, discarding", bucket.Value, bucket.Name)
return
}
if bucket.Value == 0 {
// noop
return
}

// simple linear interpolation, TODO: optimize
bucketRange := bucket.UpperBound - bucket.LowerBound
if bucketRange < 0 {
log.Warnf(
"Negative bucket range [%f-%f] for metric %s discarding",
bucket.LowerBound, bucket.UpperBound, bucket.Name,
)
return
}
var linearIncr float64
var incrCount int
var countPerIncr uint
if bucket.Value > cs.interpolationGranularity {
linearIncr = bucketRange / float64(cs.interpolationGranularity)
countPerIncr = uint(bucket.Value / cs.interpolationGranularity)
incrCount = cs.interpolationGranularity
} else {
linearIncr = bucketRange / float64(bucket.Value)
countPerIncr = 1
incrCount = bucket.Value
}
if math.IsInf(bucket.UpperBound, 1) {
// We simulate the behavior of promQL for the infinity bucket:
// "if the quantile falls into the highest bucket, the upper bound of the 2nd highest bucket is returned"
incrCount = 1
countPerIncr = uint(bucket.Value)
}
currentVal := bucket.LowerBound
log.Tracef(
"Interpolating %d values by group of %d over the [%f-%f] bucket with %f increment",
bucket.Value, countPerIncr, bucket.LowerBound, bucket.UpperBound, linearIncr,
)
for i := 0; i < incrCount; i++ {
cs.sketchMap.insertN(int64(bucket.Timestamp), contextKey, currentVal, countPerIncr)
currentVal += linearIncr
}
}
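
To make the interpolation concrete: a [10, 20] bucket carrying 4 counts produces insertions at 10.0, 12.5, 15.0 and 17.5, i.e. counts are spread evenly starting at the lower bound and stopping one increment short of the upper bound. A standalone sketch of the non-Inf branch for value <= granularity (illustrative only, no agent types):

	package main

	import "fmt"

	// interpolate mirrors addBucket's simple case: each of the `value` counts
	// lands at lowerBound + i*(bucketRange/value).
	func interpolate(lowerBound, upperBound float64, value int) []float64 {
		points := make([]float64, 0, value)
		linearIncr := (upperBound - lowerBound) / float64(value)
		currentVal := lowerBound
		for i := 0; i < value; i++ {
			points = append(points, currentVal)
			currentVal += linearIncr
		}
		return points
	}

	func main() {
		fmt.Println(interpolate(10, 20, 4)) // [10 12.5 15 17.5]
	}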

func (cs *CheckSampler) commitSeries(timestamp float64) {
series, errors := cs.metrics.Flush(timestamp)
for ckey, err := range errors {
context, ok := cs.contextResolver.contextsByKey[ckey]
@@ -61,12 +160,45 @@

cs.series = append(cs.series, serie)
}
}

func (cs *CheckSampler) commitSketches(timestamp float64) {
pointsByCtx := make(map[ckey.ContextKey][]metrics.SketchPoint)

cs.sketchMap.flushBefore(int64(timestamp), func(ck ckey.ContextKey, p metrics.SketchPoint) {
if p.Sketch == nil {
return
}
pointsByCtx[ck] = append(pointsByCtx[ck], p)
})
for ck, points := range pointsByCtx {
cs.sketches = append(cs.sketches, cs.newSketchSeries(ck, points))
}
}

func (cs *CheckSampler) commit(timestamp float64) {
cs.commitSeries(timestamp)
cs.commitSketches(timestamp)
cs.contextResolver.expireContexts(timestamp - defaultExpiry)
}

func (cs *CheckSampler) flush() (metrics.Series, metrics.SketchSeriesList) {
// series
series := cs.series
cs.series = make([]*metrics.Serie, 0)

// sketches
sketches := cs.sketches
cs.sketches = make([]metrics.SketchSeries, 0)

// garbage collect unused bucket deltas
now := time.Now()
for ctxKey, lastSeenBucket := range cs.lastSeenBucket {
if now.Sub(lastSeenBucket) > cs.bucketExpiry {
delete(cs.lastSeenBucket, ctxKey)
delete(cs.lastBucketValue, ctxKey)
}
}

return series, sketches
}
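
Putting the pieces together, a hypothetical test-style sketch of the new path (assuming it sits inside the aggregator package; the bucket literal mirrors the benchmark file below). The second addBucket call exercises the monotonic delta logic, so only 5 - 3 = 2 counts reach the sketch:

	func exampleBucketLifecycle() {
		cs := newCheckSampler()

		newBucket := func(value int) *metrics.HistogramBucket {
			return &metrics.HistogramBucket{
				Name:       "my.histogram",
				Value:      value,
				LowerBound: 10.0,
				UpperBound: 20.0,
				Monotonic:  true,
				Tags:       []string{"foo", "bar"},
				Timestamp:  12345.0,
			}
		}

		cs.addBucket(newBucket(3)) // first sighting: all 3 counts are interpolated
		cs.addBucket(newBucket(5)) // cumulative value 5: only the delta (2) is added
		cs.commit(12360.0)         // moves finished sketch points into cs.sketches

		_, sketches := cs.flush() // one SketchSeries for this metric context
		_ = sketches
	}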
42 changes: 42 additions & 0 deletions pkg/aggregator/check_sampler_bench_test.go
@@ -0,0 +1,42 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-2019 Datadog, Inc.

package aggregator

import (
"testing"
"time"

"github.com/DataDog/datadog-agent/pkg/aggregator/ckey"
"github.com/DataDog/datadog-agent/pkg/metrics"
)

func benchmarkAddBucket(bucketValue int, b *testing.B) {
checkSampler := newCheckSampler()

bucket := &metrics.HistogramBucket{
Name: "my.histogram",
Value: bucketValue,
LowerBound: 10.0,
UpperBound: 20.0,
Tags: []string{"foo", "bar"},
Timestamp: 12345.0,
}

for n := 0; n < b.N; n++ {
checkSampler.addBucket(bucket)
// reset bucket cache
checkSampler.lastBucketValue = make(map[ckey.ContextKey]int)
checkSampler.lastSeenBucket = make(map[ckey.ContextKey]time.Time)
}
}

func BenchmarkAddBucket1(b *testing.B) { benchmarkAddBucket(1, b) }
func BenchmarkAddBucket10(b *testing.B) { benchmarkAddBucket(10, b) }
func BenchmarkAddBucket100(b *testing.B) { benchmarkAddBucket(100, b) }
func BenchmarkAddBucket1000(b *testing.B) { benchmarkAddBucket(1000, b) }
func BenchmarkAddBucket10000(b *testing.B) { benchmarkAddBucket(10000, b) }
func BenchmarkAddBucket1000000(b *testing.B) { benchmarkAddBucket(1000000, b) }
func BenchmarkAddBucket10000000(b *testing.B) { benchmarkAddBucket(10000000, b) }
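
These benchmarks can be run with the standard tooling, e.g. go test -run '^$' -bench BenchmarkAddBucket ./pkg/aggregator/. Since the bucket is non-monotonic and the delta caches are reset between iterations, every addBucket call interpolates the full bucketValue counts, so timings should grow with the bucket value until the interpolationGranularity cap of 1000 increments levels them off.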