From afa50a7e6a3cece5a2b62c1fb069ee2a90a8157b Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Mon, 7 Nov 2016 08:34:46 +0000 Subject: [PATCH] Implement telegraf collecting stats on itself closes #1348 --- agent/accumulator.go | 10 +- agent/accumulator_test.go | 2 +- agent/agent.go | 25 +- internal/buffer/buffer.go | 25 +- internal/buffer/buffer_test.go | 34 +-- internal/config/config.go | 5 +- internal/models/makemetric.go | 9 +- internal/models/running_aggregator.go | 1 - internal/models/running_input.go | 28 ++- internal/models/running_input_test.go | 128 +++++----- internal/models/running_output.go | 44 ++-- internal/models/running_output_test.go | 3 - plugins/inputs/all/all.go | 1 + plugins/inputs/http_listener/http_listener.go | 36 +++ plugins/inputs/self/README.md | 81 +++++++ plugins/inputs/self/self.go | 66 ++++++ plugins/inputs/self/self_test.go | 62 +++++ plugins/inputs/tcp_listener/tcp_listener.go | 22 ++ plugins/inputs/udp_listener/udp_listener.go | 12 + selfstat/README.md | 1 + selfstat/selfstat.go | 132 +++++++++++ selfstat/selfstat_test.go | 222 ++++++++++++++++++ selfstat/stat.go | 69 ++++++ selfstat/timingStat.go | 85 +++++++ testutil/accumulator.go | 8 + 25 files changed, 942 insertions(+), 169 deletions(-) create mode 100644 plugins/inputs/self/README.md create mode 100644 plugins/inputs/self/self.go create mode 100644 plugins/inputs/self/self_test.go create mode 100644 selfstat/README.md create mode 100644 selfstat/selfstat.go create mode 100644 selfstat/selfstat_test.go create mode 100644 selfstat/stat.go create mode 100644 selfstat/timingStat.go diff --git a/agent/accumulator.go b/agent/accumulator.go index 0d682d2857b6d..1f9e2270d63ce 100644 --- a/agent/accumulator.go +++ b/agent/accumulator.go @@ -2,10 +2,14 @@ package agent import ( "log" - "sync/atomic" "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/selfstat" +) + +var ( + NErrors = selfstat.Register("agent", "gather_errors", map[string]string{}) ) type MetricMaker interface { @@ -37,8 +41,6 @@ type accumulator struct { maker MetricMaker precision time.Duration - - errCount uint64 } func (ac *accumulator) AddFields( @@ -80,7 +82,7 @@ func (ac *accumulator) AddError(err error) { if err == nil { return } - atomic.AddUint64(&ac.errCount, 1) + NErrors.Incr(1) //TODO suppress/throttle consecutive duplicate errors? log.Printf("E! Error in plugin [%s]: %s", ac.maker.Name(), err) } diff --git a/agent/accumulator_test.go b/agent/accumulator_test.go index ef8d9eb202b86..72f81dc320805 100644 --- a/agent/accumulator_test.go +++ b/agent/accumulator_test.go @@ -87,7 +87,7 @@ func TestAccAddError(t *testing.T) { a.AddError(fmt.Errorf("baz")) errs := bytes.Split(errBuf.Bytes(), []byte{'\n'}) - assert.EqualValues(t, 3, a.errCount) + assert.EqualValues(t, int64(3), NErrors.Get()) require.Len(t, errs, 4) // 4 because of trailing newline assert.Contains(t, string(errs[0]), "TestPlugin") assert.Contains(t, string(errs[0]), "foo") diff --git a/agent/agent.go b/agent/agent.go index 1a205e218b203..ea9f846f70f47 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -12,6 +12,7 @@ import ( "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/config" "github.com/influxdata/telegraf/internal/models" + "github.com/influxdata/telegraf/selfstat" ) // Agent runs telegraf and collects data based on the given config @@ -44,8 +45,6 @@ func NewAgent(config *config.Config) (*Agent, error) { // Connect connects to all configured outputs func (a *Agent) Connect() error { for _, o := range a.Config.Outputs { - o.Quiet = a.Config.Agent.Quiet - switch ot := o.Output.(type) { case telegraf.ServiceOutput: if err := ot.Start(); err != nil { @@ -106,24 +105,23 @@ func (a *Agent) gatherer( ) { defer panicRecover(input) + GatherTime := selfstat.RegisterTiming("inputs_"+input.Config.Name, "gather_time_ns", map[string]string{}) + + acc := NewAccumulator(input, metricC) + acc.SetPrecision(a.Config.Agent.Precision.Duration, + a.Config.Agent.Interval.Duration) + ticker := time.NewTicker(interval) defer ticker.Stop() for { - acc := NewAccumulator(input, metricC) - acc.SetPrecision(a.Config.Agent.Precision.Duration, - a.Config.Agent.Interval.Duration) - input.SetDebug(a.Config.Agent.Debug) - input.SetDefaultTags(a.Config.Tags) - internal.RandomSleep(a.Config.Agent.CollectionJitter.Duration, shutdown) start := time.Now() gatherWithTimeout(shutdown, input, acc, interval) elapsed := time.Since(start) - log.Printf("D! Input [%s] gathered metrics, (%s interval) in %s\n", - input.Name(), interval, elapsed) + GatherTime.Incr(elapsed.Nanoseconds()) select { case <-shutdown: @@ -204,9 +202,6 @@ func (a *Agent) Test() error { if err := input.Input.Gather(acc); err != nil { return err } - if acc.errCount > 0 { - return fmt.Errorf("Errors encountered during processing") - } // Special instructions for some inputs. cpu, for example, needs to be // run twice in order to return cpu usage percentages. @@ -323,17 +318,17 @@ func (a *Agent) Run(shutdown chan struct{}) error { a.Config.Agent.Hostname, a.Config.Agent.FlushInterval.Duration) // channel shared between all input threads for accumulating metrics - metricC := make(chan telegraf.Metric, 100) + metricC := make(chan telegraf.Metric, 1000) // Start all ServicePlugins for _, input := range a.Config.Inputs { + input.SetDefaultTags(a.Config.Tags) switch p := input.Input.(type) { case telegraf.ServiceInput: acc := NewAccumulator(input, metricC) // Service input plugins should set their own precision of their // metrics. acc.SetPrecision(time.Nanosecond, 0) - input.SetDefaultTags(a.Config.Tags) if err := p.Start(acc); err != nil { log.Printf("E! Service for input %s failed to start, exiting\n%s\n", input.Name(), err.Error()) diff --git a/internal/buffer/buffer.go b/internal/buffer/buffer.go index 58cd1c3764d7e..5e7818ef12740 100644 --- a/internal/buffer/buffer.go +++ b/internal/buffer/buffer.go @@ -4,15 +4,17 @@ import ( "sync" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/selfstat" +) + +var ( + MetricsWritten = selfstat.Register("agent", "metrics_written", map[string]string{}) + MetricsDropped = selfstat.Register("agent", "metrics_dropped", map[string]string{}) ) // Buffer is an object for storing metrics in a circular buffer. type Buffer struct { buf chan telegraf.Metric - // total dropped metrics - drops int - // total metrics added - total int mu sync.Mutex } @@ -36,25 +38,14 @@ func (b *Buffer) Len() int { return len(b.buf) } -// Drops returns the total number of dropped metrics that have occured in this -// buffer since instantiation. -func (b *Buffer) Drops() int { - return b.drops -} - -// Total returns the total number of metrics that have been added to this buffer. -func (b *Buffer) Total() int { - return b.total -} - // Add adds metrics to the buffer. func (b *Buffer) Add(metrics ...telegraf.Metric) { for i, _ := range metrics { - b.total++ + MetricsWritten.Incr(1) select { case b.buf <- metrics[i]: default: - b.drops++ + MetricsDropped.Incr(1) <-b.buf b.buf <- metrics[i] } diff --git a/internal/buffer/buffer_test.go b/internal/buffer/buffer_test.go index 9a36f4d84b708..f84d8c66d0082 100644 --- a/internal/buffer/buffer_test.go +++ b/internal/buffer/buffer_test.go @@ -27,47 +27,53 @@ func BenchmarkAddMetrics(b *testing.B) { func TestNewBufferBasicFuncs(t *testing.T) { b := NewBuffer(10) + MetricsDropped.Set(0) + MetricsWritten.Set(0) assert.True(t, b.IsEmpty()) assert.Zero(t, b.Len()) - assert.Zero(t, b.Drops()) - assert.Zero(t, b.Total()) + assert.Zero(t, MetricsDropped.Get()) + assert.Zero(t, MetricsWritten.Get()) m := testutil.TestMetric(1, "mymetric") b.Add(m) assert.False(t, b.IsEmpty()) assert.Equal(t, b.Len(), 1) - assert.Equal(t, b.Drops(), 0) - assert.Equal(t, b.Total(), 1) + assert.Equal(t, int64(0), MetricsDropped.Get()) + assert.Equal(t, int64(1), MetricsWritten.Get()) b.Add(metricList...) assert.False(t, b.IsEmpty()) assert.Equal(t, b.Len(), 6) - assert.Equal(t, b.Drops(), 0) - assert.Equal(t, b.Total(), 6) + assert.Equal(t, int64(0), MetricsDropped.Get()) + assert.Equal(t, int64(6), MetricsWritten.Get()) } func TestDroppingMetrics(t *testing.T) { b := NewBuffer(10) + MetricsDropped.Set(0) + MetricsWritten.Set(0) // Add up to the size of the buffer b.Add(metricList...) b.Add(metricList...) assert.False(t, b.IsEmpty()) assert.Equal(t, b.Len(), 10) - assert.Equal(t, b.Drops(), 0) - assert.Equal(t, b.Total(), 10) + assert.Equal(t, int64(0), MetricsDropped.Get()) + assert.Equal(t, int64(10), MetricsWritten.Get()) // Add 5 more and verify they were dropped b.Add(metricList...) assert.False(t, b.IsEmpty()) assert.Equal(t, b.Len(), 10) - assert.Equal(t, b.Drops(), 5) - assert.Equal(t, b.Total(), 15) + assert.Equal(t, int64(5), MetricsDropped.Get()) + assert.Equal(t, int64(15), MetricsWritten.Get()) } func TestGettingBatches(t *testing.T) { b := NewBuffer(20) + MetricsDropped.Set(0) + MetricsWritten.Set(0) // Verify that the buffer returned is smaller than requested when there are // not as many items as requested. @@ -78,8 +84,8 @@ func TestGettingBatches(t *testing.T) { // Verify that the buffer is now empty assert.True(t, b.IsEmpty()) assert.Zero(t, b.Len()) - assert.Zero(t, b.Drops()) - assert.Equal(t, b.Total(), 5) + assert.Zero(t, MetricsDropped.Get()) + assert.Equal(t, int64(5), MetricsWritten.Get()) // Verify that the buffer returned is not more than the size requested b.Add(metricList...) @@ -89,6 +95,6 @@ func TestGettingBatches(t *testing.T) { // Verify that buffer is not empty assert.False(t, b.IsEmpty()) assert.Equal(t, b.Len(), 2) - assert.Equal(t, b.Drops(), 0) - assert.Equal(t, b.Total(), 10) + assert.Equal(t, int64(0), MetricsDropped.Get()) + assert.Equal(t, int64(10), MetricsWritten.Get()) } diff --git a/internal/config/config.go b/internal/config/config.go index 2c2199dacb00e..24dec4169bfc9 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -821,10 +821,7 @@ func (c *Config) addInput(name string, table *ast.Table) error { return err } - rp := &models.RunningInput{ - Input: input, - Config: pluginConfig, - } + rp := models.NewRunningInput(input, pluginConfig) c.Inputs = append(c.Inputs, rp) return nil } diff --git a/internal/models/makemetric.go b/internal/models/makemetric.go index 71427607c187c..eb7fa2254310a 100644 --- a/internal/models/makemetric.go +++ b/internal/models/makemetric.go @@ -31,7 +31,6 @@ func makemetric( daemonTags map[string]string, filter Filter, applyFilter bool, - debug bool, mType telegraf.ValueType, t time.Time, ) telegraf.Metric { @@ -122,11 +121,9 @@ func makemetric( case float64: // NaNs are invalid values in influxdb, skip measurement if math.IsNaN(val) || math.IsInf(val, 0) { - if debug { - log.Printf("Measurement [%s] field [%s] has a NaN or Inf "+ - "field, skipping", - measurement, k) - } + log.Printf("D! Measurement [%s] field [%s] has a NaN or Inf "+ + "field, skipping", + measurement, k) delete(fields, k) continue } diff --git a/internal/models/running_aggregator.go b/internal/models/running_aggregator.go index 5c7640ba68258..f2d67308384ae 100644 --- a/internal/models/running_aggregator.go +++ b/internal/models/running_aggregator.go @@ -65,7 +65,6 @@ func (r *RunningAggregator) MakeMetric( nil, r.Config.Filter, false, - false, mType, t, ) diff --git a/internal/models/running_input.go b/internal/models/running_input.go index 558af3e5c0d12..c09ced6875ab2 100644 --- a/internal/models/running_input.go +++ b/internal/models/running_input.go @@ -5,15 +5,30 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/selfstat" ) +var GlobalMetricsGathered = selfstat.Register("agent", "metrics_gathered", map[string]string{}) + type RunningInput struct { Input telegraf.Input Config *InputConfig trace bool - debug bool defaultTags map[string]string + + MetricsGathered selfstat.Stat +} + +func NewRunningInput( + input telegraf.Input, + config *InputConfig, +) *RunningInput { + return &RunningInput{ + Input: input, + Config: config, + MetricsGathered: selfstat.Register("inputs_"+config.Name, "metrics_gathered", map[string]string{}), + } } // InputConfig containing a name, interval, and filter @@ -51,7 +66,6 @@ func (r *RunningInput) MakeMetric( r.defaultTags, r.Config.Filter, true, - r.debug, mType, t, ) @@ -60,17 +74,11 @@ func (r *RunningInput) MakeMetric( fmt.Println("> " + m.String()) } + r.MetricsGathered.Incr(1) + GlobalMetricsGathered.Incr(1) return m } -func (r *RunningInput) Debug() bool { - return r.debug -} - -func (r *RunningInput) SetDebug(debug bool) { - r.debug = debug -} - func (r *RunningInput) Trace() bool { return r.trace } diff --git a/internal/models/running_input_test.go b/internal/models/running_input_test.go index 3d3b65b953151..690699c837ae1 100644 --- a/internal/models/running_input_test.go +++ b/internal/models/running_input_test.go @@ -13,11 +13,9 @@ import ( func TestMakeMetricNoFields(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - }, - } + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + }) m := ri.MakeMetric( "RITest", @@ -32,11 +30,9 @@ func TestMakeMetricNoFields(t *testing.T) { // nil fields should get dropped func TestMakeMetricNilFields(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - }, - } + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + }) m := ri.MakeMetric( "RITest", @@ -58,13 +54,10 @@ func TestMakeMetricNilFields(t *testing.T) { // make an untyped, counter, & gauge metric func TestMakeMetric(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - }, - } - ri.SetDebug(true) - assert.Equal(t, true, ri.Debug()) + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + }) + ri.SetTrace(true) assert.Equal(t, true, ri.Trace()) assert.Equal(t, "inputs.TestRunningInput", ri.Name()) @@ -126,16 +119,13 @@ func TestMakeMetric(t *testing.T) { func TestMakeMetricWithPluginTags(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - Tags: map[string]string{ - "foo": "bar", - }, + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + Tags: map[string]string{ + "foo": "bar", }, - } - ri.SetDebug(true) - assert.Equal(t, true, ri.Debug()) + }) + ri.SetTrace(true) assert.Equal(t, true, ri.Trace()) @@ -155,17 +145,14 @@ func TestMakeMetricWithPluginTags(t *testing.T) { func TestMakeMetricFilteredOut(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - Tags: map[string]string{ - "foo": "bar", - }, - Filter: Filter{NamePass: []string{"foobar"}}, + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + Tags: map[string]string{ + "foo": "bar", }, - } - ri.SetDebug(true) - assert.Equal(t, true, ri.Debug()) + Filter: Filter{NamePass: []string{"foobar"}}, + }) + ri.SetTrace(true) assert.Equal(t, true, ri.Trace()) assert.NoError(t, ri.Config.Filter.Compile()) @@ -182,16 +169,13 @@ func TestMakeMetricFilteredOut(t *testing.T) { func TestMakeMetricWithDaemonTags(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - }, - } + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + }) ri.SetDefaultTags(map[string]string{ "foo": "bar", }) - ri.SetDebug(true) - assert.Equal(t, true, ri.Debug()) + ri.SetTrace(true) assert.Equal(t, true, ri.Trace()) @@ -214,13 +198,10 @@ func TestMakeMetricInfFields(t *testing.T) { inf := math.Inf(1) ninf := math.Inf(-1) now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - }, - } - ri.SetDebug(true) - assert.Equal(t, true, ri.Debug()) + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + }) + ri.SetTrace(true) assert.Equal(t, true, ri.Trace()) @@ -244,13 +225,10 @@ func TestMakeMetricInfFields(t *testing.T) { func TestMakeMetricAllFieldTypes(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - }, - } - ri.SetDebug(true) - assert.Equal(t, true, ri.Debug()) + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + }) + ri.SetTrace(true) assert.Equal(t, true, ri.Trace()) @@ -284,12 +262,10 @@ func TestMakeMetricAllFieldTypes(t *testing.T) { func TestMakeMetricNameOverride(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - NameOverride: "foobar", - }, - } + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + NameOverride: "foobar", + }) m := ri.MakeMetric( "RITest", @@ -307,12 +283,10 @@ func TestMakeMetricNameOverride(t *testing.T) { func TestMakeMetricNamePrefix(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - MeasurementPrefix: "foobar_", - }, - } + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + MeasurementPrefix: "foobar_", + }) m := ri.MakeMetric( "RITest", @@ -330,12 +304,10 @@ func TestMakeMetricNamePrefix(t *testing.T) { func TestMakeMetricNameSuffix(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - MeasurementSuffix: "_foobar", - }, - } + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + MeasurementSuffix: "_foobar", + }) m := ri.MakeMetric( "RITest", @@ -350,3 +322,9 @@ func TestMakeMetricNameSuffix(t *testing.T) { fmt.Sprintf("RITest_foobar value=101i %d", now.UnixNano()), ) } + +type testInput struct{} + +func (t *testInput) Description() string { return "" } +func (t *testInput) SampleConfig() string { return "" } +func (t *testInput) Gather(acc telegraf.Accumulator) error { return nil } diff --git a/internal/models/running_output.go b/internal/models/running_output.go index aa94178f74145..65e7d1da55245 100644 --- a/internal/models/running_output.go +++ b/internal/models/running_output.go @@ -6,6 +6,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal/buffer" + "github.com/influxdata/telegraf/selfstat" ) const ( @@ -21,10 +22,14 @@ type RunningOutput struct { Name string Output telegraf.Output Config *OutputConfig - Quiet bool MetricBufferLimit int MetricBatchSize int + MetricsWritten selfstat.Stat + BufferSize selfstat.Stat + BufferLimit selfstat.Stat + WriteTime selfstat.Stat + metrics *buffer.Buffer failMetrics *buffer.Buffer } @@ -50,7 +55,12 @@ func NewRunningOutput( Config: conf, MetricBufferLimit: bufferLimit, MetricBatchSize: batchSize, + MetricsWritten: selfstat.Register("outputs_"+name, "metrics_written", map[string]string{}), + BufferSize: selfstat.Register("outputs_"+name, "buffer_size", map[string]string{}), + BufferLimit: selfstat.Register("outputs_"+name, "buffer_limit", map[string]string{}), + WriteTime: selfstat.RegisterTiming("outputs_"+name, "write_time_ns", map[string]string{}), } + ro.BufferLimit.Incr(int64(ro.MetricBufferLimit)) return ro } @@ -84,28 +94,21 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) { // Write writes all cached points to this output. func (ro *RunningOutput) Write() error { - if !ro.Quiet { - log.Printf("I! Output [%s] buffer fullness: %d / %d metrics. "+ - "Total gathered metrics: %d. Total dropped metrics: %d.", - ro.Name, - ro.failMetrics.Len()+ro.metrics.Len(), - ro.MetricBufferLimit, - ro.metrics.Total(), - ro.metrics.Drops()+ro.failMetrics.Drops()) - } - + nFails, nMetrics := ro.failMetrics.Len(), ro.metrics.Len() + log.Printf("D! Output [%s] buffer fullness: %d / %d metrics. ", + ro.Name, nFails+nMetrics, ro.MetricBufferLimit) + ro.BufferSize.Incr(int64(nFails + nMetrics)) var err error if !ro.failMetrics.IsEmpty() { - bufLen := ro.failMetrics.Len() // how many batches of failed writes we need to write. - nBatches := bufLen/ro.MetricBatchSize + 1 + nBatches := nFails/ro.MetricBatchSize + 1 batchSize := ro.MetricBatchSize for i := 0; i < nBatches; i++ { // If it's the last batch, only grab the metrics that have not had // a write attempt already (this is primarily to preserve order). if i == nBatches-1 { - batchSize = bufLen % ro.MetricBatchSize + batchSize = nFails % ro.MetricBatchSize } batch := ro.failMetrics.Batch(batchSize) // If we've already failed previous writes, don't bother trying to @@ -126,6 +129,7 @@ func (ro *RunningOutput) Write() error { if err == nil { err = ro.write(batch) } + if err != nil { ro.failMetrics.Add(batch...) return err @@ -134,17 +138,19 @@ func (ro *RunningOutput) Write() error { } func (ro *RunningOutput) write(metrics []telegraf.Metric) error { - if metrics == nil || len(metrics) == 0 { + nMetrics := len(metrics) + if nMetrics == 0 { return nil } start := time.Now() err := ro.Output.Write(metrics) elapsed := time.Since(start) if err == nil { - if !ro.Quiet { - log.Printf("I! Output [%s] wrote batch of %d metrics in %s\n", - ro.Name, len(metrics), elapsed) - } + log.Printf("D! Output [%s] wrote batch of %d metrics in %s\n", + ro.Name, nMetrics, elapsed) + ro.MetricsWritten.Incr(int64(nMetrics)) + ro.BufferSize.Incr(-int64(nMetrics)) + ro.WriteTime.Incr(elapsed.Nanoseconds()) } return err } diff --git a/internal/models/running_output_test.go b/internal/models/running_output_test.go index 2bca79a067b01..9bc1067957b0b 100644 --- a/internal/models/running_output_test.go +++ b/internal/models/running_output_test.go @@ -36,7 +36,6 @@ func BenchmarkRunningOutputAddWrite(b *testing.B) { m := &perfOutput{} ro := NewRunningOutput("test", m, conf, 1000, 10000) - ro.Quiet = true for n := 0; n < b.N; n++ { ro.AddMetric(first5[0]) @@ -52,7 +51,6 @@ func BenchmarkRunningOutputAddWriteEvery100(b *testing.B) { m := &perfOutput{} ro := NewRunningOutput("test", m, conf, 1000, 10000) - ro.Quiet = true for n := 0; n < b.N; n++ { ro.AddMetric(first5[0]) @@ -71,7 +69,6 @@ func BenchmarkRunningOutputAddFailWrites(b *testing.B) { m := &perfOutput{} m.failWrite = true ro := NewRunningOutput("test", m, conf, 1000, 10000) - ro.Quiet = true for n := 0; n < b.N; n++ { ro.AddMetric(first5[0]) diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 67b85905e8dc5..c7438b9a18ac6 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -62,6 +62,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/redis" _ "github.com/influxdata/telegraf/plugins/inputs/rethinkdb" _ "github.com/influxdata/telegraf/plugins/inputs/riak" + _ "github.com/influxdata/telegraf/plugins/inputs/self" _ "github.com/influxdata/telegraf/plugins/inputs/sensors" _ "github.com/influxdata/telegraf/plugins/inputs/snmp" _ "github.com/influxdata/telegraf/plugins/inputs/snmp_legacy" diff --git a/plugins/inputs/http_listener/http_listener.go b/plugins/inputs/http_listener/http_listener.go index ddc9ac7bf8cec..c9f86c0d10735 100644 --- a/plugins/inputs/http_listener/http_listener.go +++ b/plugins/inputs/http_listener/http_listener.go @@ -14,6 +14,7 @@ import ( "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers/influx" + "github.com/influxdata/telegraf/selfstat" ) const ( @@ -43,6 +44,17 @@ type HTTPListener struct { parser influx.InfluxParser acc telegraf.Accumulator pool *pool + + BytesRecv selfstat.Stat + RequestsServed selfstat.Stat + WritesServed selfstat.Stat + QueriesServed selfstat.Stat + PingsServed selfstat.Stat + RequestsRecv selfstat.Stat + WritesRecv selfstat.Stat + QueriesRecv selfstat.Stat + PingsRecv selfstat.Stat + NotFoundsServed selfstat.Stat } const sampleConfig = ` @@ -81,6 +93,20 @@ func (h *HTTPListener) Start(acc telegraf.Accumulator) error { h.mu.Lock() defer h.mu.Unlock() + tags := map[string]string{ + "address": h.ServiceAddress, + } + h.BytesRecv = selfstat.Register("http_listener", "bytes_received", tags) + h.RequestsServed = selfstat.Register("http_listener", "requests_served", tags) + h.WritesServed = selfstat.Register("http_listener", "writes_served", tags) + h.QueriesServed = selfstat.Register("http_listener", "queries_served", tags) + h.PingsServed = selfstat.Register("http_listener", "pings_served", tags) + h.RequestsRecv = selfstat.Register("http_listener", "requests_received", tags) + h.WritesRecv = selfstat.Register("http_listener", "writes_received", tags) + h.QueriesRecv = selfstat.Register("http_listener", "queries_received", tags) + h.PingsRecv = selfstat.Register("http_listener", "pings_received", tags) + h.NotFoundsServed = selfstat.Register("http_listener", "not_founds_served", tags) + if h.MaxBodySize == 0 { h.MaxBodySize = DEFAULT_MAX_BODY_SIZE } @@ -141,10 +167,16 @@ func (h *HTTPListener) httpListen() error { } func (h *HTTPListener) ServeHTTP(res http.ResponseWriter, req *http.Request) { + h.RequestsRecv.Incr(1) + defer h.RequestsServed.Incr(1) switch req.URL.Path { case "/write": + h.WritesRecv.Incr(1) + defer h.WritesServed.Incr(1) h.serveWrite(res, req) case "/query": + h.QueriesRecv.Incr(1) + defer h.QueriesServed.Incr(1) // Deliver a dummy response to the query endpoint, as some InfluxDB // clients test endpoint availability with a query res.Header().Set("Content-Type", "application/json") @@ -152,9 +184,12 @@ func (h *HTTPListener) ServeHTTP(res http.ResponseWriter, req *http.Request) { res.WriteHeader(http.StatusOK) res.Write([]byte("{\"results\":[]}")) case "/ping": + h.PingsRecv.Incr(1) + defer h.PingsServed.Incr(1) // respond to ping requests res.WriteHeader(http.StatusNoContent) default: + defer h.NotFoundsServed.Incr(1) // Don't know how to respond to calls to other endpoints http.NotFound(res, req) } @@ -195,6 +230,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) { badRequest(res) return } + h.BytesRecv.Incr(int64(n)) if err == io.EOF { if return400 { diff --git a/plugins/inputs/self/README.md b/plugins/inputs/self/README.md new file mode 100644 index 0000000000000..b996e41904dc8 --- /dev/null +++ b/plugins/inputs/self/README.md @@ -0,0 +1,81 @@ +# Self Input Plugin + +The `self` plugin collects metrics about the telegraf agent itself. + +Note that some metrics are aggregates across all instances of one type of +plugin. + +### Configuration: + +```toml +# Collect statistics about itself +[[inputs.self]] + ## If true, collect telegraf memory stats. + # collect_memstats = true +``` + +### Measurements & Fields: + +memstats are taken from the Go runtime: https://golang.org/pkg/runtime/#MemStats + +- self_memstats + - alloc_bytes + - frees + - heap_alloc_bytes + - heap_idle_bytes + - heap_in_use_bytes + - heap_objects_bytes + - heap_released_bytes + - heap_sys_bytes + - mallocs + - num_gc + - pointer_lookups + - sys_bytes + - total_alloc_bytes + +agent stats collect aggregate stats on all telegraf plugins + +- self_agent + - gather_errors + - metrics_dropped + - metrics_gathered + - metrics_written + +self_inputs_ stats collect aggregate stats on all input plugins +that are of the same input type. + +- self_inputs_ + - gather_time_ns + - metrics_gathered + +self_outputs_ stats collect aggregate stats on all output plugins +that are of the same input type. + +- self_outputs_ + - buffer_limit + - buffer_size + - metrics_written + - write_time_ns + +self_ are metrics which are defined on a per-plugin basis, and +usually contain tags which differentiate each instance of a particular type of +plugin. + +- self_ + - individual plugin-specific fields, such as requests counts. + +### Tags: + +All measurements for specific plugins are tagged with information relevant +to each particular plugin. + +### Example Output: + +``` +self_memstats,host=tars alloc_bytes=4325720i,frees=7460i,heap_alloc_bytes=4325720i,heap_idle_bytes=1433600i,heap_in_use_bytes=5283840i,heap_objects_bytes=9999i,heap_released_bytes=0i,heap_sys_bytes=6717440i,mallocs=17459i,num_gc=2i,pointer_lookups=7i,sys_bytes=11376888i,total_alloc_bytes=6748168i 1479295080000000000 +self_agent,host=tars gather_errors=0i,metrics_dropped=0i,metrics_gathered=13i,metrics_written=12i 1479295080000000000 +self_outputs_file,host=tars buffer_limit=10000i,buffer_size=0i,metrics_written=12i,write_time_ns=193407i 1479295080000000000 +self_inputs_self,host=tars gather_time_ns=319155i,metrics_gathered=13i 1479295080000000000 +self_inputs_http_listener,host=tars gather_time_ns=30677i,metrics_gathered=0i 1479295080000000000 +self_http_listener,address=:8186,host=tars bytes_received=0i,not_founds_served=0i,pings_received=0i,pings_served=0i,queries_received=0i,queries_served=0i,requests_received=0i,requests_served=0i,writes_received=0i,writes_served=0i 1479295080000000000 +``` \ No newline at end of file diff --git a/plugins/inputs/self/self.go b/plugins/inputs/self/self.go new file mode 100644 index 0000000000000..f0ea19370c801 --- /dev/null +++ b/plugins/inputs/self/self.go @@ -0,0 +1,66 @@ +package self + +import ( + "runtime" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/selfstat" +) + +type Self struct { + CollectMemstats bool +} + +func NewSelf() telegraf.Input { + return &Self{ + CollectMemstats: true, + } +} + +var sampleConfig = ` + ## If true, collect telegraf memory stats. + # collect_memstats = true +` + +func (s *Self) Description() string { + return "Collect statistics about itself" +} + +func (s *Self) SampleConfig() string { + return sampleConfig +} + +func (s *Self) Gather(acc telegraf.Accumulator) error { + if s.CollectMemstats { + m := &runtime.MemStats{} + runtime.ReadMemStats(m) + fields := map[string]interface{}{ + "alloc_bytes": m.Alloc, // bytes allocated and not yet freed + "total_alloc_bytes": m.TotalAlloc, // bytes allocated (even if freed) + "sys_bytes": m.Sys, // bytes obtained from system (sum of XxxSys below) + "pointer_lookups": m.Lookups, // number of pointer lookups + "mallocs": m.Mallocs, // number of mallocs + "frees": m.Frees, // number of frees + // Main allocation heap statistics. + "heap_alloc_bytes": m.HeapAlloc, // bytes allocated and not yet freed (same as Alloc above) + "heap_sys_bytes": m.HeapSys, // bytes obtained from system + "heap_idle_bytes": m.HeapIdle, // bytes in idle spans + "heap_in_use_bytes": m.HeapInuse, // bytes in non-idle span + "heap_released_bytes": m.HeapReleased, // bytes released to the OS + "heap_objects_bytes": m.HeapObjects, // total number of allocated objects + "num_gc": m.NumGC, + } + acc.AddFields("self_memstats", fields, map[string]string{}) + } + + for _, m := range selfstat.Metrics() { + acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) + } + + return nil +} + +func init() { + inputs.Add("self", NewSelf) +} diff --git a/plugins/inputs/self/self_test.go b/plugins/inputs/self/self_test.go new file mode 100644 index 0000000000000..6a3cb06cf48bc --- /dev/null +++ b/plugins/inputs/self/self_test.go @@ -0,0 +1,62 @@ +package self + +import ( + "testing" + + "github.com/influxdata/telegraf/selfstat" + "github.com/influxdata/telegraf/testutil" + + "github.com/stretchr/testify/assert" +) + +func TestSelfPlugin(t *testing.T) { + s := NewSelf() + acc := &testutil.Accumulator{} + + s.Gather(acc) + assert.True(t, acc.HasMeasurement("self_memstats")) + + // test that a registered stat is incremented + stat := selfstat.Register("mytest", "test", map[string]string{"test": "foo"}) + stat.Incr(1) + stat.Incr(2) + s.Gather(acc) + acc.AssertContainsTaggedFields(t, "self_mytest", + map[string]interface{}{ + "test": int64(3), + }, + map[string]string{ + "test": "foo", + }, + ) + acc.ClearMetrics() + + // test that a registered stat is set properly + stat.Set(101) + s.Gather(acc) + acc.AssertContainsTaggedFields(t, "self_mytest", + map[string]interface{}{ + "test": int64(101), + }, + map[string]string{ + "test": "foo", + }, + ) + acc.ClearMetrics() + + // test that regular and timing stats can share the same measurement, and + // that timings are set properly. + timing := selfstat.RegisterTiming("mytest", "test_ns", map[string]string{"test": "foo"}) + timing.Incr(100) + timing.Incr(200) + s.Gather(acc) + acc.AssertContainsTaggedFields(t, "self_mytest", + map[string]interface{}{ + "test": int64(101), + "test_ns": int64(150), + }, + map[string]string{ + "test": "foo", + }, + ) +} diff --git a/plugins/inputs/tcp_listener/tcp_listener.go b/plugins/inputs/tcp_listener/tcp_listener.go index 861442348ae13..ca1b48fb26956 100644 --- a/plugins/inputs/tcp_listener/tcp_listener.go +++ b/plugins/inputs/tcp_listener/tcp_listener.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/selfstat" ) type TcpListener struct { @@ -41,6 +42,12 @@ type TcpListener struct { parser parsers.Parser acc telegraf.Accumulator + + MaxConnections selfstat.Stat + CurrentConnections selfstat.Stat + TotalConnections selfstat.Stat + PacketsRecv selfstat.Stat + BytesRecv selfstat.Stat } var dropwarn = "E! Error: tcp_listener message queue full. " + @@ -91,6 +98,16 @@ func (t *TcpListener) Start(acc telegraf.Accumulator) error { t.Lock() defer t.Unlock() + tags := map[string]string{ + "address": t.ServiceAddress, + } + t.MaxConnections = selfstat.Register("tcp_listener", "max_connections", tags) + t.MaxConnections.Set(int64(t.MaxTCPConnections)) + t.CurrentConnections = selfstat.Register("tcp_listener", "current_connections", tags) + t.TotalConnections = selfstat.Register("tcp_listener", "total_connections", tags) + t.PacketsRecv = selfstat.Register("tcp_listener", "packets_received", tags) + t.BytesRecv = selfstat.Register("tcp_listener", "bytes_received", tags) + t.acc = acc t.in = make(chan []byte, t.AllowedPendingMessages) t.done = make(chan struct{}) @@ -189,6 +206,8 @@ func (t *TcpListener) refuser(conn *net.TCPConn) { // handler handles a single TCP Connection func (t *TcpListener) handler(conn *net.TCPConn, id string) { + t.CurrentConnections.Incr(1) + t.TotalConnections.Incr(1) // connection cleanup function defer func() { t.wg.Done() @@ -196,6 +215,7 @@ func (t *TcpListener) handler(conn *net.TCPConn, id string) { // Add one connection potential back to channel when this one closes t.accept <- true t.forget(id) + t.CurrentConnections.Incr(-1) }() var n int @@ -212,6 +232,8 @@ func (t *TcpListener) handler(conn *net.TCPConn, id string) { if n == 0 { continue } + t.BytesRecv.Incr(int64(n)) + t.PacketsRecv.Incr(1) bufCopy := make([]byte, n) copy(bufCopy, scanner.Bytes()) diff --git a/plugins/inputs/udp_listener/udp_listener.go b/plugins/inputs/udp_listener/udp_listener.go index 78687feee6356..518a3fe4881b2 100644 --- a/plugins/inputs/udp_listener/udp_listener.go +++ b/plugins/inputs/udp_listener/udp_listener.go @@ -9,6 +9,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/selfstat" ) // UdpListener main struct for the collector @@ -48,6 +49,9 @@ type UdpListener struct { acc telegraf.Accumulator listener *net.UDPConn + + PacketsRecv selfstat.Stat + BytesRecv selfstat.Stat } // UDP_MAX_PACKET_SIZE is packet limit, see @@ -102,6 +106,12 @@ func (u *UdpListener) Start(acc telegraf.Accumulator) error { u.Lock() defer u.Unlock() + tags := map[string]string{ + "address": u.ServiceAddress, + } + u.PacketsRecv = selfstat.Register("udp_listener", "packets_received", tags) + u.BytesRecv = selfstat.Register("udp_listener", "bytes_received", tags) + u.acc = acc u.in = make(chan []byte, u.AllowedPendingMessages) u.done = make(chan struct{}) @@ -162,6 +172,8 @@ func (u *UdpListener) udpListen() error { } continue } + u.BytesRecv.Incr(int64(n)) + u.PacketsRecv.Incr(1) bufCopy := make([]byte, n) copy(bufCopy, buf[:n]) diff --git a/selfstat/README.md b/selfstat/README.md new file mode 100644 index 0000000000000..f87f5c14cbbd7 --- /dev/null +++ b/selfstat/README.md @@ -0,0 +1 @@ +# TODO \ No newline at end of file diff --git a/selfstat/selfstat.go b/selfstat/selfstat.go new file mode 100644 index 0000000000000..84ec730f7dab5 --- /dev/null +++ b/selfstat/selfstat.go @@ -0,0 +1,132 @@ +package selfstat + +import ( + "log" + "sync" + "time" + + "github.com/influxdata/telegraf" +) + +var ( + registry *rgstry +) + +// Stat is an interface for dealing with telegraf statistics collected +// on itself. +type Stat interface { + // Name is the name of the measurement + Name() string + // FieldName is the name of the measurement field + FieldName() string + // Tags is a tag map. Each time this is called a new map is allocated. + Tags() map[string]string + // Key is the unique measurement+tags key of the stat. + Key() uint64 + // Incr increments a regular stat by 'v'. + // in the case of a timing stat, increment adds the timing to the cache. + Incr(v int64) + // Set sets a regular stat to 'v'. + // in the case of a timing stat, set adds the timing to the cache. + Set(v int64) + // Get gets the value of the stat. In the case of timings, this returns + // an average value of all timings received since the last call to Get(). + // If no timings were received, it returns the previous value. + Get() int64 +} + +// Register registers the given measurement, field, and tags in the selfstat +// registry. If given an identical measurement, it will return the stat that's +// already been registered. +// +// The returned Stat can be incremented by the consumer of Register(), and it's +// value will be returned as a telegraf metric when Metrics() is called. +func Register(measurement, field string, tags map[string]string) Stat { + return registry.register(&stat{ + measurement: "self_" + measurement, + field: field, + tags: tags, + }) +} + +// RegisterTiming registers the given measurement, field, and tags in the selfstat +// registry. If given an identical measurement, it will return the stat that's +// already been registered. +// +// Timing stats differ from regular stats in that they accumulate multiple +// "timings" added to them, and will return the average when Get() is called. +// After Get() is called, the average is cleared and the next timing returned +// from Get() will only reflect timings added since the previous call to Get(). +// If Get() is called without receiving any new timings, then the previous value +// is used. +// +// In other words, timings are an averaged metric that get cleared on each call +// to Get(). +// +// The returned Stat can be incremented by the consumer of Register(), and it's +// value will be returned as a telegraf metric when Metrics() is called. +func RegisterTiming(measurement, field string, tags map[string]string) Stat { + return registry.register(&timingStat{ + measurement: "self_" + measurement, + field: field, + tags: tags, + }) +} + +// Metrics returns all registered stats as telegraf metrics. +func Metrics() []telegraf.Metric { + registry.mu.Lock() + now := time.Now() + metrics := make([]telegraf.Metric, len(registry.stats)) + i := 0 + for _, stats := range registry.stats { + if len(stats) > 0 { + var tags map[string]string + var name string + fields := map[string]interface{}{} + for fieldname, stat := range stats { + fields[fieldname] = stat.Get() + tags = stat.Tags() + name = stat.Name() + } + metric, err := telegraf.NewMetric(name, tags, fields, now) + if err != nil { + log.Printf("E! Error creating selfstat metric: %s", err) + continue + } + metrics[i] = metric + i++ + } + } + registry.mu.Unlock() + return metrics +} + +type rgstry struct { + stats map[uint64]map[string]Stat + mu sync.Mutex +} + +func (r *rgstry) register(s Stat) Stat { + r.mu.Lock() + defer r.mu.Unlock() + if stats, ok := r.stats[s.Key()]; ok { + // measurement exists + if stat, ok := stats[s.FieldName()]; ok { + // field already exists, so don't create a new one + return stat + } + r.stats[s.Key()][s.FieldName()] = s + return s + } else { + // creating a new unique metric + r.stats[s.Key()] = map[string]Stat{s.FieldName(): s} + return s + } +} + +func init() { + registry = &rgstry{ + stats: make(map[uint64]map[string]Stat), + } +} diff --git a/selfstat/selfstat_test.go b/selfstat/selfstat_test.go new file mode 100644 index 0000000000000..d728d28f63a00 --- /dev/null +++ b/selfstat/selfstat_test.go @@ -0,0 +1,222 @@ +package selfstat + +import ( + "sync" + "testing" + + "github.com/influxdata/telegraf/testutil" + + "github.com/stretchr/testify/assert" +) + +var ( + // only allow one test at a time + // this is because we are dealing with a global registry + testLock sync.Mutex + a int64 +) + +// testCleanup resets the global registry for test cleanup & unlocks the test lock +func testCleanup() { + registry = &rgstry{ + stats: make(map[uint64]map[string]Stat), + } + testLock.Unlock() +} + +func BenchmarkStats(b *testing.B) { + testLock.Lock() + defer testCleanup() + b1 := Register("benchmark1", "test_field1", map[string]string{"test": "foo"}) + for n := 0; n < b.N; n++ { + b1.Incr(1) + b1.Incr(3) + a = b1.Get() + } +} + +func BenchmarkTimingStats(b *testing.B) { + testLock.Lock() + defer testCleanup() + b2 := RegisterTiming("benchmark2", "test_field1", map[string]string{"test": "foo"}) + for n := 0; n < b.N; n++ { + b2.Incr(1) + b2.Incr(3) + a = b2.Get() + } +} + +func TestRegisterAndIncrAndSet(t *testing.T) { + testLock.Lock() + defer testCleanup() + s1 := Register("test", "test_field1", map[string]string{"test": "foo"}) + s2 := Register("test", "test_field2", map[string]string{"test": "foo"}) + assert.Equal(t, int64(0), s1.Get()) + + s1.Incr(10) + s1.Incr(5) + assert.Equal(t, int64(15), s1.Get()) + + s1.Set(12) + assert.Equal(t, int64(12), s1.Get()) + + s1.Incr(-2) + assert.Equal(t, int64(10), s1.Get()) + + s2.Set(101) + assert.Equal(t, int64(101), s2.Get()) + + // make sure that the same field returns the same metric + // this one should be the same as s2. + foo := Register("test", "test_field2", map[string]string{"test": "foo"}) + assert.Equal(t, int64(101), foo.Get()) + + // check that tags are consistent + assert.Equal(t, map[string]string{"test": "foo"}, foo.Tags()) + assert.Equal(t, "self_test", foo.Name()) +} + +func TestRegisterTimingAndIncrAndSet(t *testing.T) { + testLock.Lock() + defer testCleanup() + s1 := RegisterTiming("test", "test_field1_ns", map[string]string{"test": "foo"}) + s2 := RegisterTiming("test", "test_field2_ns", map[string]string{"test": "foo"}) + assert.Equal(t, int64(0), s1.Get()) + + s1.Incr(10) + s1.Incr(5) + assert.Equal(t, int64(7), s1.Get()) + // previous value is used on subsequent calls to Get() + assert.Equal(t, int64(7), s1.Get()) + + s1.Set(12) + assert.Equal(t, int64(12), s1.Get()) + + s1.Incr(-2) + assert.Equal(t, int64(-2), s1.Get()) + + s2.Set(101) + assert.Equal(t, int64(101), s2.Get()) + + // make sure that the same field returns the same metric + // this one should be the same as s2. + foo := RegisterTiming("test", "test_field2_ns", map[string]string{"test": "foo"}) + assert.Equal(t, int64(101), foo.Get()) + + // check that tags are consistent + assert.Equal(t, map[string]string{"test": "foo"}, foo.Tags()) + assert.Equal(t, "self_test", foo.Name()) +} + +func TestStatKeyConsistency(t *testing.T) { + s := &stat{ + measurement: "self_stat", + field: "myfield", + tags: map[string]string{ + "foo": "bar", + "bar": "baz", + "whose": "first", + }, + } + k := s.Key() + for i := 0; i < 5000; i++ { + // assert that the Key() func doesn't change anything. + assert.Equal(t, k, s.Key()) + + // assert that two identical measurements always produce the same key. + tmp := &stat{ + measurement: "self_stat", + field: "myfield", + tags: map[string]string{ + "foo": "bar", + "bar": "baz", + "whose": "first", + }, + } + assert.Equal(t, k, tmp.Key()) + } +} + +func TestRegisterMetricsAndVerify(t *testing.T) { + testLock.Lock() + defer testCleanup() + + // register two metrics with the same key + s1 := RegisterTiming("test_timing", "test_field1_ns", map[string]string{"test": "foo"}) + s2 := RegisterTiming("test_timing", "test_field2_ns", map[string]string{"test": "foo"}) + s1.Incr(10) + s2.Incr(15) + assert.Len(t, Metrics(), 1) + + // register two more metrics with different keys + s3 := RegisterTiming("test_timing", "test_field1_ns", map[string]string{"test": "bar"}) + s4 := RegisterTiming("test_timing", "test_field2_ns", map[string]string{"test": "baz"}) + s3.Incr(10) + s4.Incr(15) + assert.Len(t, Metrics(), 3) + + // register some non-timing metrics + s5 := Register("test", "test_field1", map[string]string{"test": "bar"}) + s6 := Register("test", "test_field2", map[string]string{"test": "baz"}) + Register("test", "test_field3", map[string]string{"test": "baz"}) + s5.Incr(10) + s5.Incr(18) + s6.Incr(15) + assert.Len(t, Metrics(), 5) + + // TODO test that the metrics are all correct + acc := testutil.Accumulator{} + acc.AddMetrics(Metrics()) + + // verify s1 & s2 + acc.AssertContainsTaggedFields(t, "self_test_timing", + map[string]interface{}{ + "test_field1_ns": int64(10), + "test_field2_ns": int64(15), + }, + map[string]string{ + "test": "foo", + }, + ) + + // verify s3 + acc.AssertContainsTaggedFields(t, "self_test_timing", + map[string]interface{}{ + "test_field1_ns": int64(10), + }, + map[string]string{ + "test": "bar", + }, + ) + + // verify s4 + acc.AssertContainsTaggedFields(t, "self_test_timing", + map[string]interface{}{ + "test_field2_ns": int64(15), + }, + map[string]string{ + "test": "baz", + }, + ) + + // verify s5 + acc.AssertContainsTaggedFields(t, "self_test", + map[string]interface{}{ + "test_field1": int64(28), + }, + map[string]string{ + "test": "bar", + }, + ) + + // verify s6 & s7 + acc.AssertContainsTaggedFields(t, "self_test", + map[string]interface{}{ + "test_field2": int64(15), + "test_field3": int64(0), + }, + map[string]string{ + "test": "baz", + }, + ) +} diff --git a/selfstat/stat.go b/selfstat/stat.go new file mode 100644 index 0000000000000..e95bb22978be5 --- /dev/null +++ b/selfstat/stat.go @@ -0,0 +1,69 @@ +package selfstat + +import ( + "hash/fnv" + "sort" + "sync/atomic" +) + +type stat struct { + v int64 + measurement string + field string + tags map[string]string + key uint64 +} + +func (s *stat) Incr(v int64) { + atomic.AddInt64(&s.v, v) +} + +func (s *stat) Set(v int64) { + atomic.StoreInt64(&s.v, v) +} + +func (s *stat) Get() int64 { + return atomic.LoadInt64(&s.v) +} + +func (s *stat) Name() string { + return s.measurement +} + +func (s *stat) FieldName() string { + return s.field +} + +// Tags returns a copy of the stat's tags. +// NOTE this allocates a new map every time it is called. +func (s *stat) Tags() map[string]string { + m := make(map[string]string, len(s.tags)) + for k, v := range s.tags { + m[k] = v + } + return m +} + +func (s *stat) Key() uint64 { + if s.key == 0 { + h := fnv.New64a() + h.Write([]byte(s.measurement)) + + tmp := make([]string, len(s.tags)*2) + i := 0 + for k, v := range s.tags { + tmp[i] = k + i++ + tmp[i] = v + i++ + } + sort.Strings(tmp) + + for _, s := range tmp { + h.Write([]byte(s)) + } + + s.key = h.Sum64() + } + return s.key +} diff --git a/selfstat/timingStat.go b/selfstat/timingStat.go new file mode 100644 index 0000000000000..f349e322636cd --- /dev/null +++ b/selfstat/timingStat.go @@ -0,0 +1,85 @@ +package selfstat + +import ( + "hash/fnv" + "sort" + "sync" +) + +type timingStat struct { + measurement string + field string + tags map[string]string + key uint64 + v int64 + prev int64 + count int64 + mu sync.Mutex +} + +func (s *timingStat) Incr(v int64) { + s.mu.Lock() + s.v += v + s.count++ + s.mu.Unlock() +} + +func (s *timingStat) Set(v int64) { + s.Incr(v) +} + +func (s *timingStat) Get() int64 { + var avg int64 + s.mu.Lock() + if s.count > 0 { + s.prev, avg = s.v/s.count, s.v/s.count + s.v = 0 + s.count = 0 + } else { + avg = s.prev + } + s.mu.Unlock() + return avg +} + +func (s *timingStat) Name() string { + return s.measurement +} + +func (s *timingStat) FieldName() string { + return s.field +} + +// Tags returns a copy of the timingStat's tags. +// NOTE this allocates a new map every time it is called. +func (s *timingStat) Tags() map[string]string { + m := make(map[string]string, len(s.tags)) + for k, v := range s.tags { + m[k] = v + } + return m +} + +func (s *timingStat) Key() uint64 { + if s.key == 0 { + h := fnv.New64a() + h.Write([]byte(s.measurement)) + + tmp := make([]string, len(s.tags)*2) + i := 0 + for k, v := range s.tags { + tmp[i] = k + i++ + tmp[i] = v + i++ + } + sort.Strings(tmp) + + for _, s := range tmp { + h.Write([]byte(s)) + } + + s.key = h.Sum64() + } + return s.key +} diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 11cea2434616d..2efee5572d19c 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -9,6 +9,8 @@ import ( "testing" "time" + "github.com/influxdata/telegraf" + "github.com/stretchr/testify/assert" ) @@ -110,6 +112,12 @@ func (a *Accumulator) AddGauge( a.AddFields(measurement, fields, tags, timestamp...) } +func (a *Accumulator) AddMetrics(metrics []telegraf.Metric) { + for _, m := range metrics { + a.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) + } +} + // AddError appends the given error to Accumulator.Errors. func (a *Accumulator) AddError(err error) { if err == nil {