Skip to content

Commit

Permalink
Implement telegraf collecting stats on itself
Browse files Browse the repository at this point in the history
closes #1348
  • Loading branch information
sparrc committed Nov 8, 2016
1 parent 1c77157 commit 75498a9
Show file tree
Hide file tree
Showing 22 changed files with 505 additions and 167 deletions.
4 changes: 3 additions & 1 deletion accumulator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package telegraf

import "time"
import (
"time"
)

// Accumulator is an interface for "accumulating" metrics from plugin(s).
// The metrics are sent down a channel shared between all plugins.
Expand Down
10 changes: 6 additions & 4 deletions agent/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package agent

import (
"log"
"sync/atomic"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/selfstat"
)

var (
NErrors = selfstat.Register("self_agent", "gather_errors", map[string]string{})
)

type MetricMaker interface {
Expand Down Expand Up @@ -37,8 +41,6 @@ type accumulator struct {
maker MetricMaker

precision time.Duration

errCount uint64
}

func (ac *accumulator) AddFields(
Expand Down Expand Up @@ -80,7 +82,7 @@ func (ac *accumulator) AddError(err error) {
if err == nil {
return
}
atomic.AddUint64(&ac.errCount, 1)
NErrors.Incr(1)
//TODO suppress/throttle consecutive duplicate errors?
log.Printf("E! Error in plugin [%s]: %s", ac.maker.Name(), err)
}
Expand Down
2 changes: 1 addition & 1 deletion agent/accumulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestAccAddError(t *testing.T) {
a.AddError(fmt.Errorf("baz"))

errs := bytes.Split(errBuf.Bytes(), []byte{'\n'})
assert.EqualValues(t, 3, a.errCount)
assert.EqualValues(t, int64(3), NErrors.Get())
require.Len(t, errs, 4) // 4 because of trailing newline
assert.Contains(t, string(errs[0]), "TestPlugin")
assert.Contains(t, string(errs[0]), "foo")
Expand Down
25 changes: 10 additions & 15 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/config"
"github.com/influxdata/telegraf/internal/models"
"github.com/influxdata/telegraf/selfstat"
)

// Agent runs telegraf and collects data based on the given config
Expand Down Expand Up @@ -44,8 +45,6 @@ func NewAgent(config *config.Config) (*Agent, error) {
// Connect connects to all configured outputs
func (a *Agent) Connect() error {
for _, o := range a.Config.Outputs {
o.Quiet = a.Config.Agent.Quiet

switch ot := o.Output.(type) {
case telegraf.ServiceOutput:
if err := ot.Start(); err != nil {
Expand Down Expand Up @@ -106,24 +105,23 @@ func (a *Agent) gatherer(
) {
defer panicRecover(input)

GatherTime := selfstat.RegisterTiming("self_"+input.Config.Name, "gather_time_ns", map[string]string{})

acc := NewAccumulator(input, metricC)
acc.SetPrecision(a.Config.Agent.Precision.Duration,
a.Config.Agent.Interval.Duration)

ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
acc := NewAccumulator(input, metricC)
acc.SetPrecision(a.Config.Agent.Precision.Duration,
a.Config.Agent.Interval.Duration)
input.SetDebug(a.Config.Agent.Debug)
input.SetDefaultTags(a.Config.Tags)

internal.RandomSleep(a.Config.Agent.CollectionJitter.Duration, shutdown)

start := time.Now()
gatherWithTimeout(shutdown, input, acc, interval)
elapsed := time.Since(start)

log.Printf("D! Input [%s] gathered metrics, (%s interval) in %s\n",
input.Name(), interval, elapsed)
GatherTime.Incr(elapsed.Nanoseconds())

select {
case <-shutdown:
Expand Down Expand Up @@ -204,9 +202,6 @@ func (a *Agent) Test() error {
if err := input.Input.Gather(acc); err != nil {
return err
}
if acc.errCount > 0 {
return fmt.Errorf("Errors encountered during processing")
}

// Special instructions for some inputs. cpu, for example, needs to be
// run twice in order to return cpu usage percentages.
Expand Down Expand Up @@ -323,17 +318,17 @@ func (a *Agent) Run(shutdown chan struct{}) error {
a.Config.Agent.Hostname, a.Config.Agent.FlushInterval.Duration)

// channel shared between all input threads for accumulating metrics
metricC := make(chan telegraf.Metric, 100)
metricC := make(chan telegraf.Metric, 1000)

// Start all ServicePlugins
for _, input := range a.Config.Inputs {
input.SetDefaultTags(a.Config.Tags)
switch p := input.Input.(type) {
case telegraf.ServiceInput:
acc := NewAccumulator(input, metricC)
// Service input plugins should set their own precision of their
// metrics.
acc.SetPrecision(time.Nanosecond, 0)
input.SetDefaultTags(a.Config.Tags)
if err := p.Start(acc); err != nil {
log.Printf("E! Service for input %s failed to start, exiting\n%s\n",
input.Name(), err.Error())
Expand Down
25 changes: 8 additions & 17 deletions internal/buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@ import (
"sync"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/selfstat"
)

var (
MetricsWritten = selfstat.Register("self_agent", "metrics_written", map[string]string{})
MetricsDropped = selfstat.Register("self_agent", "metrics_dropped", map[string]string{})
)

// Buffer is an object for storing metrics in a circular buffer.
type Buffer struct {
buf chan telegraf.Metric
// total dropped metrics
drops int
// total metrics added
total int

mu sync.Mutex
}
Expand All @@ -36,25 +38,14 @@ func (b *Buffer) Len() int {
return len(b.buf)
}

// Drops returns the total number of dropped metrics that have occured in this
// buffer since instantiation.
func (b *Buffer) Drops() int {
return b.drops
}

// Total returns the total number of metrics that have been added to this buffer.
func (b *Buffer) Total() int {
return b.total
}

// Add adds metrics to the buffer.
func (b *Buffer) Add(metrics ...telegraf.Metric) {
for i, _ := range metrics {
b.total++
MetricsWritten.Incr(1)
select {
case b.buf <- metrics[i]:
default:
b.drops++
MetricsDropped.Incr(1)
<-b.buf
b.buf <- metrics[i]
}
Expand Down
34 changes: 20 additions & 14 deletions internal/buffer/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,47 +27,53 @@ func BenchmarkAddMetrics(b *testing.B) {

func TestNewBufferBasicFuncs(t *testing.T) {
b := NewBuffer(10)
MetricsDropped.Set(0)
MetricsGathered.Set(0)

assert.True(t, b.IsEmpty())
assert.Zero(t, b.Len())
assert.Zero(t, b.Drops())
assert.Zero(t, b.Total())
assert.Zero(t, MetricsDropped.Get())
assert.Zero(t, MetricsGathered.Get())

m := testutil.TestMetric(1, "mymetric")
b.Add(m)
assert.False(t, b.IsEmpty())
assert.Equal(t, b.Len(), 1)
assert.Equal(t, b.Drops(), 0)
assert.Equal(t, b.Total(), 1)
assert.Equal(t, int64(0), MetricsDropped.Get())
assert.Equal(t, int64(1), MetricsGathered.Get())

b.Add(metricList...)
assert.False(t, b.IsEmpty())
assert.Equal(t, b.Len(), 6)
assert.Equal(t, b.Drops(), 0)
assert.Equal(t, b.Total(), 6)
assert.Equal(t, int64(0), MetricsDropped.Get())
assert.Equal(t, int64(6), MetricsGathered.Get())
}

func TestDroppingMetrics(t *testing.T) {
b := NewBuffer(10)
MetricsDropped.Set(0)
MetricsGathered.Set(0)

// Add up to the size of the buffer
b.Add(metricList...)
b.Add(metricList...)
assert.False(t, b.IsEmpty())
assert.Equal(t, b.Len(), 10)
assert.Equal(t, b.Drops(), 0)
assert.Equal(t, b.Total(), 10)
assert.Equal(t, int64(0), MetricsDropped.Get())
assert.Equal(t, int64(10), MetricsGathered.Get())

// Add 5 more and verify they were dropped
b.Add(metricList...)
assert.False(t, b.IsEmpty())
assert.Equal(t, b.Len(), 10)
assert.Equal(t, b.Drops(), 5)
assert.Equal(t, b.Total(), 15)
assert.Equal(t, int64(5), MetricsDropped.Get())
assert.Equal(t, int64(15), MetricsGathered.Get())
}

func TestGettingBatches(t *testing.T) {
b := NewBuffer(20)
MetricsDropped.Set(0)
MetricsGathered.Set(0)

// Verify that the buffer returned is smaller than requested when there are
// not as many items as requested.
Expand All @@ -78,8 +84,8 @@ func TestGettingBatches(t *testing.T) {
// Verify that the buffer is now empty
assert.True(t, b.IsEmpty())
assert.Zero(t, b.Len())
assert.Zero(t, b.Drops())
assert.Equal(t, b.Total(), 5)
assert.Zero(t, MetricsDropped.Get())
assert.Equal(t, int64(5), MetricsGathered.Get())

// Verify that the buffer returned is not more than the size requested
b.Add(metricList...)
Expand All @@ -89,6 +95,6 @@ func TestGettingBatches(t *testing.T) {
// Verify that buffer is not empty
assert.False(t, b.IsEmpty())
assert.Equal(t, b.Len(), 2)
assert.Equal(t, b.Drops(), 0)
assert.Equal(t, b.Total(), 10)
assert.Equal(t, int64(0), MetricsDropped.Get())
assert.Equal(t, int64(10), MetricsGathered.Get())
}
5 changes: 1 addition & 4 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -821,10 +821,7 @@ func (c *Config) addInput(name string, table *ast.Table) error {
return err
}

rp := &models.RunningInput{
Input: input,
Config: pluginConfig,
}
rp := models.NewRunningInput(input, pluginConfig)
c.Inputs = append(c.Inputs, rp)
return nil
}
Expand Down
9 changes: 3 additions & 6 deletions internal/models/makemetric.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ func makemetric(
daemonTags map[string]string,
filter Filter,
applyFilter bool,
debug bool,
mType telegraf.ValueType,
t time.Time,
) telegraf.Metric {
Expand Down Expand Up @@ -122,11 +121,9 @@ func makemetric(
case float64:
// NaNs are invalid values in influxdb, skip measurement
if math.IsNaN(val) || math.IsInf(val, 0) {
if debug {
log.Printf("Measurement [%s] field [%s] has a NaN or Inf "+
"field, skipping",
measurement, k)
}
log.Printf("D! Measurement [%s] field [%s] has a NaN or Inf "+
"field, skipping",
measurement, k)
delete(fields, k)
continue
}
Expand Down
1 change: 0 additions & 1 deletion internal/models/running_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ func (r *RunningAggregator) MakeMetric(
nil,
r.Config.Filter,
false,
false,
mType,
t,
)
Expand Down
28 changes: 18 additions & 10 deletions internal/models/running_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,30 @@ import (
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/selfstat"
)

var GlobalMetricsGathered = selfstat.Register("self_agent", "metrics_gathered", map[string]string{})

type RunningInput struct {
Input telegraf.Input
Config *InputConfig

trace bool
debug bool
defaultTags map[string]string

MetricsGathered selfstat.Stat
}

func NewRunningInput(
input telegraf.Input,
config *InputConfig,
) *RunningInput {
return &RunningInput{
Input: input,
Config: config,
MetricsGathered: selfstat.Register("self_"+config.Name, "metrics_gathered", map[string]string{}),
}
}

// InputConfig containing a name, interval, and filter
Expand Down Expand Up @@ -51,7 +66,6 @@ func (r *RunningInput) MakeMetric(
r.defaultTags,
r.Config.Filter,
true,
r.debug,
mType,
t,
)
Expand All @@ -60,17 +74,11 @@ func (r *RunningInput) MakeMetric(
fmt.Println("> " + m.String())
}

r.MetricsGathered.Incr(1)
GlobalMetricsGathered.Incr(1)
return m
}

func (r *RunningInput) Debug() bool {
return r.debug
}

func (r *RunningInput) SetDebug(debug bool) {
r.debug = debug
}

func (r *RunningInput) Trace() bool {
return r.trace
}
Expand Down
Loading

0 comments on commit 75498a9

Please sign in to comment.