diff --git a/accumulator.go b/accumulator.go index 13fd6e5711852..fe0b9a4c211c7 100644 --- a/accumulator.go +++ b/accumulator.go @@ -1,6 +1,8 @@ package telegraf -import "time" +import ( + "time" +) // Accumulator is an interface for "accumulating" metrics from plugin(s). // The metrics are sent down a channel shared between all plugins. @@ -28,6 +30,9 @@ type Accumulator interface { tags map[string]string, t ...time.Time) + // TODO document + AddMetrics(metrics []Metric) + SetPrecision(precision, interval time.Duration) AddError(err error) diff --git a/agent/accumulator.go b/agent/accumulator.go index 0d682d2857b6d..0d419213f8c40 100644 --- a/agent/accumulator.go +++ b/agent/accumulator.go @@ -2,10 +2,14 @@ package agent import ( "log" - "sync/atomic" "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/selfstat" +) + +var ( + NErrors = selfstat.Register("agent", "gather_errors", map[string]string{}) ) type MetricMaker interface { @@ -37,8 +41,12 @@ type accumulator struct { maker MetricMaker precision time.Duration +} - errCount uint64 +func (ac *accumulator) AddMetrics(metrics []telegraf.Metric) { + for _, m := range metrics { + ac.metrics <- m + } } func (ac *accumulator) AddFields( @@ -80,7 +88,7 @@ func (ac *accumulator) AddError(err error) { if err == nil { return } - atomic.AddUint64(&ac.errCount, 1) + NErrors.Incr(1) //TODO suppress/throttle consecutive duplicate errors? log.Printf("E! Error in plugin [%s]: %s", ac.maker.Name(), err) } diff --git a/agent/agent.go b/agent/agent.go index 1a205e218b203..51b136255b713 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -44,8 +44,6 @@ func NewAgent(config *config.Config) (*Agent, error) { // Connect connects to all configured outputs func (a *Agent) Connect() error { for _, o := range a.Config.Outputs { - o.Quiet = a.Config.Agent.Quiet - switch ot := o.Output.(type) { case telegraf.ServiceOutput: if err := ot.Start(); err != nil { @@ -204,9 +202,6 @@ func (a *Agent) Test() error { if err := input.Input.Gather(acc); err != nil { return err } - if acc.errCount > 0 { - return fmt.Errorf("Errors encountered during processing") - } // Special instructions for some inputs. cpu, for example, needs to be // run twice in order to return cpu usage percentages. diff --git a/internal/buffer/buffer.go b/internal/buffer/buffer.go index 58cd1c3764d7e..db2cb0a840663 100644 --- a/internal/buffer/buffer.go +++ b/internal/buffer/buffer.go @@ -4,15 +4,17 @@ import ( "sync" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/selfstat" +) + +var ( + MetricsGathered = selfstat.Register("agent", "metrics_gathered", map[string]string{}) + MetricsDropped = selfstat.Register("agent", "metrics_dropped", map[string]string{}) ) // Buffer is an object for storing metrics in a circular buffer. type Buffer struct { buf chan telegraf.Metric - // total dropped metrics - drops int - // total metrics added - total int mu sync.Mutex } @@ -36,25 +38,14 @@ func (b *Buffer) Len() int { return len(b.buf) } -// Drops returns the total number of dropped metrics that have occured in this -// buffer since instantiation. -func (b *Buffer) Drops() int { - return b.drops -} - -// Total returns the total number of metrics that have been added to this buffer. -func (b *Buffer) Total() int { - return b.total -} - // Add adds metrics to the buffer. func (b *Buffer) Add(metrics ...telegraf.Metric) { for i, _ := range metrics { - b.total++ select { case b.buf <- metrics[i]: + MetricsGathered.Incr(1) default: - b.drops++ + MetricsDropped.Incr(1) <-b.buf b.buf <- metrics[i] } diff --git a/internal/config/config.go b/internal/config/config.go index 2c2199dacb00e..24dec4169bfc9 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -821,10 +821,7 @@ func (c *Config) addInput(name string, table *ast.Table) error { return err } - rp := &models.RunningInput{ - Input: input, - Config: pluginConfig, - } + rp := models.NewRunningInput(input, pluginConfig) c.Inputs = append(c.Inputs, rp) return nil } diff --git a/internal/models/running_input.go b/internal/models/running_input.go index 558af3e5c0d12..37fead15f66b1 100644 --- a/internal/models/running_input.go +++ b/internal/models/running_input.go @@ -5,6 +5,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/selfstat" ) type RunningInput struct { @@ -14,6 +15,19 @@ type RunningInput struct { trace bool debug bool defaultTags map[string]string + + MetricsGathered selfstat.Stat +} + +func NewRunningInput( + input telegraf.Input, + config *InputConfig, +) *RunningInput { + return &RunningInput{ + Input: input, + Config: config, + MetricsGathered: selfstat.Register("inputs", "metrics_gathered", map[string]string{"input": config.Name}), + } } // InputConfig containing a name, interval, and filter @@ -60,6 +74,7 @@ func (r *RunningInput) MakeMetric( fmt.Println("> " + m.String()) } + r.MetricsGathered.Incr(1) return m } diff --git a/internal/models/running_input_test.go b/internal/models/running_input_test.go index 3d3b65b953151..34d61807909d8 100644 --- a/internal/models/running_input_test.go +++ b/internal/models/running_input_test.go @@ -13,11 +13,9 @@ import ( func TestMakeMetricNoFields(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - }, - } + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + }) m := ri.MakeMetric( "RITest", @@ -32,11 +30,9 @@ func TestMakeMetricNoFields(t *testing.T) { // nil fields should get dropped func TestMakeMetricNilFields(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - }, - } + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + }) m := ri.MakeMetric( "RITest", @@ -58,11 +54,9 @@ func TestMakeMetricNilFields(t *testing.T) { // make an untyped, counter, & gauge metric func TestMakeMetric(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - }, - } + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + }) ri.SetDebug(true) assert.Equal(t, true, ri.Debug()) ri.SetTrace(true) @@ -126,14 +120,12 @@ func TestMakeMetric(t *testing.T) { func TestMakeMetricWithPluginTags(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - Tags: map[string]string{ - "foo": "bar", - }, + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + Tags: map[string]string{ + "foo": "bar", }, - } + }) ri.SetDebug(true) assert.Equal(t, true, ri.Debug()) ri.SetTrace(true) @@ -155,15 +147,13 @@ func TestMakeMetricWithPluginTags(t *testing.T) { func TestMakeMetricFilteredOut(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - Tags: map[string]string{ - "foo": "bar", - }, - Filter: Filter{NamePass: []string{"foobar"}}, + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + Tags: map[string]string{ + "foo": "bar", }, - } + Filter: Filter{NamePass: []string{"foobar"}}, + }) ri.SetDebug(true) assert.Equal(t, true, ri.Debug()) ri.SetTrace(true) @@ -182,11 +172,9 @@ func TestMakeMetricFilteredOut(t *testing.T) { func TestMakeMetricWithDaemonTags(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - }, - } + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + }) ri.SetDefaultTags(map[string]string{ "foo": "bar", }) @@ -214,11 +202,9 @@ func TestMakeMetricInfFields(t *testing.T) { inf := math.Inf(1) ninf := math.Inf(-1) now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - }, - } + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + }) ri.SetDebug(true) assert.Equal(t, true, ri.Debug()) ri.SetTrace(true) @@ -244,11 +230,9 @@ func TestMakeMetricInfFields(t *testing.T) { func TestMakeMetricAllFieldTypes(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - }, - } + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + }) ri.SetDebug(true) assert.Equal(t, true, ri.Debug()) ri.SetTrace(true) @@ -284,12 +268,10 @@ func TestMakeMetricAllFieldTypes(t *testing.T) { func TestMakeMetricNameOverride(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - NameOverride: "foobar", - }, - } + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + NameOverride: "foobar", + }) m := ri.MakeMetric( "RITest", @@ -307,12 +289,10 @@ func TestMakeMetricNameOverride(t *testing.T) { func TestMakeMetricNamePrefix(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - MeasurementPrefix: "foobar_", - }, - } + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + MeasurementPrefix: "foobar_", + }) m := ri.MakeMetric( "RITest", @@ -330,12 +310,10 @@ func TestMakeMetricNamePrefix(t *testing.T) { func TestMakeMetricNameSuffix(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - MeasurementSuffix: "_foobar", - }, - } + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + MeasurementSuffix: "_foobar", + }) m := ri.MakeMetric( "RITest", @@ -350,3 +328,9 @@ func TestMakeMetricNameSuffix(t *testing.T) { fmt.Sprintf("RITest_foobar value=101i %d", now.UnixNano()), ) } + +type testInput struct{} + +func (t *testInput) Description() string { return "" } +func (t *testInput) SampleConfig() string { return "" } +func (t *testInput) Gather(acc telegraf.Accumulator) error { return nil } diff --git a/internal/models/running_output.go b/internal/models/running_output.go index aa94178f74145..9896ce726ab40 100644 --- a/internal/models/running_output.go +++ b/internal/models/running_output.go @@ -6,6 +6,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal/buffer" + "github.com/influxdata/telegraf/selfstat" ) const ( @@ -21,9 +22,11 @@ type RunningOutput struct { Name string Output telegraf.Output Config *OutputConfig - Quiet bool MetricBufferLimit int MetricBatchSize int + MetricsWritten selfstat.Stat + BufferSize selfstat.Stat + BufferLimit selfstat.Stat metrics *buffer.Buffer failMetrics *buffer.Buffer @@ -50,7 +53,11 @@ func NewRunningOutput( Config: conf, MetricBufferLimit: bufferLimit, MetricBatchSize: batchSize, + MetricsWritten: selfstat.Register("outputs", "metrics_written", map[string]string{"output": name}), + BufferSize: selfstat.Register("outputs", "buffer_size", map[string]string{"output": name}), + BufferLimit: selfstat.Register("outputs", "buffer_limit", map[string]string{"output": name}), } + ro.BufferLimit.Incr(int64(ro.MetricBufferLimit)) return ro } @@ -84,16 +91,7 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) { // Write writes all cached points to this output. func (ro *RunningOutput) Write() error { - if !ro.Quiet { - log.Printf("I! Output [%s] buffer fullness: %d / %d metrics. "+ - "Total gathered metrics: %d. Total dropped metrics: %d.", - ro.Name, - ro.failMetrics.Len()+ro.metrics.Len(), - ro.MetricBufferLimit, - ro.metrics.Total(), - ro.metrics.Drops()+ro.failMetrics.Drops()) - } - + ro.BufferSize.Incr(int64(ro.metrics.Len())) var err error if !ro.failMetrics.IsEmpty() { bufLen := ro.failMetrics.Len() @@ -126,6 +124,7 @@ func (ro *RunningOutput) Write() error { if err == nil { err = ro.write(batch) } + if err != nil { ro.failMetrics.Add(batch...) return err @@ -141,10 +140,11 @@ func (ro *RunningOutput) write(metrics []telegraf.Metric) error { err := ro.Output.Write(metrics) elapsed := time.Since(start) if err == nil { - if !ro.Quiet { - log.Printf("I! Output [%s] wrote batch of %d metrics in %s\n", - ro.Name, len(metrics), elapsed) - } + ro.MetricsWritten.Incr(int64(len(metrics))) + ro.BufferSize.Incr(-int64(len(metrics))) + // TODO write one-off "elapsed" metric and remove this log message + log.Printf("I! Output [%s] wrote batch of %d metrics in %s\n", + ro.Name, len(metrics), elapsed) } return err } diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 67b85905e8dc5..c7438b9a18ac6 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -62,6 +62,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/redis" _ "github.com/influxdata/telegraf/plugins/inputs/rethinkdb" _ "github.com/influxdata/telegraf/plugins/inputs/riak" + _ "github.com/influxdata/telegraf/plugins/inputs/self" _ "github.com/influxdata/telegraf/plugins/inputs/sensors" _ "github.com/influxdata/telegraf/plugins/inputs/snmp" _ "github.com/influxdata/telegraf/plugins/inputs/snmp_legacy" diff --git a/plugins/inputs/http_listener/http_listener.go b/plugins/inputs/http_listener/http_listener.go index ddc9ac7bf8cec..c5c8fa654cc9e 100644 --- a/plugins/inputs/http_listener/http_listener.go +++ b/plugins/inputs/http_listener/http_listener.go @@ -14,6 +14,7 @@ import ( "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers/influx" + "github.com/influxdata/telegraf/selfstat" ) const ( @@ -43,6 +44,17 @@ type HTTPListener struct { parser influx.InfluxParser acc telegraf.Accumulator pool *pool + + BytesRecv selfstat.Stat + RequestsServed selfstat.Stat + WritesServed selfstat.Stat + QueriesServed selfstat.Stat + PingsServed selfstat.Stat + RequestsRecv selfstat.Stat + WritesRecv selfstat.Stat + QueriesRecv selfstat.Stat + PingsRecv selfstat.Stat + NotFoundsServed selfstat.Stat } const sampleConfig = ` @@ -81,6 +93,21 @@ func (h *HTTPListener) Start(acc telegraf.Accumulator) error { h.mu.Lock() defer h.mu.Unlock() + tags := map[string]string{ + "input": "http_listener", + "address": h.ServiceAddress, + } + h.BytesRecv = selfstat.Register("inputs", "bytes_received", tags) + h.RequestsServed = selfstat.Register("inputs", "requests_served", tags) + h.WritesServed = selfstat.Register("inputs", "writes_served", tags) + h.QueriesServed = selfstat.Register("inputs", "queries_served", tags) + h.PingsServed = selfstat.Register("inputs", "pings_served", tags) + h.RequestsRecv = selfstat.Register("inputs", "requests_received", tags) + h.WritesRecv = selfstat.Register("inputs", "writes_received", tags) + h.QueriesRecv = selfstat.Register("inputs", "queries_received", tags) + h.PingsRecv = selfstat.Register("inputs", "pings_received", tags) + h.NotFoundsServed = selfstat.Register("inputs", "not_founds_served", tags) + if h.MaxBodySize == 0 { h.MaxBodySize = DEFAULT_MAX_BODY_SIZE } @@ -141,10 +168,16 @@ func (h *HTTPListener) httpListen() error { } func (h *HTTPListener) ServeHTTP(res http.ResponseWriter, req *http.Request) { + h.RequestsRecv.Incr(1) + defer h.RequestsServed.Incr(1) switch req.URL.Path { case "/write": + h.WritesRecv.Incr(1) + defer h.WritesServed.Incr(1) h.serveWrite(res, req) case "/query": + h.QueriesRecv.Incr(1) + defer h.QueriesServed.Incr(1) // Deliver a dummy response to the query endpoint, as some InfluxDB // clients test endpoint availability with a query res.Header().Set("Content-Type", "application/json") @@ -152,9 +185,12 @@ func (h *HTTPListener) ServeHTTP(res http.ResponseWriter, req *http.Request) { res.WriteHeader(http.StatusOK) res.Write([]byte("{\"results\":[]}")) case "/ping": + h.PingsRecv.Incr(1) + defer h.PingsServed.Incr(1) // respond to ping requests res.WriteHeader(http.StatusNoContent) default: + defer h.NotFoundsServed.Incr(1) // Don't know how to respond to calls to other endpoints http.NotFound(res, req) } @@ -195,6 +231,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) { badRequest(res) return } + h.BytesRecv.Incr(int64(n)) if err == io.EOF { if return400 { diff --git a/plugins/inputs/self/self.go b/plugins/inputs/self/self.go new file mode 100644 index 0000000000000..26260c9e6d269 --- /dev/null +++ b/plugins/inputs/self/self.go @@ -0,0 +1,68 @@ +package self + +import ( + "runtime" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/selfstat" +) + +type Self struct { + CollectMemstats bool +} + +var sampleConfig = ` + ## If true, collect telegraf memory stats. + # collect_memstats = true +` + +func (s *Self) Description() string { + return "Collect statistics about itself" +} + +func (s *Self) SampleConfig() string { + return sampleConfig +} + +func (s *Self) Gather(acc telegraf.Accumulator) error { + if s.CollectMemstats { + m := &runtime.MemStats{} + runtime.ReadMemStats(m) + fields := map[string]interface{}{ + "alloc_bytes": m.Alloc, // bytes allocated and not yet freed + "total_alloc_bytes": m.TotalAlloc, // bytes allocated (even if freed) + "sys_bytes": m.Sys, // bytes obtained from system (sum of XxxSys below) + "pointer_lookups": m.Lookups, // number of pointer lookups + "mallocs": m.Mallocs, // number of mallocs + "frees": m.Frees, // number of frees + // Main allocation heap statistics. + "heap_alloc_bytes": m.HeapAlloc, // bytes allocated and not yet freed (same as Alloc above) + "heap_sys_bytes": m.HeapSys, // bytes obtained from system + "heap_idle_bytes": m.HeapIdle, // bytes in idle spans + "heap_in_use_bytes": m.HeapInuse, // bytes in non-idle span + "heap_released_bytes": m.HeapReleased, // bytes released to the OS + "heap_objects_bytes": m.HeapObjects, // total number of allocated objects + "num_gc": m.NumGC, + } + acc.AddFields("telegraf_memstats", fields, map[string]string{}) + } + acc.AddMetrics(selfstat.Metrics()) + return nil +} + +func (s *Self) Start() error { + selfstat.Enabled = true + return nil +} + +func (s *Self) Stop() { +} + +func init() { + inputs.Add("self", func() telegraf.Input { + return &Self{ + CollectMemstats: true, + } + }) +} diff --git a/plugins/inputs/tcp_listener/tcp_listener.go b/plugins/inputs/tcp_listener/tcp_listener.go index 861442348ae13..238c9b09b516b 100644 --- a/plugins/inputs/tcp_listener/tcp_listener.go +++ b/plugins/inputs/tcp_listener/tcp_listener.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/selfstat" ) type TcpListener struct { @@ -41,6 +42,12 @@ type TcpListener struct { parser parsers.Parser acc telegraf.Accumulator + + MaxConnections selfstat.Stat + CurrentConnections selfstat.Stat + TotalConnections selfstat.Stat + PacketsRecv selfstat.Stat + BytesRecv selfstat.Stat } var dropwarn = "E! Error: tcp_listener message queue full. " + @@ -91,6 +98,17 @@ func (t *TcpListener) Start(acc telegraf.Accumulator) error { t.Lock() defer t.Unlock() + tags := map[string]string{ + "input": "tcp_listener", + "address": t.ServiceAddress, + } + t.MaxConnections = selfstat.Register("inputs", "max_connections", tags) + t.MaxConnections.Set(int64(t.MaxTCPConnections)) + t.CurrentConnections = selfstat.Register("inputs", "current_connections", tags) + t.TotalConnections = selfstat.Register("inputs", "total_connections", tags) + t.PacketsRecv = selfstat.Register("inputs", "packets_received", tags) + t.BytesRecv = selfstat.Register("inputs", "bytes_received", tags) + t.acc = acc t.in = make(chan []byte, t.AllowedPendingMessages) t.done = make(chan struct{}) @@ -189,6 +207,8 @@ func (t *TcpListener) refuser(conn *net.TCPConn) { // handler handles a single TCP Connection func (t *TcpListener) handler(conn *net.TCPConn, id string) { + t.CurrentConnections.Incr(1) + t.TotalConnections.Incr(1) // connection cleanup function defer func() { t.wg.Done() @@ -196,6 +216,7 @@ func (t *TcpListener) handler(conn *net.TCPConn, id string) { // Add one connection potential back to channel when this one closes t.accept <- true t.forget(id) + t.CurrentConnections.Incr(-1) }() var n int @@ -212,6 +233,8 @@ func (t *TcpListener) handler(conn *net.TCPConn, id string) { if n == 0 { continue } + t.BytesRecv.Incr(int64(n)) + t.PacketsRecv.Incr(1) bufCopy := make([]byte, n) copy(bufCopy, scanner.Bytes()) diff --git a/plugins/inputs/udp_listener/udp_listener.go b/plugins/inputs/udp_listener/udp_listener.go index f8dff5269360b..7cc2dc1c1024e 100644 --- a/plugins/inputs/udp_listener/udp_listener.go +++ b/plugins/inputs/udp_listener/udp_listener.go @@ -9,6 +9,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/selfstat" ) type UdpListener struct { @@ -36,6 +37,9 @@ type UdpListener struct { acc telegraf.Accumulator listener *net.UDPConn + + PacketsRecv selfstat.Stat + BytesRecv selfstat.Stat } // UDP packet limit, see @@ -86,6 +90,13 @@ func (u *UdpListener) Start(acc telegraf.Accumulator) error { u.Lock() defer u.Unlock() + tags := map[string]string{ + "input": "udp_listener", + "address": u.ServiceAddress, + } + u.PacketsRecv = selfstat.Register("inputs", "packets_received", tags) + u.BytesRecv = selfstat.Register("inputs", "bytes_received", tags) + u.acc = acc u.in = make(chan []byte, u.AllowedPendingMessages) u.done = make(chan struct{}) @@ -133,6 +144,8 @@ func (u *UdpListener) udpListen() error { } continue } + u.BytesRecv.Incr(int64(n)) + u.PacketsRecv.Incr(1) bufCopy := make([]byte, n) copy(bufCopy, buf[:n]) diff --git a/selfstat/selfstat.go b/selfstat/selfstat.go new file mode 100644 index 0000000000000..241d1e951ecf5 --- /dev/null +++ b/selfstat/selfstat.go @@ -0,0 +1,147 @@ +package selfstat + +import ( + "hash/fnv" + "log" + "sync" + "sync/atomic" + "time" + + "github.com/influxdata/telegraf" +) + +var ( + Enabled = false + + registry = &rgstry{stats: make(map[uint64]map[string]Stat)} +) + +type Stat interface { + Name() string + FieldName() string + Tags() map[string]string + Key() uint64 + Incr(v int64) + Set(v int64) + Get() int64 +} + +func Register(measurement, field string, tags map[string]string) Stat { + return registry.register(&stat{ + measurement: measurement, + field: field, + metadata: tags, + }) +} + +func Metrics() []telegraf.Metric { + registry.mu.Lock() + now := time.Now() + metrics := make([]telegraf.Metric, len(registry.stats)) + i := 0 + for _, stats := range registry.stats { + if len(stats) > 0 { + var tags map[string]string + var name string + fields := map[string]interface{}{} + for fieldname, stat := range stats { + fields[fieldname] = stat.Get() + tags = stat.Tags() + name = stat.Name() + } + metric, err := telegraf.NewMetric("telegraf_"+name, tags, fields, now) + if err != nil { + log.Printf("E! Error creating selfstat metric: %s", err) + continue + } + metrics[i] = metric + i++ + } + } + registry.mu.Unlock() + return metrics +} + +type intStat struct { + stat + v int64 +} + +type floatStat struct { + stat + v float64 +} + +type stat struct { + measurement string + field string + metadata map[string]string + key uint64 + v int64 + registered bool +} + +func (s *stat) Incr(v int64) { + atomic.AddInt64(&s.v, v) +} + +func (s *stat) Set(v int64) { + atomic.StoreInt64(&s.v, v) +} + +func (s *stat) Get() int64 { + return atomic.LoadInt64(&s.v) +} + +func (s *stat) Name() string { + return s.measurement +} + +func (s *stat) FieldName() string { + return s.field +} + +// Metadata returns a copy of the stat's metadata. +// NOTE this allocates a new map every time it is called. +func (s *stat) Tags() map[string]string { + m := make(map[string]string, len(s.metadata)) + for k, v := range s.metadata { + m[k] = v + } + return m +} + +func (s *stat) Key() uint64 { + if s.key == 0 { + h := fnv.New64a() + h.Write([]byte(s.measurement)) + for k, v := range s.metadata { + h.Write([]byte(k + v)) + } + s.key = h.Sum64() + } + return s.key +} + +type rgstry struct { + stats map[uint64]map[string]Stat + mu sync.Mutex +} + +func (r *rgstry) register(s Stat) Stat { + r.mu.Lock() + defer r.mu.Unlock() + if stats, ok := r.stats[s.Key()]; ok { + // measurement exists + if stat, ok := stats[s.FieldName()]; ok { + // field already exists, so don't create a new one + return stat + } + r.stats[s.Key()][s.FieldName()] = s + return s + } else { + // creating a new unique metric + r.stats[s.Key()] = map[string]Stat{s.FieldName(): s} + return s + } +} diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 99f9e300683b5..1ca02fd4022d9 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -9,6 +9,8 @@ import ( "testing" "time" + "github.com/influxdata/telegraf" + "github.com/stretchr/testify/assert" ) @@ -110,6 +112,12 @@ func (a *Accumulator) AddGauge( a.AddFields(measurement, fields, tags, timestamp...) } +func (a *Accumulator) AddMetrics(metrics []telegraf.Metric) { + for _, m := range metrics { + a.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) + } +} + // AddError appends the given error to Accumulator.Errors. func (a *Accumulator) AddError(err error) { if err == nil {