From 75498a935de3c627ba05251fefa2eb349e417bba Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Mon, 7 Nov 2016 08:34:46 +0000 Subject: [PATCH] Implement telegraf collecting stats on itself closes #1348 --- accumulator.go | 4 +- agent/accumulator.go | 10 +- agent/accumulator_test.go | 2 +- agent/agent.go | 25 ++-- internal/buffer/buffer.go | 25 ++-- internal/buffer/buffer_test.go | 34 +++-- internal/config/config.go | 5 +- internal/models/makemetric.go | 9 +- internal/models/running_aggregator.go | 1 - internal/models/running_input.go | 28 ++-- internal/models/running_input_test.go | 128 ++++++++---------- internal/models/running_output.go | 32 ++--- internal/models/running_output_test.go | 3 - plugins/inputs/all/all.go | 1 + plugins/inputs/http_listener/http_listener.go | 36 +++++ plugins/inputs/self/self.go | 64 +++++++++ plugins/inputs/tcp_listener/tcp_listener.go | 22 +++ plugins/inputs/udp_listener/udp_listener.go | 12 ++ selfstat/selfstat.go | 96 +++++++++++++ selfstat/stat.go | 56 ++++++++ selfstat/timingStat.go | 71 ++++++++++ testutil/accumulator.go | 8 ++ 22 files changed, 505 insertions(+), 167 deletions(-) create mode 100644 plugins/inputs/self/self.go create mode 100644 selfstat/selfstat.go create mode 100644 selfstat/stat.go create mode 100644 selfstat/timingStat.go diff --git a/accumulator.go b/accumulator.go index 13fd6e5711852..04039d1d5be4a 100644 --- a/accumulator.go +++ b/accumulator.go @@ -1,6 +1,8 @@ package telegraf -import "time" +import ( + "time" +) // Accumulator is an interface for "accumulating" metrics from plugin(s). // The metrics are sent down a channel shared between all plugins. 
diff --git a/agent/accumulator.go b/agent/accumulator.go index 0d682d2857b6d..ea9d03fff3522 100644 --- a/agent/accumulator.go +++ b/agent/accumulator.go @@ -2,10 +2,14 @@ package agent import ( "log" - "sync/atomic" "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/selfstat" +) + +var ( + NErrors = selfstat.Register("self_agent", "gather_errors", map[string]string{}) ) type MetricMaker interface { @@ -37,8 +41,6 @@ type accumulator struct { maker MetricMaker precision time.Duration - - errCount uint64 } func (ac *accumulator) AddFields( @@ -80,7 +82,7 @@ func (ac *accumulator) AddError(err error) { if err == nil { return } - atomic.AddUint64(&ac.errCount, 1) + NErrors.Incr(1) //TODO suppress/throttle consecutive duplicate errors? log.Printf("E! Error in plugin [%s]: %s", ac.maker.Name(), err) } diff --git a/agent/accumulator_test.go b/agent/accumulator_test.go index ef8d9eb202b86..72f81dc320805 100644 --- a/agent/accumulator_test.go +++ b/agent/accumulator_test.go @@ -87,7 +87,7 @@ func TestAccAddError(t *testing.T) { a.AddError(fmt.Errorf("baz")) errs := bytes.Split(errBuf.Bytes(), []byte{'\n'}) - assert.EqualValues(t, 3, a.errCount) + assert.EqualValues(t, int64(3), NErrors.Get()) require.Len(t, errs, 4) // 4 because of trailing newline assert.Contains(t, string(errs[0]), "TestPlugin") assert.Contains(t, string(errs[0]), "foo") diff --git a/agent/agent.go b/agent/agent.go index 1a205e218b203..5e750698e241d 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -12,6 +12,7 @@ import ( "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/config" "github.com/influxdata/telegraf/internal/models" + "github.com/influxdata/telegraf/selfstat" ) // Agent runs telegraf and collects data based on the given config @@ -44,8 +45,6 @@ func NewAgent(config *config.Config) (*Agent, error) { // Connect connects to all configured outputs func (a *Agent) Connect() error { for _, o := range a.Config.Outputs { - o.Quiet = 
a.Config.Agent.Quiet - switch ot := o.Output.(type) { case telegraf.ServiceOutput: if err := ot.Start(); err != nil { @@ -106,24 +105,23 @@ func (a *Agent) gatherer( ) { defer panicRecover(input) + GatherTime := selfstat.RegisterTiming("self_"+input.Config.Name, "gather_time_ns", map[string]string{}) + + acc := NewAccumulator(input, metricC) + acc.SetPrecision(a.Config.Agent.Precision.Duration, + a.Config.Agent.Interval.Duration) + ticker := time.NewTicker(interval) defer ticker.Stop() for { - acc := NewAccumulator(input, metricC) - acc.SetPrecision(a.Config.Agent.Precision.Duration, - a.Config.Agent.Interval.Duration) - input.SetDebug(a.Config.Agent.Debug) - input.SetDefaultTags(a.Config.Tags) - internal.RandomSleep(a.Config.Agent.CollectionJitter.Duration, shutdown) start := time.Now() gatherWithTimeout(shutdown, input, acc, interval) elapsed := time.Since(start) - log.Printf("D! Input [%s] gathered metrics, (%s interval) in %s\n", - input.Name(), interval, elapsed) + GatherTime.Incr(elapsed.Nanoseconds()) select { case <-shutdown: @@ -204,9 +202,6 @@ func (a *Agent) Test() error { if err := input.Input.Gather(acc); err != nil { return err } - if acc.errCount > 0 { - return fmt.Errorf("Errors encountered during processing") - } // Special instructions for some inputs. cpu, for example, needs to be // run twice in order to return cpu usage percentages. @@ -323,17 +318,17 @@ func (a *Agent) Run(shutdown chan struct{}) error { a.Config.Agent.Hostname, a.Config.Agent.FlushInterval.Duration) // channel shared between all input threads for accumulating metrics - metricC := make(chan telegraf.Metric, 100) + metricC := make(chan telegraf.Metric, 1000) // Start all ServicePlugins for _, input := range a.Config.Inputs { + input.SetDefaultTags(a.Config.Tags) switch p := input.Input.(type) { case telegraf.ServiceInput: acc := NewAccumulator(input, metricC) // Service input plugins should set their own precision of their // metrics. 
acc.SetPrecision(time.Nanosecond, 0) - input.SetDefaultTags(a.Config.Tags) if err := p.Start(acc); err != nil { log.Printf("E! Service for input %s failed to start, exiting\n%s\n", input.Name(), err.Error()) diff --git a/internal/buffer/buffer.go b/internal/buffer/buffer.go index 58cd1c3764d7e..00f0b00a46819 100644 --- a/internal/buffer/buffer.go +++ b/internal/buffer/buffer.go @@ -4,15 +4,17 @@ import ( "sync" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/selfstat" +) + +var ( + MetricsWritten = selfstat.Register("self_agent", "metrics_written", map[string]string{}) + MetricsDropped = selfstat.Register("self_agent", "metrics_dropped", map[string]string{}) ) // Buffer is an object for storing metrics in a circular buffer. type Buffer struct { buf chan telegraf.Metric - // total dropped metrics - drops int - // total metrics added - total int mu sync.Mutex } @@ -36,25 +38,14 @@ func (b *Buffer) Len() int { return len(b.buf) } -// Drops returns the total number of dropped metrics that have occured in this -// buffer since instantiation. -func (b *Buffer) Drops() int { - return b.drops -} - -// Total returns the total number of metrics that have been added to this buffer. -func (b *Buffer) Total() int { - return b.total -} - // Add adds metrics to the buffer. 
func (b *Buffer) Add(metrics ...telegraf.Metric) { for i, _ := range metrics { - b.total++ + MetricsWritten.Incr(1) select { case b.buf <- metrics[i]: default: - b.drops++ + MetricsDropped.Incr(1) <-b.buf b.buf <- metrics[i] } diff --git a/internal/buffer/buffer_test.go b/internal/buffer/buffer_test.go index 9a36f4d84b708..581f16f846709 100644 --- a/internal/buffer/buffer_test.go +++ b/internal/buffer/buffer_test.go @@ -27,47 +27,53 @@ func BenchmarkAddMetrics(b *testing.B) { func TestNewBufferBasicFuncs(t *testing.T) { b := NewBuffer(10) + MetricsDropped.Set(0) + MetricsGathered.Set(0) assert.True(t, b.IsEmpty()) assert.Zero(t, b.Len()) - assert.Zero(t, b.Drops()) - assert.Zero(t, b.Total()) + assert.Zero(t, MetricsDropped.Get()) + assert.Zero(t, MetricsGathered.Get()) m := testutil.TestMetric(1, "mymetric") b.Add(m) assert.False(t, b.IsEmpty()) assert.Equal(t, b.Len(), 1) - assert.Equal(t, b.Drops(), 0) - assert.Equal(t, b.Total(), 1) + assert.Equal(t, int64(0), MetricsDropped.Get()) + assert.Equal(t, int64(1), MetricsGathered.Get()) b.Add(metricList...) assert.False(t, b.IsEmpty()) assert.Equal(t, b.Len(), 6) - assert.Equal(t, b.Drops(), 0) - assert.Equal(t, b.Total(), 6) + assert.Equal(t, int64(0), MetricsDropped.Get()) + assert.Equal(t, int64(6), MetricsGathered.Get()) } func TestDroppingMetrics(t *testing.T) { b := NewBuffer(10) + MetricsDropped.Set(0) + MetricsGathered.Set(0) // Add up to the size of the buffer b.Add(metricList...) b.Add(metricList...) assert.False(t, b.IsEmpty()) assert.Equal(t, b.Len(), 10) - assert.Equal(t, b.Drops(), 0) - assert.Equal(t, b.Total(), 10) + assert.Equal(t, int64(0), MetricsDropped.Get()) + assert.Equal(t, int64(10), MetricsGathered.Get()) // Add 5 more and verify they were dropped b.Add(metricList...) 
assert.False(t, b.IsEmpty()) assert.Equal(t, b.Len(), 10) - assert.Equal(t, b.Drops(), 5) - assert.Equal(t, b.Total(), 15) + assert.Equal(t, int64(5), MetricsDropped.Get()) + assert.Equal(t, int64(15), MetricsGathered.Get()) } func TestGettingBatches(t *testing.T) { b := NewBuffer(20) + MetricsDropped.Set(0) + MetricsGathered.Set(0) // Verify that the buffer returned is smaller than requested when there are // not as many items as requested. @@ -78,8 +84,8 @@ func TestGettingBatches(t *testing.T) { // Verify that the buffer is now empty assert.True(t, b.IsEmpty()) assert.Zero(t, b.Len()) - assert.Zero(t, b.Drops()) - assert.Equal(t, b.Total(), 5) + assert.Zero(t, MetricsDropped.Get()) + assert.Equal(t, int64(5), MetricsGathered.Get()) // Verify that the buffer returned is not more than the size requested b.Add(metricList...) @@ -89,6 +95,6 @@ func TestGettingBatches(t *testing.T) { // Verify that buffer is not empty assert.False(t, b.IsEmpty()) assert.Equal(t, b.Len(), 2) - assert.Equal(t, b.Drops(), 0) - assert.Equal(t, b.Total(), 10) + assert.Equal(t, int64(0), MetricsDropped.Get()) + assert.Equal(t, int64(10), MetricsGathered.Get()) } diff --git a/internal/config/config.go b/internal/config/config.go index 2c2199dacb00e..24dec4169bfc9 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -821,10 +821,7 @@ func (c *Config) addInput(name string, table *ast.Table) error { return err } - rp := &models.RunningInput{ - Input: input, - Config: pluginConfig, - } + rp := models.NewRunningInput(input, pluginConfig) c.Inputs = append(c.Inputs, rp) return nil } diff --git a/internal/models/makemetric.go b/internal/models/makemetric.go index 71427607c187c..eb7fa2254310a 100644 --- a/internal/models/makemetric.go +++ b/internal/models/makemetric.go @@ -31,7 +31,6 @@ func makemetric( daemonTags map[string]string, filter Filter, applyFilter bool, - debug bool, mType telegraf.ValueType, t time.Time, ) telegraf.Metric { @@ -122,11 +121,9 @@ func makemetric( 
case float64: // NaNs are invalid values in influxdb, skip measurement if math.IsNaN(val) || math.IsInf(val, 0) { - if debug { - log.Printf("Measurement [%s] field [%s] has a NaN or Inf "+ - "field, skipping", - measurement, k) - } + log.Printf("D! Measurement [%s] field [%s] has a NaN or Inf "+ + "field, skipping", + measurement, k) delete(fields, k) continue } diff --git a/internal/models/running_aggregator.go b/internal/models/running_aggregator.go index 5c7640ba68258..f2d67308384ae 100644 --- a/internal/models/running_aggregator.go +++ b/internal/models/running_aggregator.go @@ -65,7 +65,6 @@ func (r *RunningAggregator) MakeMetric( nil, r.Config.Filter, false, - false, mType, t, ) diff --git a/internal/models/running_input.go b/internal/models/running_input.go index 558af3e5c0d12..540341b9efc38 100644 --- a/internal/models/running_input.go +++ b/internal/models/running_input.go @@ -5,15 +5,30 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/selfstat" ) +var GlobalMetricsGathered = selfstat.Register("self_agent", "metrics_gathered", map[string]string{}) + type RunningInput struct { Input telegraf.Input Config *InputConfig trace bool - debug bool defaultTags map[string]string + + MetricsGathered selfstat.Stat +} + +func NewRunningInput( + input telegraf.Input, + config *InputConfig, +) *RunningInput { + return &RunningInput{ + Input: input, + Config: config, + MetricsGathered: selfstat.Register("self_"+config.Name, "metrics_gathered", map[string]string{}), + } } // InputConfig containing a name, interval, and filter @@ -51,7 +66,6 @@ func (r *RunningInput) MakeMetric( r.defaultTags, r.Config.Filter, true, - r.debug, mType, t, ) @@ -60,17 +74,11 @@ func (r *RunningInput) MakeMetric( fmt.Println("> " + m.String()) } + r.MetricsGathered.Incr(1) + GlobalMetricsGathered.Incr(1) return m } -func (r *RunningInput) Debug() bool { - return r.debug -} - -func (r *RunningInput) SetDebug(debug bool) { - r.debug = debug -} - func (r 
*RunningInput) Trace() bool { return r.trace } diff --git a/internal/models/running_input_test.go b/internal/models/running_input_test.go index 3d3b65b953151..690699c837ae1 100644 --- a/internal/models/running_input_test.go +++ b/internal/models/running_input_test.go @@ -13,11 +13,9 @@ import ( func TestMakeMetricNoFields(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - }, - } + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + }) m := ri.MakeMetric( "RITest", @@ -32,11 +30,9 @@ func TestMakeMetricNoFields(t *testing.T) { // nil fields should get dropped func TestMakeMetricNilFields(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - }, - } + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + }) m := ri.MakeMetric( "RITest", @@ -58,13 +54,10 @@ func TestMakeMetricNilFields(t *testing.T) { // make an untyped, counter, & gauge metric func TestMakeMetric(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - }, - } - ri.SetDebug(true) - assert.Equal(t, true, ri.Debug()) + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + }) + ri.SetTrace(true) assert.Equal(t, true, ri.Trace()) assert.Equal(t, "inputs.TestRunningInput", ri.Name()) @@ -126,16 +119,13 @@ func TestMakeMetric(t *testing.T) { func TestMakeMetricWithPluginTags(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - Tags: map[string]string{ - "foo": "bar", - }, + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + Tags: map[string]string{ + "foo": "bar", }, - } - ri.SetDebug(true) - assert.Equal(t, true, ri.Debug()) + }) + ri.SetTrace(true) assert.Equal(t, true, ri.Trace()) @@ -155,17 +145,14 @@ func TestMakeMetricWithPluginTags(t *testing.T) { func 
TestMakeMetricFilteredOut(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - Tags: map[string]string{ - "foo": "bar", - }, - Filter: Filter{NamePass: []string{"foobar"}}, + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + Tags: map[string]string{ + "foo": "bar", }, - } - ri.SetDebug(true) - assert.Equal(t, true, ri.Debug()) + Filter: Filter{NamePass: []string{"foobar"}}, + }) + ri.SetTrace(true) assert.Equal(t, true, ri.Trace()) assert.NoError(t, ri.Config.Filter.Compile()) @@ -182,16 +169,13 @@ func TestMakeMetricFilteredOut(t *testing.T) { func TestMakeMetricWithDaemonTags(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - }, - } + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + }) ri.SetDefaultTags(map[string]string{ "foo": "bar", }) - ri.SetDebug(true) - assert.Equal(t, true, ri.Debug()) + ri.SetTrace(true) assert.Equal(t, true, ri.Trace()) @@ -214,13 +198,10 @@ func TestMakeMetricInfFields(t *testing.T) { inf := math.Inf(1) ninf := math.Inf(-1) now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - }, - } - ri.SetDebug(true) - assert.Equal(t, true, ri.Debug()) + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + }) + ri.SetTrace(true) assert.Equal(t, true, ri.Trace()) @@ -244,13 +225,10 @@ func TestMakeMetricInfFields(t *testing.T) { func TestMakeMetricAllFieldTypes(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - }, - } - ri.SetDebug(true) - assert.Equal(t, true, ri.Debug()) + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + }) + ri.SetTrace(true) assert.Equal(t, true, ri.Trace()) @@ -284,12 +262,10 @@ func TestMakeMetricAllFieldTypes(t *testing.T) { func TestMakeMetricNameOverride(t *testing.T) { now := time.Now() - ri := 
RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - NameOverride: "foobar", - }, - } + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + NameOverride: "foobar", + }) m := ri.MakeMetric( "RITest", @@ -307,12 +283,10 @@ func TestMakeMetricNameOverride(t *testing.T) { func TestMakeMetricNamePrefix(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - MeasurementPrefix: "foobar_", - }, - } + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + MeasurementPrefix: "foobar_", + }) m := ri.MakeMetric( "RITest", @@ -330,12 +304,10 @@ func TestMakeMetricNamePrefix(t *testing.T) { func TestMakeMetricNameSuffix(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - MeasurementSuffix: "_foobar", - }, - } + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + MeasurementSuffix: "_foobar", + }) m := ri.MakeMetric( "RITest", @@ -350,3 +322,9 @@ func TestMakeMetricNameSuffix(t *testing.T) { fmt.Sprintf("RITest_foobar value=101i %d", now.UnixNano()), ) } + +type testInput struct{} + +func (t *testInput) Description() string { return "" } +func (t *testInput) SampleConfig() string { return "" } +func (t *testInput) Gather(acc telegraf.Accumulator) error { return nil } diff --git a/internal/models/running_output.go b/internal/models/running_output.go index aa94178f74145..da3a3d04d02bc 100644 --- a/internal/models/running_output.go +++ b/internal/models/running_output.go @@ -1,11 +1,11 @@ package models import ( - "log" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal/buffer" + "github.com/influxdata/telegraf/selfstat" ) const ( @@ -21,10 +21,14 @@ type RunningOutput struct { Name string Output telegraf.Output Config *OutputConfig - Quiet bool MetricBufferLimit int MetricBatchSize int + MetricsWritten selfstat.Stat + BufferSize selfstat.Stat + 
BufferLimit selfstat.Stat + WriteTime selfstat.Stat + metrics *buffer.Buffer failMetrics *buffer.Buffer } @@ -50,7 +54,12 @@ func NewRunningOutput( Config: conf, MetricBufferLimit: bufferLimit, MetricBatchSize: batchSize, + MetricsWritten: selfstat.Register("self_"+name, "metrics_written", map[string]string{}), + BufferSize: selfstat.Register("self_"+name, "buffer_size", map[string]string{}), + BufferLimit: selfstat.Register("self_"+name, "buffer_limit", map[string]string{}), + WriteTime: selfstat.RegisterTiming("self_"+name, "write_time_ns", map[string]string{}), } + ro.BufferLimit.Incr(int64(ro.MetricBufferLimit)) return ro } @@ -84,16 +93,7 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) { // Write writes all cached points to this output. func (ro *RunningOutput) Write() error { - if !ro.Quiet { - log.Printf("I! Output [%s] buffer fullness: %d / %d metrics. "+ - "Total gathered metrics: %d. Total dropped metrics: %d.", - ro.Name, - ro.failMetrics.Len()+ro.metrics.Len(), - ro.MetricBufferLimit, - ro.metrics.Total(), - ro.metrics.Drops()+ro.failMetrics.Drops()) - } - + ro.BufferSize.Incr(int64(ro.metrics.Len())) var err error if !ro.failMetrics.IsEmpty() { bufLen := ro.failMetrics.Len() @@ -126,6 +126,7 @@ func (ro *RunningOutput) Write() error { if err == nil { err = ro.write(batch) } + if err != nil { ro.failMetrics.Add(batch...) return err @@ -141,10 +142,9 @@ func (ro *RunningOutput) write(metrics []telegraf.Metric) error { err := ro.Output.Write(metrics) elapsed := time.Since(start) if err == nil { - if !ro.Quiet { - log.Printf("I! 
Output [%s] wrote batch of %d metrics in %s\n", - ro.Name, len(metrics), elapsed) - } + ro.MetricsWritten.Incr(int64(len(metrics))) + ro.BufferSize.Incr(-int64(len(metrics))) + ro.WriteTime.Incr(elapsed.Nanoseconds()) } return err } diff --git a/internal/models/running_output_test.go b/internal/models/running_output_test.go index 2bca79a067b01..9bc1067957b0b 100644 --- a/internal/models/running_output_test.go +++ b/internal/models/running_output_test.go @@ -36,7 +36,6 @@ func BenchmarkRunningOutputAddWrite(b *testing.B) { m := &perfOutput{} ro := NewRunningOutput("test", m, conf, 1000, 10000) - ro.Quiet = true for n := 0; n < b.N; n++ { ro.AddMetric(first5[0]) @@ -52,7 +51,6 @@ func BenchmarkRunningOutputAddWriteEvery100(b *testing.B) { m := &perfOutput{} ro := NewRunningOutput("test", m, conf, 1000, 10000) - ro.Quiet = true for n := 0; n < b.N; n++ { ro.AddMetric(first5[0]) @@ -71,7 +69,6 @@ func BenchmarkRunningOutputAddFailWrites(b *testing.B) { m := &perfOutput{} m.failWrite = true ro := NewRunningOutput("test", m, conf, 1000, 10000) - ro.Quiet = true for n := 0; n < b.N; n++ { ro.AddMetric(first5[0]) diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 67b85905e8dc5..c7438b9a18ac6 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -62,6 +62,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/redis" _ "github.com/influxdata/telegraf/plugins/inputs/rethinkdb" _ "github.com/influxdata/telegraf/plugins/inputs/riak" + _ "github.com/influxdata/telegraf/plugins/inputs/self" _ "github.com/influxdata/telegraf/plugins/inputs/sensors" _ "github.com/influxdata/telegraf/plugins/inputs/snmp" _ "github.com/influxdata/telegraf/plugins/inputs/snmp_legacy" diff --git a/plugins/inputs/http_listener/http_listener.go b/plugins/inputs/http_listener/http_listener.go index ddc9ac7bf8cec..b98faddfaab26 100644 --- a/plugins/inputs/http_listener/http_listener.go +++ b/plugins/inputs/http_listener/http_listener.go @@ -14,6 +14,7 @@ 
import ( "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers/influx" + "github.com/influxdata/telegraf/selfstat" ) const ( @@ -43,6 +44,17 @@ type HTTPListener struct { parser influx.InfluxParser acc telegraf.Accumulator pool *pool + + BytesRecv selfstat.Stat + RequestsServed selfstat.Stat + WritesServed selfstat.Stat + QueriesServed selfstat.Stat + PingsServed selfstat.Stat + RequestsRecv selfstat.Stat + WritesRecv selfstat.Stat + QueriesRecv selfstat.Stat + PingsRecv selfstat.Stat + NotFoundsServed selfstat.Stat } const sampleConfig = ` @@ -81,6 +93,20 @@ func (h *HTTPListener) Start(acc telegraf.Accumulator) error { h.mu.Lock() defer h.mu.Unlock() + tags := map[string]string{ + "address": h.ServiceAddress, + } + h.BytesRecv = selfstat.Register("self_http_listener", "bytes_received", tags) + h.RequestsServed = selfstat.Register("self_http_listener", "requests_served", tags) + h.WritesServed = selfstat.Register("self_http_listener", "writes_served", tags) + h.QueriesServed = selfstat.Register("self_http_listener", "queries_served", tags) + h.PingsServed = selfstat.Register("self_http_listener", "pings_served", tags) + h.RequestsRecv = selfstat.Register("self_http_listener", "requests_received", tags) + h.WritesRecv = selfstat.Register("self_http_listener", "writes_received", tags) + h.QueriesRecv = selfstat.Register("self_http_listener", "queries_received", tags) + h.PingsRecv = selfstat.Register("self_http_listener", "pings_received", tags) + h.NotFoundsServed = selfstat.Register("self_http_listener", "not_founds_served", tags) + if h.MaxBodySize == 0 { h.MaxBodySize = DEFAULT_MAX_BODY_SIZE } @@ -141,10 +167,16 @@ func (h *HTTPListener) httpListen() error { } func (h *HTTPListener) ServeHTTP(res http.ResponseWriter, req *http.Request) { + h.RequestsRecv.Incr(1) + defer h.RequestsServed.Incr(1) switch req.URL.Path { case "/write": + h.WritesRecv.Incr(1) + defer 
h.WritesServed.Incr(1) h.serveWrite(res, req) case "/query": + h.QueriesRecv.Incr(1) + defer h.QueriesServed.Incr(1) // Deliver a dummy response to the query endpoint, as some InfluxDB // clients test endpoint availability with a query res.Header().Set("Content-Type", "application/json") @@ -152,9 +184,12 @@ func (h *HTTPListener) ServeHTTP(res http.ResponseWriter, req *http.Request) { res.WriteHeader(http.StatusOK) res.Write([]byte("{\"results\":[]}")) case "/ping": + h.PingsRecv.Incr(1) + defer h.PingsServed.Incr(1) // respond to ping requests res.WriteHeader(http.StatusNoContent) default: + defer h.NotFoundsServed.Incr(1) // Don't know how to respond to calls to other endpoints http.NotFound(res, req) } @@ -195,6 +230,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) { badRequest(res) return } + h.BytesRecv.Incr(int64(n)) if err == io.EOF { if return400 { diff --git a/plugins/inputs/self/self.go b/plugins/inputs/self/self.go new file mode 100644 index 0000000000000..784f6a4f341fe --- /dev/null +++ b/plugins/inputs/self/self.go @@ -0,0 +1,64 @@ +package self + +import ( + "runtime" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/selfstat" +) + +type Self struct { + CollectMemstats bool +} + +var sampleConfig = ` + ## If true, collect telegraf memory stats. 
+ # collect_memstats = true +` + +func (s *Self) Description() string { + return "Collect statistics about itself" +} + +func (s *Self) SampleConfig() string { + return sampleConfig +} + +func (s *Self) Gather(acc telegraf.Accumulator) error { + if s.CollectMemstats { + m := &runtime.MemStats{} + runtime.ReadMemStats(m) + fields := map[string]interface{}{ + "alloc_bytes": m.Alloc, // bytes allocated and not yet freed + "total_alloc_bytes": m.TotalAlloc, // bytes allocated (even if freed) + "sys_bytes": m.Sys, // bytes obtained from system (sum of XxxSys below) + "pointer_lookups": m.Lookups, // number of pointer lookups + "mallocs": m.Mallocs, // number of mallocs + "frees": m.Frees, // number of frees + // Main allocation heap statistics. + "heap_alloc_bytes": m.HeapAlloc, // bytes allocated and not yet freed (same as Alloc above) + "heap_sys_bytes": m.HeapSys, // bytes obtained from system + "heap_idle_bytes": m.HeapIdle, // bytes in idle spans + "heap_in_use_bytes": m.HeapInuse, // bytes in non-idle span + "heap_released_bytes": m.HeapReleased, // bytes released to the OS + "heap_objects_bytes": m.HeapObjects, // total number of allocated objects + "num_gc": m.NumGC, + } + acc.AddFields("self_memstats", fields, map[string]string{}) + } + + for _, m := range selfstat.Metrics() { + acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) + } + + return nil +} + +func init() { + inputs.Add("self", func() telegraf.Input { + return &Self{ + CollectMemstats: true, + } + }) +} diff --git a/plugins/inputs/tcp_listener/tcp_listener.go b/plugins/inputs/tcp_listener/tcp_listener.go index 861442348ae13..06df185a1bc1f 100644 --- a/plugins/inputs/tcp_listener/tcp_listener.go +++ b/plugins/inputs/tcp_listener/tcp_listener.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/selfstat" ) type TcpListener struct { @@ -41,6 +42,12 @@ 
type TcpListener struct { parser parsers.Parser acc telegraf.Accumulator + + MaxConnections selfstat.Stat + CurrentConnections selfstat.Stat + TotalConnections selfstat.Stat + PacketsRecv selfstat.Stat + BytesRecv selfstat.Stat } var dropwarn = "E! Error: tcp_listener message queue full. " + @@ -91,6 +98,16 @@ func (t *TcpListener) Start(acc telegraf.Accumulator) error { t.Lock() defer t.Unlock() + tags := map[string]string{ + "address": t.ServiceAddress, + } + t.MaxConnections = selfstat.Register("self_tcp_listener", "max_connections", tags) + t.MaxConnections.Set(int64(t.MaxTCPConnections)) + t.CurrentConnections = selfstat.Register("self_tcp_listener", "current_connections", tags) + t.TotalConnections = selfstat.Register("self_tcp_listener", "total_connections", tags) + t.PacketsRecv = selfstat.Register("self_tcp_listener", "packets_received", tags) + t.BytesRecv = selfstat.Register("self_tcp_listener", "bytes_received", tags) + t.acc = acc t.in = make(chan []byte, t.AllowedPendingMessages) t.done = make(chan struct{}) @@ -189,6 +206,8 @@ func (t *TcpListener) refuser(conn *net.TCPConn) { // handler handles a single TCP Connection func (t *TcpListener) handler(conn *net.TCPConn, id string) { + t.CurrentConnections.Incr(1) + t.TotalConnections.Incr(1) // connection cleanup function defer func() { t.wg.Done() @@ -196,6 +215,7 @@ func (t *TcpListener) handler(conn *net.TCPConn, id string) { // Add one connection potential back to channel when this one closes t.accept <- true t.forget(id) + t.CurrentConnections.Incr(-1) }() var n int @@ -212,6 +232,8 @@ func (t *TcpListener) handler(conn *net.TCPConn, id string) { if n == 0 { continue } + t.BytesRecv.Incr(int64(n)) + t.PacketsRecv.Incr(1) bufCopy := make([]byte, n) copy(bufCopy, scanner.Bytes()) diff --git a/plugins/inputs/udp_listener/udp_listener.go b/plugins/inputs/udp_listener/udp_listener.go index f8dff5269360b..53ed253688cbc 100644 --- a/plugins/inputs/udp_listener/udp_listener.go +++ 
b/plugins/inputs/udp_listener/udp_listener.go @@ -9,6 +9,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/selfstat" ) type UdpListener struct { @@ -36,6 +37,9 @@ type UdpListener struct { acc telegraf.Accumulator listener *net.UDPConn + + PacketsRecv selfstat.Stat + BytesRecv selfstat.Stat } // UDP packet limit, see @@ -86,6 +90,12 @@ func (u *UdpListener) Start(acc telegraf.Accumulator) error { u.Lock() defer u.Unlock() + tags := map[string]string{ + "address": u.ServiceAddress, + } + u.PacketsRecv = selfstat.Register("self_udp_listener", "packets_received", tags) + u.BytesRecv = selfstat.Register("self_udp_listener", "bytes_received", tags) + u.acc = acc u.in = make(chan []byte, u.AllowedPendingMessages) u.done = make(chan struct{}) @@ -133,6 +143,8 @@ func (u *UdpListener) udpListen() error { } continue } + u.BytesRecv.Incr(int64(n)) + u.PacketsRecv.Incr(1) bufCopy := make([]byte, n) copy(bufCopy, buf[:n]) diff --git a/selfstat/selfstat.go b/selfstat/selfstat.go new file mode 100644 index 0000000000000..050a3f0c293bc --- /dev/null +++ b/selfstat/selfstat.go @@ -0,0 +1,96 @@ +package selfstat + +import ( + "log" + "sync" + "time" + + "github.com/influxdata/telegraf" +) + +var ( + registry *rgstry +) + +type Stat interface { + Name() string + FieldName() string + Tags() map[string]string + Key() uint64 + Incr(v int64) + Set(v int64) + Get() int64 +} + +func Register(measurement, field string, tags map[string]string) Stat { + return registry.register(&stat{ + measurement: measurement, + field: field, + metadata: tags, + }) +} + +func RegisterTiming(measurement, field string, tags map[string]string) Stat { + return registry.register(&timingStat{ + measurement: measurement, + field: field, + metadata: tags, + }) +} + +func Metrics() []telegraf.Metric { + registry.mu.Lock() + now := time.Now() + metrics := make([]telegraf.Metric, 
len(registry.stats)) + i := 0 + for _, stats := range registry.stats { + if len(stats) > 0 { + var tags map[string]string + var name string + fields := map[string]interface{}{} + for fieldname, stat := range stats { + fields[fieldname] = stat.Get() + tags = stat.Tags() + name = stat.Name() + } + metric, err := telegraf.NewMetric(name, tags, fields, now) + if err != nil { + log.Printf("E! Error creating selfstat metric: %s", err) + continue + } + metrics[i] = metric + i++ + } + } + registry.mu.Unlock() + return metrics +} + +type rgstry struct { + stats map[uint64]map[string]Stat + mu sync.Mutex +} + +func (r *rgstry) register(s Stat) Stat { + r.mu.Lock() + defer r.mu.Unlock() + if stats, ok := r.stats[s.Key()]; ok { + // measurement exists + if stat, ok := stats[s.FieldName()]; ok { + // field already exists, so don't create a new one + return stat + } + r.stats[s.Key()][s.FieldName()] = s + return s + } else { + // creating a new unique metric + r.stats[s.Key()] = map[string]Stat{s.FieldName(): s} + return s + } +} + +func init() { + registry = &rgstry{ + stats: make(map[uint64]map[string]Stat), + } +} diff --git a/selfstat/stat.go b/selfstat/stat.go new file mode 100644 index 0000000000000..4a2b4e95465b0 --- /dev/null +++ b/selfstat/stat.go @@ -0,0 +1,56 @@ +package selfstat + +import ( + "hash/fnv" + "sync/atomic" +) + +type stat struct { + measurement string + field string + metadata map[string]string + key uint64 + v int64 +} + +func (s *stat) Incr(v int64) { + atomic.AddInt64(&s.v, v) +} + +func (s *stat) Set(v int64) { + atomic.StoreInt64(&s.v, v) +} + +func (s *stat) Get() int64 { + return atomic.LoadInt64(&s.v) +} + +func (s *stat) Name() string { + return s.measurement +} + +func (s *stat) FieldName() string { + return s.field +} + +// Tags returns a copy of the stat's metadata. +// NOTE this allocates a new map every time it is called. 
+func (s *stat) Tags() map[string]string { + m := make(map[string]string, len(s.metadata)) + for k, v := range s.metadata { + m[k] = v + } + return m +} + +func (s *stat) Key() uint64 { + if s.key == 0 { + h := fnv.New64a() + h.Write([]byte(s.measurement)) + for k, v := range s.metadata { + h.Write([]byte(k + v)) + } + s.key = h.Sum64() + } + return s.key +} diff --git a/selfstat/timingStat.go b/selfstat/timingStat.go new file mode 100644 index 0000000000000..6ea1e7d69601c --- /dev/null +++ b/selfstat/timingStat.go @@ -0,0 +1,71 @@ +package selfstat + +import ( + "hash/fnv" + "sync" +) + +type timingStat struct { + measurement string + field string + metadata map[string]string + key uint64 + v int64 + prev int64 + count int64 + mu sync.Mutex +} + +func (s *timingStat) Incr(v int64) { + s.mu.Lock() + s.v += v + s.count++ + s.mu.Unlock() +} + +func (s *timingStat) Set(v int64) { +} + +func (s *timingStat) Get() int64 { + var avg int64 + s.mu.Lock() + if s.count > 0 { + s.prev, avg = s.v/s.count, s.v/s.count + s.v = 0 + s.count = 0 + } else { + avg = s.prev + } + s.mu.Unlock() + return avg +} + +func (s *timingStat) Name() string { + return s.measurement +} + +func (s *timingStat) FieldName() string { + return s.field +} + +// Tags returns a copy of the timingStat's metadata. +// NOTE this allocates a new map every time it is called. 
+func (s *timingStat) Tags() map[string]string { + m := make(map[string]string, len(s.metadata)) + for k, v := range s.metadata { + m[k] = v + } + return m +} + +func (s *timingStat) Key() uint64 { + if s.key == 0 { + h := fnv.New64a() + h.Write([]byte(s.measurement)) + for k, v := range s.metadata { + h.Write([]byte(k + v)) + } + s.key = h.Sum64() + } + return s.key +} diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 99f9e300683b5..1ca02fd4022d9 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -9,6 +9,8 @@ import ( "testing" "time" + "github.com/influxdata/telegraf" + "github.com/stretchr/testify/assert" ) @@ -110,6 +112,12 @@ func (a *Accumulator) AddGauge( a.AddFields(measurement, fields, tags, timestamp...) } +func (a *Accumulator) AddMetrics(metrics []telegraf.Metric) { + for _, m := range metrics { + a.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) + } +} + // AddError appends the given error to Accumulator.Errors. func (a *Accumulator) AddError(err error) { if err == nil {