Skip to content

Commit

Permalink
Throughout telegraf, use telegraf.Metric rather than client.Point
Browse files Browse the repository at this point in the history
  • Loading branch information
sparrc committed Jan 28, 2016
1 parent 9c0d14b commit c549ab9
Show file tree
Hide file tree
Showing 52 changed files with 391 additions and 437 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
### Release Notes

### Features
- [#564](https://github.com/influxdata/telegraf/issues/564): features for plugin writing simplification. Internal metric data type.

### Bugfixes
- [#599](https://github.com/influxdata/telegraf/issues/599): datadog plugin tags not working.

## v0.10.1 [2016-01-27]

Expand Down
22 changes: 14 additions & 8 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ and submit new inputs.

### Input Plugin Guidelines

* A plugin must conform to the `inputs.Input` interface.
* A plugin must conform to the `telegraf.Input` interface.
* Input Plugins should call `inputs.Add` in their `init` function to register themselves.
See below for a quick example.
* Input Plugins must be added to the
Expand Down Expand Up @@ -97,7 +97,10 @@ package simple

// simple.go

import "github.com/influxdata/telegraf/plugins/inputs"
import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)

type Simple struct {
Ok bool
Expand All @@ -122,7 +125,7 @@ func (s *Simple) Gather(acc inputs.Accumulator) error {
}

func init() {
inputs.Add("simple", func() inputs.Input { return &Simple{} })
inputs.Add("simple", func() telegraf.Input { return &Simple{} })
}
```

Expand Down Expand Up @@ -182,7 +185,7 @@ type Output interface {
Close() error
Description() string
SampleConfig() string
Write(points []*client.Point) error
Write(metrics []telegraf.Metric) error
}
```

Expand All @@ -193,7 +196,10 @@ package simpleoutput

// simpleoutput.go

import "github.com/influxdata/telegraf/plugins/outputs"
import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
)

type Simple struct {
Ok bool
Expand All @@ -217,15 +223,15 @@ func (s *Simple) Close() error {
return nil
}

func (s *Simple) Write(points []*client.Point) error {
func (s *Simple) Write(metrics []telegraf.Metric) error {
for _, pt := range points {
// write `pt` to the output sink here
}
return nil
}

func init() {
outputs.Add("simpleoutput", func() outputs.Output { return &Simple{} })
outputs.Add("simpleoutput", func() telegraf.Output { return &Simple{} })
}

```
Expand Down Expand Up @@ -253,7 +259,7 @@ type ServiceOutput interface {
Close() error
Description() string
SampleConfig() string
Write(points []*client.Point) error
Write(metrics []telegraf.Metric) error
Start() error
Stop()
}
Expand Down
15 changes: 7 additions & 8 deletions agent/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,24 @@ import (
"sync"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/models"

"github.com/influxdata/influxdb/client/v2"
)

func NewAccumulator(
inputConfig *internal_models.InputConfig,
points chan *client.Point,
metrics chan telegraf.Metric,
) *accumulator {
acc := accumulator{}
acc.points = points
acc.metrics = metrics
acc.inputConfig = inputConfig
return &acc
}

type accumulator struct {
sync.Mutex

points chan *client.Point
metrics chan telegraf.Metric

defaultTags map[string]string

Expand Down Expand Up @@ -136,15 +135,15 @@ func (ac *accumulator) AddFields(
measurement = ac.prefix + measurement
}

pt, err := client.NewPoint(measurement, tags, result, timestamp)
m, err := telegraf.NewMetric(measurement, tags, result, timestamp)
if err != nil {
log.Printf("Error adding point [%s]: %s\n", measurement, err.Error())
return
}
if ac.debug {
fmt.Println("> " + pt.String())
fmt.Println("> " + m.String())
}
ac.points <- pt
ac.metrics <- m
}

func (ac *accumulator) Debug() bool {
Expand Down
30 changes: 14 additions & 16 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/config"
"github.com/influxdata/telegraf/internal/models"

"github.com/influxdata/influxdb/client/v2"
)

// Agent runs telegraf and collects data based on the given config
Expand Down Expand Up @@ -101,7 +99,7 @@ func panicRecover(input *internal_models.RunningInput) {

// gatherParallel runs the inputs that are using the same reporting interval
// as the telegraf agent.
func (a *Agent) gatherParallel(pointChan chan *client.Point) error {
func (a *Agent) gatherParallel(metricC chan telegraf.Metric) error {
var wg sync.WaitGroup

start := time.Now()
Expand All @@ -118,7 +116,7 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error {
defer panicRecover(input)
defer wg.Done()

acc := NewAccumulator(input.Config, pointChan)
acc := NewAccumulator(input.Config, metricC)
acc.SetDebug(a.Config.Agent.Debug)
acc.setDefaultTags(a.Config.Tags)

Expand Down Expand Up @@ -159,7 +157,7 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error {
func (a *Agent) gatherSeparate(
shutdown chan struct{},
input *internal_models.RunningInput,
pointChan chan *client.Point,
metricC chan telegraf.Metric,
) error {
defer panicRecover(input)

Expand All @@ -169,7 +167,7 @@ func (a *Agent) gatherSeparate(
var outerr error
start := time.Now()

acc := NewAccumulator(input.Config, pointChan)
acc := NewAccumulator(input.Config, metricC)
acc.SetDebug(a.Config.Agent.Debug)
acc.setDefaultTags(a.Config.Tags)

Expand Down Expand Up @@ -201,13 +199,13 @@ func (a *Agent) gatherSeparate(
func (a *Agent) Test() error {
shutdown := make(chan struct{})
defer close(shutdown)
pointChan := make(chan *client.Point)
metricC := make(chan telegraf.Metric)

// dummy receiver for the point channel
go func() {
for {
select {
case <-pointChan:
case <-metricC:
// do nothing
case <-shutdown:
return
Expand All @@ -216,7 +214,7 @@ func (a *Agent) Test() error {
}()

for _, input := range a.Config.Inputs {
acc := NewAccumulator(input.Config, pointChan)
acc := NewAccumulator(input.Config, metricC)
acc.SetDebug(true)

fmt.Printf("* Plugin: %s, Collection 1\n", input.Name)
Expand Down Expand Up @@ -263,7 +261,7 @@ func (a *Agent) flush() {
}

// flusher monitors the points input channel and flushes on the minimum interval
func (a *Agent) flusher(shutdown chan struct{}, pointChan chan *client.Point) error {
func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) error {
// Inelegant, but this sleep is to allow the Gather threads to run, so that
// the flusher will flush after metrics are collected.
time.Sleep(time.Millisecond * 200)
Expand All @@ -278,9 +276,9 @@ func (a *Agent) flusher(shutdown chan struct{}, pointChan chan *client.Point) er
return nil
case <-ticker.C:
a.flush()
case pt := <-pointChan:
case m := <-metricC:
for _, o := range a.Config.Outputs {
o.AddPoint(pt)
o.AddPoint(m)
}
}
}
Expand Down Expand Up @@ -321,7 +319,7 @@ func (a *Agent) Run(shutdown chan struct{}) error {
a.Config.Agent.Hostname, a.Config.Agent.FlushInterval.Duration)

// channel shared between all input threads for accumulating points
pointChan := make(chan *client.Point, 1000)
metricC := make(chan telegraf.Metric, 1000)

// Round collection to nearest interval by sleeping
if a.Config.Agent.RoundInterval {
Expand All @@ -333,7 +331,7 @@ func (a *Agent) Run(shutdown chan struct{}) error {
wg.Add(1)
go func() {
defer wg.Done()
if err := a.flusher(shutdown, pointChan); err != nil {
if err := a.flusher(shutdown, metricC); err != nil {
log.Printf("Flusher routine failed, exiting: %s\n", err.Error())
close(shutdown)
}
Expand All @@ -358,7 +356,7 @@ func (a *Agent) Run(shutdown chan struct{}) error {
wg.Add(1)
go func(input *internal_models.RunningInput) {
defer wg.Done()
if err := a.gatherSeparate(shutdown, input, pointChan); err != nil {
if err := a.gatherSeparate(shutdown, input, metricC); err != nil {
log.Printf(err.Error())
}
}(input)
Expand All @@ -368,7 +366,7 @@ func (a *Agent) Run(shutdown chan struct{}) error {
defer wg.Wait()

for {
if err := a.gatherParallel(pointChan); err != nil {
if err := a.gatherParallel(metricC); err != nil {
log.Printf(err.Error())
}

Expand Down
6 changes: 3 additions & 3 deletions internal/models/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package internal_models
import (
"strings"

"github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
)

Expand All @@ -24,8 +24,8 @@ type Filter struct {
IsActive bool
}

func (f Filter) ShouldPointPass(point *client.Point) bool {
if f.ShouldPass(point.Name()) && f.ShouldTagsPass(point.Tags()) {
func (f Filter) ShouldMetricPass(metric telegraf.Metric) bool {
if f.ShouldPass(metric.Name()) && f.ShouldTagsPass(metric.Tags()) {
return true
}
return false
Expand Down
24 changes: 11 additions & 13 deletions internal/models/running_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"time"

"github.com/influxdata/telegraf"

"github.com/influxdata/influxdb/client/v2"
)

const DEFAULT_POINT_BUFFER_LIMIT = 10000
Expand All @@ -18,7 +16,7 @@ type RunningOutput struct {
Quiet bool
PointBufferLimit int

points []*client.Point
metrics []telegraf.Metric
overwriteCounter int
}

Expand All @@ -29,42 +27,42 @@ func NewRunningOutput(
) *RunningOutput {
ro := &RunningOutput{
Name: name,
points: make([]*client.Point, 0),
metrics: make([]telegraf.Metric, 0),
Output: output,
Config: conf,
PointBufferLimit: DEFAULT_POINT_BUFFER_LIMIT,
}
return ro
}

func (ro *RunningOutput) AddPoint(point *client.Point) {
func (ro *RunningOutput) AddPoint(point telegraf.Metric) {
if ro.Config.Filter.IsActive {
if !ro.Config.Filter.ShouldPointPass(point) {
if !ro.Config.Filter.ShouldMetricPass(point) {
return
}
}

if len(ro.points) < ro.PointBufferLimit {
ro.points = append(ro.points, point)
if len(ro.metrics) < ro.PointBufferLimit {
ro.metrics = append(ro.metrics, point)
} else {
if ro.overwriteCounter == len(ro.points) {
if ro.overwriteCounter == len(ro.metrics) {
ro.overwriteCounter = 0
}
ro.points[ro.overwriteCounter] = point
ro.metrics[ro.overwriteCounter] = point
ro.overwriteCounter++
}
}

func (ro *RunningOutput) Write() error {
start := time.Now()
err := ro.Output.Write(ro.points)
err := ro.Output.Write(ro.metrics)
elapsed := time.Since(start)
if err == nil {
if !ro.Quiet {
log.Printf("Wrote %d metrics to output %s in %s\n",
len(ro.points), ro.Name, elapsed)
len(ro.metrics), ro.Name, elapsed)
}
ro.points = make([]*client.Point, 0)
ro.metrics = make([]telegraf.Metric, 0)
ro.overwriteCounter = 0
}
return err
Expand Down
Loading

0 comments on commit c549ab9

Please sign in to comment.