Skip to content

Commit

Permalink
Create public models for telegraf metrics, accumlator, plugins
Browse files Browse the repository at this point in the history
This will basically make the root directory a place for storing the
major telegraf interfaces, which will make telegraf's godoc looks quite
a bit nicer. And make it easier for contributors to lookup the few data
types that they actually care about.

closes #564
  • Loading branch information
sparrc committed Jan 27, 2016
1 parent a822d94 commit 9c0d14b
Show file tree
Hide file tree
Showing 83 changed files with 700 additions and 526 deletions.
193 changes: 13 additions & 180 deletions accumulator.go
Original file line number Diff line number Diff line change
@@ -1,188 +1,21 @@
package telegraf

import (
"fmt"
"log"
"math"
"sync"
"time"

"github.com/influxdata/telegraf/internal/models"

"github.com/influxdata/influxdb/client/v2"
)
import "time"

type Accumulator interface {
Add(measurement string, value interface{},
tags map[string]string, t ...time.Time)
AddFields(measurement string, fields map[string]interface{},
tags map[string]string, t ...time.Time)

SetDefaultTags(tags map[string]string)
AddDefaultTag(key, value string)

Prefix() string
SetPrefix(prefix string)
// Create a point with a value, decorating it with tags
// NOTE: tags is expected to be owned by the caller, don't mutate
// it after passing to Add.
Add(measurement string,
value interface{},
tags map[string]string,
t ...time.Time)

AddFields(measurement string,
fields map[string]interface{},
tags map[string]string,
t ...time.Time)

Debug() bool
SetDebug(enabled bool)
}

func NewAccumulator(
inputConfig *models.InputConfig,
points chan *client.Point,
) Accumulator {
acc := accumulator{}
acc.points = points
acc.inputConfig = inputConfig
return &acc
}

type accumulator struct {
sync.Mutex

points chan *client.Point

defaultTags map[string]string

debug bool

inputConfig *models.InputConfig

prefix string
}

func (ac *accumulator) Add(
measurement string,
value interface{},
tags map[string]string,
t ...time.Time,
) {
fields := make(map[string]interface{})
fields["value"] = value
ac.AddFields(measurement, fields, tags, t...)
}

func (ac *accumulator) AddFields(
measurement string,
fields map[string]interface{},
tags map[string]string,
t ...time.Time,
) {
if len(fields) == 0 || len(measurement) == 0 {
return
}

if !ac.inputConfig.Filter.ShouldTagsPass(tags) {
return
}

// Override measurement name if set
if len(ac.inputConfig.NameOverride) != 0 {
measurement = ac.inputConfig.NameOverride
}
// Apply measurement prefix and suffix if set
if len(ac.inputConfig.MeasurementPrefix) != 0 {
measurement = ac.inputConfig.MeasurementPrefix + measurement
}
if len(ac.inputConfig.MeasurementSuffix) != 0 {
measurement = measurement + ac.inputConfig.MeasurementSuffix
}

if tags == nil {
tags = make(map[string]string)
}
// Apply plugin-wide tags if set
for k, v := range ac.inputConfig.Tags {
if _, ok := tags[k]; !ok {
tags[k] = v
}
}
// Apply daemon-wide tags if set
for k, v := range ac.defaultTags {
if _, ok := tags[k]; !ok {
tags[k] = v
}
}

result := make(map[string]interface{})
for k, v := range fields {
// Filter out any filtered fields
if ac.inputConfig != nil {
if !ac.inputConfig.Filter.ShouldPass(k) {
continue
}
}
result[k] = v

// Validate uint64 and float64 fields
switch val := v.(type) {
case uint64:
// InfluxDB does not support writing uint64
if val < uint64(9223372036854775808) {
result[k] = int64(val)
} else {
result[k] = int64(9223372036854775807)
}
case float64:
// NaNs are invalid values in influxdb, skip measurement
if math.IsNaN(val) || math.IsInf(val, 0) {
if ac.debug {
log.Printf("Measurement [%s] field [%s] has a NaN or Inf "+
"field, skipping",
measurement, k)
}
continue
}
}
}
fields = nil
if len(result) == 0 {
return
}

var timestamp time.Time
if len(t) > 0 {
timestamp = t[0]
} else {
timestamp = time.Now()
}

if ac.prefix != "" {
measurement = ac.prefix + measurement
}

pt, err := client.NewPoint(measurement, tags, result, timestamp)
if err != nil {
log.Printf("Error adding point [%s]: %s\n", measurement, err.Error())
return
}
if ac.debug {
fmt.Println("> " + pt.String())
}
ac.points <- pt
}

func (ac *accumulator) SetDefaultTags(tags map[string]string) {
ac.defaultTags = tags
}

func (ac *accumulator) AddDefaultTag(key, value string) {
ac.defaultTags[key] = value
}

func (ac *accumulator) Prefix() string {
return ac.prefix
}

func (ac *accumulator) SetPrefix(prefix string) {
ac.prefix = prefix
}

func (ac *accumulator) Debug() bool {
return ac.debug
}

func (ac *accumulator) SetDebug(debug bool) {
ac.debug = debug
}
164 changes: 164 additions & 0 deletions agent/accumulator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package agent

import (
"fmt"
"log"
"math"
"sync"
"time"

"github.com/influxdata/telegraf/internal/models"

"github.com/influxdata/influxdb/client/v2"
)

func NewAccumulator(
inputConfig *internal_models.InputConfig,
points chan *client.Point,
) *accumulator {
acc := accumulator{}
acc.points = points
acc.inputConfig = inputConfig
return &acc
}

type accumulator struct {
sync.Mutex

points chan *client.Point

defaultTags map[string]string

debug bool

inputConfig *internal_models.InputConfig

prefix string
}

func (ac *accumulator) Add(
measurement string,
value interface{},
tags map[string]string,
t ...time.Time,
) {
fields := make(map[string]interface{})
fields["value"] = value
ac.AddFields(measurement, fields, tags, t...)
}

func (ac *accumulator) AddFields(
measurement string,
fields map[string]interface{},
tags map[string]string,
t ...time.Time,
) {
if len(fields) == 0 || len(measurement) == 0 {
return
}

if !ac.inputConfig.Filter.ShouldTagsPass(tags) {
return
}

// Override measurement name if set
if len(ac.inputConfig.NameOverride) != 0 {
measurement = ac.inputConfig.NameOverride
}
// Apply measurement prefix and suffix if set
if len(ac.inputConfig.MeasurementPrefix) != 0 {
measurement = ac.inputConfig.MeasurementPrefix + measurement
}
if len(ac.inputConfig.MeasurementSuffix) != 0 {
measurement = measurement + ac.inputConfig.MeasurementSuffix
}

if tags == nil {
tags = make(map[string]string)
}
// Apply plugin-wide tags if set
for k, v := range ac.inputConfig.Tags {
if _, ok := tags[k]; !ok {
tags[k] = v
}
}
// Apply daemon-wide tags if set
for k, v := range ac.defaultTags {
if _, ok := tags[k]; !ok {
tags[k] = v
}
}

result := make(map[string]interface{})
for k, v := range fields {
// Filter out any filtered fields
if ac.inputConfig != nil {
if !ac.inputConfig.Filter.ShouldPass(k) {
continue
}
}
result[k] = v

// Validate uint64 and float64 fields
switch val := v.(type) {
case uint64:
// InfluxDB does not support writing uint64
if val < uint64(9223372036854775808) {
result[k] = int64(val)
} else {
result[k] = int64(9223372036854775807)
}
case float64:
// NaNs are invalid values in influxdb, skip measurement
if math.IsNaN(val) || math.IsInf(val, 0) {
if ac.debug {
log.Printf("Measurement [%s] field [%s] has a NaN or Inf "+
"field, skipping",
measurement, k)
}
continue
}
}
}
fields = nil
if len(result) == 0 {
return
}

var timestamp time.Time
if len(t) > 0 {
timestamp = t[0]
} else {
timestamp = time.Now()
}

if ac.prefix != "" {
measurement = ac.prefix + measurement
}

pt, err := client.NewPoint(measurement, tags, result, timestamp)
if err != nil {
log.Printf("Error adding point [%s]: %s\n", measurement, err.Error())
return
}
if ac.debug {
fmt.Println("> " + pt.String())
}
ac.points <- pt
}

func (ac *accumulator) Debug() bool {
return ac.debug
}

func (ac *accumulator) SetDebug(debug bool) {
ac.debug = debug
}

func (ac *accumulator) setDefaultTags(tags map[string]string) {
ac.defaultTags = tags
}

func (ac *accumulator) addDefaultTag(key, value string) {
ac.defaultTags[key] = value
}
Loading

0 comments on commit 9c0d14b

Please sign in to comment.