Skip to content

Commit

Permalink
Implement telegraf collecting stats on itself
Browse files Browse the repository at this point in the history
closes #1348
  • Loading branch information
sparrc committed Nov 7, 2016
1 parent 5d3850c commit 578154a
Show file tree
Hide file tree
Showing 13 changed files with 317 additions and 101 deletions.
7 changes: 6 additions & 1 deletion accumulator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package telegraf

import "time"
import (
"time"
)

// Accumulator is an interface for "accumulating" metrics from plugin(s).
// The metrics are sent down a channel shared between all plugins.
Expand Down Expand Up @@ -28,6 +30,9 @@ type Accumulator interface {
tags map[string]string,
t ...time.Time)

// TODO document
AddMetrics(metrics []Metric)

SetPrecision(precision, interval time.Duration)

AddError(err error)
Expand Down
14 changes: 11 additions & 3 deletions agent/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package agent

import (
"log"
"sync/atomic"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/selfstat"
)

var (
NErrors = selfstat.New("agent", "gather_errors", map[string]string{})
)

type MetricMaker interface {
Expand Down Expand Up @@ -37,8 +41,12 @@ type accumulator struct {
maker MetricMaker

precision time.Duration
}

errCount uint64
func (ac *accumulator) AddMetrics(metrics []telegraf.Metric) {
for _, m := range metrics {
ac.metrics <- m
}
}

func (ac *accumulator) AddFields(
Expand Down Expand Up @@ -80,7 +88,7 @@ func (ac *accumulator) AddError(err error) {
if err == nil {
return
}
atomic.AddUint64(&ac.errCount, 1)
NErrors.Incr(1)
//TODO suppress/throttle consecutive duplicate errors?
log.Printf("E! Error in plugin [%s]: %s", ac.maker.Name(), err)
}
Expand Down
3 changes: 0 additions & 3 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,6 @@ func (a *Agent) Test() error {
if err := input.Input.Gather(acc); err != nil {
return err
}
if acc.errCount > 0 {
return fmt.Errorf("Errors encountered during processing")
}

// Special instructions for some inputs. cpu, for example, needs to be
// run twice in order to return cpu usage percentages.
Expand Down
25 changes: 8 additions & 17 deletions internal/buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@ import (
"sync"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/selfstat"
)

var (
MetricsGathered = selfstat.New("agent", "metrics_gathered", map[string]string{})
MetricsDropped = selfstat.New("agent", "metrics_dropped", map[string]string{})
)

// Buffer is an object for storing metrics in a circular buffer.
type Buffer struct {
buf chan telegraf.Metric
// total dropped metrics
drops int
// total metrics added
total int

mu sync.Mutex
}
Expand All @@ -36,25 +38,14 @@ func (b *Buffer) Len() int {
return len(b.buf)
}

// Drops returns the total number of dropped metrics that have occured in this
// buffer since instantiation.
func (b *Buffer) Drops() int {
return b.drops
}

// Total returns the total number of metrics that have been added to this buffer.
func (b *Buffer) Total() int {
return b.total
}

// Add adds metrics to the buffer.
func (b *Buffer) Add(metrics ...telegraf.Metric) {
for i, _ := range metrics {
b.total++
select {
case b.buf <- metrics[i]:
MetricsGathered.Incr(1)
default:
b.drops++
MetricsDropped.Incr(1)
<-b.buf
b.buf <- metrics[i]
}
Expand Down
5 changes: 1 addition & 4 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -821,10 +821,7 @@ func (c *Config) addInput(name string, table *ast.Table) error {
return err
}

rp := &models.RunningInput{
Input: input,
Config: pluginConfig,
}
rp := models.NewRunningInput(input, pluginConfig)
c.Inputs = append(c.Inputs, rp)
return nil
}
Expand Down
15 changes: 15 additions & 0 deletions internal/models/running_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/selfstat"
)

type RunningInput struct {
Expand All @@ -14,6 +15,19 @@ type RunningInput struct {
trace bool
debug bool
defaultTags map[string]string

MetricsGathered selfstat.Stat
}

func NewRunningInput(
input telegraf.Input,
config *InputConfig,
) *RunningInput {
return &RunningInput{
Input: input,
Config: config,
MetricsGathered: selfstat.New("inputs", "metrics_gathered", map[string]string{"input": config.Name}),
}
}

// InputConfig containing a name, interval, and filter
Expand Down Expand Up @@ -60,6 +74,7 @@ func (r *RunningInput) MakeMetric(
fmt.Println("> " + m.String())
}

r.MetricsGathered.Incr(1)
return m
}

Expand Down
110 changes: 47 additions & 63 deletions internal/models/running_input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@ import (

func TestMakeMetricNoFields(t *testing.T) {
now := time.Now()
ri := RunningInput{
Config: &InputConfig{
Name: "TestRunningInput",
},
}
ri := NewRunningInput(&testInput{}, &InputConfig{
Name: "TestRunningInput",
})

m := ri.MakeMetric(
"RITest",
Expand All @@ -32,11 +30,9 @@ func TestMakeMetricNoFields(t *testing.T) {
// nil fields should get dropped
func TestMakeMetricNilFields(t *testing.T) {
now := time.Now()
ri := RunningInput{
Config: &InputConfig{
Name: "TestRunningInput",
},
}
ri := NewRunningInput(&testInput{}, &InputConfig{
Name: "TestRunningInput",
})

m := ri.MakeMetric(
"RITest",
Expand All @@ -58,11 +54,9 @@ func TestMakeMetricNilFields(t *testing.T) {
// make an untyped, counter, & gauge metric
func TestMakeMetric(t *testing.T) {
now := time.Now()
ri := RunningInput{
Config: &InputConfig{
Name: "TestRunningInput",
},
}
ri := NewRunningInput(&testInput{}, &InputConfig{
Name: "TestRunningInput",
})
ri.SetDebug(true)
assert.Equal(t, true, ri.Debug())
ri.SetTrace(true)
Expand Down Expand Up @@ -126,14 +120,12 @@ func TestMakeMetric(t *testing.T) {

func TestMakeMetricWithPluginTags(t *testing.T) {
now := time.Now()
ri := RunningInput{
Config: &InputConfig{
Name: "TestRunningInput",
Tags: map[string]string{
"foo": "bar",
},
ri := NewRunningInput(&testInput{}, &InputConfig{
Name: "TestRunningInput",
Tags: map[string]string{
"foo": "bar",
},
}
})
ri.SetDebug(true)
assert.Equal(t, true, ri.Debug())
ri.SetTrace(true)
Expand All @@ -155,15 +147,13 @@ func TestMakeMetricWithPluginTags(t *testing.T) {

func TestMakeMetricFilteredOut(t *testing.T) {
now := time.Now()
ri := RunningInput{
Config: &InputConfig{
Name: "TestRunningInput",
Tags: map[string]string{
"foo": "bar",
},
Filter: Filter{NamePass: []string{"foobar"}},
ri := NewRunningInput(&testInput{}, &InputConfig{
Name: "TestRunningInput",
Tags: map[string]string{
"foo": "bar",
},
}
Filter: Filter{NamePass: []string{"foobar"}},
})
ri.SetDebug(true)
assert.Equal(t, true, ri.Debug())
ri.SetTrace(true)
Expand All @@ -182,11 +172,9 @@ func TestMakeMetricFilteredOut(t *testing.T) {

func TestMakeMetricWithDaemonTags(t *testing.T) {
now := time.Now()
ri := RunningInput{
Config: &InputConfig{
Name: "TestRunningInput",
},
}
ri := NewRunningInput(&testInput{}, &InputConfig{
Name: "TestRunningInput",
})
ri.SetDefaultTags(map[string]string{
"foo": "bar",
})
Expand Down Expand Up @@ -214,11 +202,9 @@ func TestMakeMetricInfFields(t *testing.T) {
inf := math.Inf(1)
ninf := math.Inf(-1)
now := time.Now()
ri := RunningInput{
Config: &InputConfig{
Name: "TestRunningInput",
},
}
ri := NewRunningInput(&testInput{}, &InputConfig{
Name: "TestRunningInput",
})
ri.SetDebug(true)
assert.Equal(t, true, ri.Debug())
ri.SetTrace(true)
Expand All @@ -244,11 +230,9 @@ func TestMakeMetricInfFields(t *testing.T) {

func TestMakeMetricAllFieldTypes(t *testing.T) {
now := time.Now()
ri := RunningInput{
Config: &InputConfig{
Name: "TestRunningInput",
},
}
ri := NewRunningInput(&testInput{}, &InputConfig{
Name: "TestRunningInput",
})
ri.SetDebug(true)
assert.Equal(t, true, ri.Debug())
ri.SetTrace(true)
Expand Down Expand Up @@ -284,12 +268,10 @@ func TestMakeMetricAllFieldTypes(t *testing.T) {

func TestMakeMetricNameOverride(t *testing.T) {
now := time.Now()
ri := RunningInput{
Config: &InputConfig{
Name: "TestRunningInput",
NameOverride: "foobar",
},
}
ri := NewRunningInput(&testInput{}, &InputConfig{
Name: "TestRunningInput",
NameOverride: "foobar",
})

m := ri.MakeMetric(
"RITest",
Expand All @@ -307,12 +289,10 @@ func TestMakeMetricNameOverride(t *testing.T) {

func TestMakeMetricNamePrefix(t *testing.T) {
now := time.Now()
ri := RunningInput{
Config: &InputConfig{
Name: "TestRunningInput",
MeasurementPrefix: "foobar_",
},
}
ri := NewRunningInput(&testInput{}, &InputConfig{
Name: "TestRunningInput",
MeasurementPrefix: "foobar_",
})

m := ri.MakeMetric(
"RITest",
Expand All @@ -330,12 +310,10 @@ func TestMakeMetricNamePrefix(t *testing.T) {

func TestMakeMetricNameSuffix(t *testing.T) {
now := time.Now()
ri := RunningInput{
Config: &InputConfig{
Name: "TestRunningInput",
MeasurementSuffix: "_foobar",
},
}
ri := NewRunningInput(&testInput{}, &InputConfig{
Name: "TestRunningInput",
MeasurementSuffix: "_foobar",
})

m := ri.MakeMetric(
"RITest",
Expand All @@ -350,3 +328,9 @@ func TestMakeMetricNameSuffix(t *testing.T) {
fmt.Sprintf("RITest_foobar value=101i %d", now.UnixNano()),
)
}

type testInput struct{}

func (t *testInput) Description() string { return "" }
func (t *testInput) SampleConfig() string { return "" }
func (t *testInput) Gather(acc telegraf.Accumulator) error { return nil }
Loading

0 comments on commit 578154a

Please sign in to comment.