Skip to content

Commit

Permalink
Support Processor & Aggregator Plugins
Browse files Browse the repository at this point in the history
closes #1726
  • Loading branch information
sparrc committed Sep 19, 2016
1 parent 6648c10 commit 75facef
Show file tree
Hide file tree
Showing 25 changed files with 1,598 additions and 754 deletions.
12 changes: 3 additions & 9 deletions accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ package telegraf

import "time"

// Accumulator is an interface for "accumulating" metrics from input plugin(s).
// The metrics are sent down a channel shared between all input plugins and then
// flushed on the configured flush_interval.
// Accumulator is an interface for "accumulating" metrics from plugin(s).
// The metrics are sent down a channel shared between all plugins.
type Accumulator interface {
// AddFields adds a metric to the accumulator with the given measurement
// name, fields, and tags (and timestamp). If a timestamp is not provided,
Expand All @@ -29,12 +28,7 @@ type Accumulator interface {
tags map[string]string,
t ...time.Time)

AddError(err error)

Debug() bool
SetDebug(enabled bool)

SetPrecision(precision, interval time.Duration)

DisablePrecision()
AddError(err error)
}
179 changes: 29 additions & 150 deletions agent/accumulator.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,40 @@
package agent

import (
"fmt"
"log"
"math"
"sync/atomic"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/models"
)

type MetricMaker interface {
Name() string
MakeMetric(
measurement string,
fields map[string]interface{},
tags map[string]string,
mType telegraf.ValueType,
t time.Time,
) telegraf.Metric
}

func NewAccumulator(
inputConfig *models.InputConfig,
maker MetricMaker,
metrics chan telegraf.Metric,
) *accumulator {
acc := accumulator{}
acc.metrics = metrics
acc.inputConfig = inputConfig
acc.precision = time.Nanosecond
acc := accumulator{
maker: maker,
metrics: metrics,
precision: time.Nanosecond,
}
return &acc
}

type accumulator struct {
metrics chan telegraf.Metric

defaultTags map[string]string

debug bool
// print every point added to the accumulator
trace bool

inputConfig *models.InputConfig
maker MetricMaker

precision time.Duration

Expand All @@ -44,7 +47,7 @@ func (ac *accumulator) AddFields(
tags map[string]string,
t ...time.Time,
) {
if m := ac.makeMetric(measurement, fields, tags, telegraf.Untyped, t...); m != nil {
if m := ac.maker.MakeMetric(measurement, fields, tags, telegraf.Untyped, ac.getTime(t)); m != nil {
ac.metrics <- m
}
}
Expand All @@ -55,7 +58,7 @@ func (ac *accumulator) AddGauge(
tags map[string]string,
t ...time.Time,
) {
if m := ac.makeMetric(measurement, fields, tags, telegraf.Gauge, t...); m != nil {
if m := ac.maker.MakeMetric(measurement, fields, tags, telegraf.Gauge, ac.getTime(t)); m != nil {
ac.metrics <- m
}
}
Expand All @@ -66,114 +69,11 @@ func (ac *accumulator) AddCounter(
tags map[string]string,
t ...time.Time,
) {
if m := ac.makeMetric(measurement, fields, tags, telegraf.Counter, t...); m != nil {
if m := ac.maker.MakeMetric(measurement, fields, tags, telegraf.Counter, ac.getTime(t)); m != nil {
ac.metrics <- m
}
}

// makeMetric either returns a metric, or returns nil if the metric doesn't
// need to be created (because of filtering, an error, etc.)
func (ac *accumulator) makeMetric(
measurement string,
fields map[string]interface{},
tags map[string]string,
mType telegraf.ValueType,
t ...time.Time,
) telegraf.Metric {
if len(fields) == 0 || len(measurement) == 0 {
return nil
}
if tags == nil {
tags = make(map[string]string)
}

// Override measurement name if set
if len(ac.inputConfig.NameOverride) != 0 {
measurement = ac.inputConfig.NameOverride
}
// Apply measurement prefix and suffix if set
if len(ac.inputConfig.MeasurementPrefix) != 0 {
measurement = ac.inputConfig.MeasurementPrefix + measurement
}
if len(ac.inputConfig.MeasurementSuffix) != 0 {
measurement = measurement + ac.inputConfig.MeasurementSuffix
}

// Apply plugin-wide tags if set
for k, v := range ac.inputConfig.Tags {
if _, ok := tags[k]; !ok {
tags[k] = v
}
}
// Apply daemon-wide tags if set
for k, v := range ac.defaultTags {
if _, ok := tags[k]; !ok {
tags[k] = v
}
}

// Apply the metric filter(s)
if ok := ac.inputConfig.Filter.Apply(measurement, fields, tags); !ok {
return nil
}

for k, v := range fields {
// Validate uint64 and float64 fields
switch val := v.(type) {
case uint64:
// InfluxDB does not support writing uint64
if val < uint64(9223372036854775808) {
fields[k] = int64(val)
} else {
fields[k] = int64(9223372036854775807)
}
continue
case float64:
// NaNs are invalid values in influxdb, skip measurement
if math.IsNaN(val) || math.IsInf(val, 0) {
if ac.debug {
log.Printf("Measurement [%s] field [%s] has a NaN or Inf "+
"field, skipping",
measurement, k)
}
delete(fields, k)
continue
}
}

fields[k] = v
}

var timestamp time.Time
if len(t) > 0 {
timestamp = t[0]
} else {
timestamp = time.Now()
}
timestamp = timestamp.Round(ac.precision)

var m telegraf.Metric
var err error
switch mType {
case telegraf.Counter:
m, err = telegraf.NewCounterMetric(measurement, tags, fields, timestamp)
case telegraf.Gauge:
m, err = telegraf.NewGaugeMetric(measurement, tags, fields, timestamp)
default:
m, err = telegraf.NewMetric(measurement, tags, fields, timestamp)
}
if err != nil {
log.Printf("Error adding point [%s]: %s\n", measurement, err.Error())
return nil
}

if ac.trace {
fmt.Println("> " + m.String())
}

return m
}

// AddError passes a runtime error to the accumulator.
// The error will be tagged with the plugin name and written to the log.
func (ac *accumulator) AddError(err error) {
Expand All @@ -182,23 +82,7 @@ func (ac *accumulator) AddError(err error) {
}
atomic.AddUint64(&ac.errCount, 1)
//TODO suppress/throttle consecutive duplicate errors?
log.Printf("ERROR in input [%s]: %s", ac.inputConfig.Name, err)
}

func (ac *accumulator) Debug() bool {
return ac.debug
}

func (ac *accumulator) SetDebug(debug bool) {
ac.debug = debug
}

func (ac *accumulator) Trace() bool {
return ac.trace
}

func (ac *accumulator) SetTrace(trace bool) {
ac.trace = trace
log.Printf("ERROR in plugin [%s]: %s", ac.maker.Name(), err)
}

// SetPrecision takes two time.Duration objects. If the first is non-zero,
Expand All @@ -222,17 +106,12 @@ func (ac *accumulator) SetPrecision(precision, interval time.Duration) {
}
}

func (ac *accumulator) DisablePrecision() {
ac.precision = time.Nanosecond
}

func (ac *accumulator) setDefaultTags(tags map[string]string) {
ac.defaultTags = tags
}

func (ac *accumulator) addDefaultTag(key, value string) {
if ac.defaultTags == nil {
ac.defaultTags = make(map[string]string)
func (ac accumulator) getTime(t []time.Time) time.Time {
var timestamp time.Time
if len(t) > 0 {
timestamp = t[0]
} else {
timestamp = time.Now()
}
ac.defaultTags[key] = value
return timestamp.Round(ac.precision)
}
Loading

0 comments on commit 75facef

Please sign in to comment.