Skip to content

Commit

Permalink
Implement telegraf collecting stats on itself
Browse files Browse the repository at this point in the history
closes #1348
  • Loading branch information
sparrc committed Nov 7, 2016
1 parent 5d3850c commit 44d3a78
Show file tree
Hide file tree
Showing 17 changed files with 420 additions and 125 deletions.
7 changes: 6 additions & 1 deletion accumulator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package telegraf

import "time"
import (
"time"
)

// Accumulator is an interface for "accumulating" metrics from plugin(s).
// The metrics are sent down a channel shared between all plugins.
Expand Down Expand Up @@ -28,6 +30,9 @@ type Accumulator interface {
tags map[string]string,
t ...time.Time)

// TODO document
AddMetrics(metrics []Metric)

SetPrecision(precision, interval time.Duration)

AddError(err error)
Expand Down
14 changes: 11 additions & 3 deletions agent/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package agent

import (
"log"
"sync/atomic"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/selfstat"
)

var (
NErrors = selfstat.Register("agent", "gather_errors", map[string]string{})
)

type MetricMaker interface {
Expand Down Expand Up @@ -37,8 +41,12 @@ type accumulator struct {
maker MetricMaker

precision time.Duration
}

errCount uint64
func (ac *accumulator) AddMetrics(metrics []telegraf.Metric) {
for _, m := range metrics {
ac.metrics <- m
}
}

func (ac *accumulator) AddFields(
Expand Down Expand Up @@ -80,7 +88,7 @@ func (ac *accumulator) AddError(err error) {
if err == nil {
return
}
atomic.AddUint64(&ac.errCount, 1)
NErrors.Incr(1)
//TODO suppress/throttle consecutive duplicate errors?
log.Printf("E! Error in plugin [%s]: %s", ac.maker.Name(), err)
}
Expand Down
5 changes: 0 additions & 5 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ func NewAgent(config *config.Config) (*Agent, error) {
// Connect connects to all configured outputs
func (a *Agent) Connect() error {
for _, o := range a.Config.Outputs {
o.Quiet = a.Config.Agent.Quiet

switch ot := o.Output.(type) {
case telegraf.ServiceOutput:
if err := ot.Start(); err != nil {
Expand Down Expand Up @@ -204,9 +202,6 @@ func (a *Agent) Test() error {
if err := input.Input.Gather(acc); err != nil {
return err
}
if acc.errCount > 0 {
return fmt.Errorf("Errors encountered during processing")
}

// Special instructions for some inputs. cpu, for example, needs to be
// run twice in order to return cpu usage percentages.
Expand Down
25 changes: 8 additions & 17 deletions internal/buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@ import (
"sync"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/selfstat"
)

var (
MetricsGathered = selfstat.Register("agent", "metrics_gathered", map[string]string{})
MetricsDropped = selfstat.Register("agent", "metrics_dropped", map[string]string{})
)

// Buffer is an object for storing metrics in a circular buffer.
type Buffer struct {
buf chan telegraf.Metric
// total dropped metrics
drops int
// total metrics added
total int

mu sync.Mutex
}
Expand All @@ -36,25 +38,14 @@ func (b *Buffer) Len() int {
return len(b.buf)
}

// Drops returns the total number of dropped metrics that have occured in this
// buffer since instantiation.
func (b *Buffer) Drops() int {
return b.drops
}

// Total returns the total number of metrics that have been added to this buffer.
func (b *Buffer) Total() int {
return b.total
}

// Add adds metrics to the buffer.
func (b *Buffer) Add(metrics ...telegraf.Metric) {
for i, _ := range metrics {
b.total++
MetricsGathered.Incr(1)
select {
case b.buf <- metrics[i]:
default:
b.drops++
MetricsDropped.Incr(1)
<-b.buf
b.buf <- metrics[i]
}
Expand Down
34 changes: 20 additions & 14 deletions internal/buffer/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,47 +27,53 @@ func BenchmarkAddMetrics(b *testing.B) {

func TestNewBufferBasicFuncs(t *testing.T) {
b := NewBuffer(10)
MetricsDropped.Set(0)
MetricsGathered.Set(0)

assert.True(t, b.IsEmpty())
assert.Zero(t, b.Len())
assert.Zero(t, b.Drops())
assert.Zero(t, b.Total())
assert.Zero(t, MetricsDropped.Get())
assert.Zero(t, MetricsGathered.Get())

m := testutil.TestMetric(1, "mymetric")
b.Add(m)
assert.False(t, b.IsEmpty())
assert.Equal(t, b.Len(), 1)
assert.Equal(t, b.Drops(), 0)
assert.Equal(t, b.Total(), 1)
assert.Equal(t, int64(0), MetricsDropped.Get())
assert.Equal(t, int64(1), MetricsGathered.Get())

b.Add(metricList...)
assert.False(t, b.IsEmpty())
assert.Equal(t, b.Len(), 6)
assert.Equal(t, b.Drops(), 0)
assert.Equal(t, b.Total(), 6)
assert.Equal(t, int64(0), MetricsDropped.Get())
assert.Equal(t, int64(6), MetricsGathered.Get())
}

func TestDroppingMetrics(t *testing.T) {
b := NewBuffer(10)
MetricsDropped.Set(0)
MetricsGathered.Set(0)

// Add up to the size of the buffer
b.Add(metricList...)
b.Add(metricList...)
assert.False(t, b.IsEmpty())
assert.Equal(t, b.Len(), 10)
assert.Equal(t, b.Drops(), 0)
assert.Equal(t, b.Total(), 10)
assert.Equal(t, int64(0), MetricsDropped.Get())
assert.Equal(t, int64(10), MetricsGathered.Get())

// Add 5 more and verify they were dropped
b.Add(metricList...)
assert.False(t, b.IsEmpty())
assert.Equal(t, b.Len(), 10)
assert.Equal(t, b.Drops(), 5)
assert.Equal(t, b.Total(), 15)
assert.Equal(t, int64(5), MetricsDropped.Get())
assert.Equal(t, int64(15), MetricsGathered.Get())
}

func TestGettingBatches(t *testing.T) {
b := NewBuffer(20)
MetricsDropped.Set(0)
MetricsGathered.Set(0)

// Verify that the buffer returned is smaller than requested when there are
// not as many items as requested.
Expand All @@ -78,8 +84,8 @@ func TestGettingBatches(t *testing.T) {
// Verify that the buffer is now empty
assert.True(t, b.IsEmpty())
assert.Zero(t, b.Len())
assert.Zero(t, b.Drops())
assert.Equal(t, b.Total(), 5)
assert.Zero(t, MetricsDropped.Get())
assert.Equal(t, int64(5), MetricsGathered.Get())

// Verify that the buffer returned is not more than the size requested
b.Add(metricList...)
Expand All @@ -89,6 +95,6 @@ func TestGettingBatches(t *testing.T) {
// Verify that buffer is not empty
assert.False(t, b.IsEmpty())
assert.Equal(t, b.Len(), 2)
assert.Equal(t, b.Drops(), 0)
assert.Equal(t, b.Total(), 10)
assert.Equal(t, int64(0), MetricsDropped.Get())
assert.Equal(t, int64(10), MetricsGathered.Get())
}
5 changes: 1 addition & 4 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -821,10 +821,7 @@ func (c *Config) addInput(name string, table *ast.Table) error {
return err
}

rp := &models.RunningInput{
Input: input,
Config: pluginConfig,
}
rp := models.NewRunningInput(input, pluginConfig)
c.Inputs = append(c.Inputs, rp)
return nil
}
Expand Down
15 changes: 15 additions & 0 deletions internal/models/running_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/selfstat"
)

type RunningInput struct {
Expand All @@ -14,6 +15,19 @@ type RunningInput struct {
trace bool
debug bool
defaultTags map[string]string

MetricsGathered selfstat.Stat
}

func NewRunningInput(
input telegraf.Input,
config *InputConfig,
) *RunningInput {
return &RunningInput{
Input: input,
Config: config,
MetricsGathered: selfstat.Register("inputs", "metrics_gathered", map[string]string{"input": config.Name}),
}
}

// InputConfig containing a name, interval, and filter
Expand Down Expand Up @@ -60,6 +74,7 @@ func (r *RunningInput) MakeMetric(
fmt.Println("> " + m.String())
}

r.MetricsGathered.Incr(1)
return m
}

Expand Down
Loading

0 comments on commit 44d3a78

Please sign in to comment.