From d7efb7a71d0bcd68840d8a67bf13914e52e369e0 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Mon, 13 Jun 2016 15:21:11 +0100 Subject: [PATCH] Add precision rounding to accumulator Adding precision rounding to the accumulator. This means that now every input metric will get rounded at collection, rather than at write (and only for the influxdb output). This feature is disabled for service inputs, because service inputs should be in control of their own timestamps & precisions. --- CHANGELOG.md | 1 + accumulator.go | 4 + agent/accumulator.go | 29 +++++ agent/accumulator_test.go | 122 ++++++++++++++++++ agent/agent.go | 7 + etc/telegraf.conf | 8 +- internal/config/config.go | 18 ++- metric.go | 9 +- metric_test.go | 17 --- plugins/inputs/prometheus/parser.go | 9 +- plugins/outputs/influxdb/influxdb.go | 8 +- .../prometheus_client_test.go | 13 +- testutil/accumulator.go | 8 ++ 13 files changed, 213 insertions(+), 40 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index af9444b81220e..25e5b3daa9096 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ### Features - [#1340](https://github.com/influxdata/telegraf/issues/1340): statsd: do not log every dropped metric. +- [#1368](https://github.com/influxdata/telegraf/pull/1368): Add precision rounding to all metrics on collection. ### Bugfixes diff --git a/accumulator.go b/accumulator.go index cbea58ebfc4fa..15c5485f8fc95 100644 --- a/accumulator.go +++ b/accumulator.go @@ -18,4 +18,8 @@ type Accumulator interface { Debug() bool SetDebug(enabled bool) + + SetPrecision(precision, interval time.Duration) + + DisablePrecision() } diff --git a/agent/accumulator.go b/agent/accumulator.go index d6ff8de60a60b..504731720cdb0 100644 --- a/agent/accumulator.go +++ b/agent/accumulator.go @@ -17,6 +17,7 @@ func NewAccumulator( acc := accumulator{} acc.metrics = metrics acc.inputConfig = inputConfig + acc.precision = time.Nanosecond return &acc } @@ -32,6 +33,8 @@ type accumulator struct { inputConfig *internal_models.InputConfig prefix string + + precision time.Duration } func (ac *accumulator) Add( @@ -141,6 +144,7 @@ func (ac *accumulator) AddFields( } else { timestamp = time.Now() } + timestamp = timestamp.Round(ac.precision) if ac.prefix != "" { measurement = ac.prefix + measurement @@ -173,6 +177,31 @@ func (ac *accumulator) SetTrace(trace bool) { ac.trace = trace } +// SetPrecision takes two time.Duration objects. If the first is non-zero, +// it sets that as the precision. Otherwise, it takes the second argument +// as the order of time that the metrics should be rounded to, with the +// maximum being 1s. +func (ac *accumulator) SetPrecision(precision, interval time.Duration) { + if precision > 0 { + ac.precision = precision + return + } + switch { + case interval >= time.Second: + ac.precision = time.Second + case interval >= time.Millisecond: + ac.precision = time.Millisecond + case interval >= time.Microsecond: + ac.precision = time.Microsecond + default: + ac.precision = time.Nanosecond + } +} + +func (ac *accumulator) DisablePrecision() { + ac.precision = time.Nanosecond +} + func (ac *accumulator) setDefaultTags(tags map[string]string) { ac.defaultTags = tags } diff --git a/agent/accumulator_test.go b/agent/accumulator_test.go index ee8f65e48ced7..9bf6811921017 100644 --- a/agent/accumulator_test.go +++ b/agent/accumulator_test.go @@ -38,6 +38,128 @@ func TestAdd(t *testing.T) { actual) } +func TestAddNoPrecisionWithInterval(t *testing.T) { + a := accumulator{} + now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC) + a.metrics = make(chan telegraf.Metric, 10) + defer close(a.metrics) + a.inputConfig = &internal_models.InputConfig{} + + a.SetPrecision(0, time.Second) + a.Add("acctest", float64(101), map[string]string{}) + a.Add("acctest", float64(101), map[string]string{"acc": "test"}) + a.Add("acctest", float64(101), map[string]string{"acc": "test"}, now) + + testm := <-a.metrics + actual := testm.String() + assert.Contains(t, actual, "acctest value=101") + + testm = <-a.metrics + actual = testm.String() + assert.Contains(t, actual, "acctest,acc=test value=101") + + testm = <-a.metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test value=101 %d", int64(1139572800000000000)), + actual) +} + +func TestAddNoIntervalWithPrecision(t *testing.T) { + a := accumulator{} + now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC) + a.metrics = make(chan telegraf.Metric, 10) + defer close(a.metrics) + a.inputConfig = &internal_models.InputConfig{} + + a.SetPrecision(time.Second, time.Millisecond) + a.Add("acctest", float64(101), map[string]string{}) + a.Add("acctest", float64(101), map[string]string{"acc": "test"}) + a.Add("acctest", float64(101), map[string]string{"acc": "test"}, now) + + testm := <-a.metrics + actual := testm.String() + assert.Contains(t, actual, "acctest value=101") + + testm = <-a.metrics + actual = testm.String() + assert.Contains(t, actual, "acctest,acc=test value=101") + + testm = <-a.metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test value=101 %d", int64(1139572800000000000)), + actual) +} + +func TestAddDisablePrecision(t *testing.T) { + a := accumulator{} + now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC) + a.metrics = make(chan telegraf.Metric, 10) + defer close(a.metrics) + a.inputConfig = &internal_models.InputConfig{} + + a.SetPrecision(time.Second, time.Millisecond) + a.DisablePrecision() + a.Add("acctest", float64(101), map[string]string{}) + a.Add("acctest", float64(101), map[string]string{"acc": "test"}) + a.Add("acctest", float64(101), map[string]string{"acc": "test"}, now) + + testm := <-a.metrics + actual := testm.String() + assert.Contains(t, actual, "acctest value=101") + + testm = <-a.metrics + actual = testm.String() + assert.Contains(t, actual, "acctest,acc=test value=101") + + testm = <-a.metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test value=101 %d", int64(1139572800082912748)), + actual) +} + +func TestDifferentPrecisions(t *testing.T) { + a := accumulator{} + now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC) + a.metrics = make(chan telegraf.Metric, 10) + defer close(a.metrics) + a.inputConfig = &internal_models.InputConfig{} + + a.SetPrecision(0, time.Second) + a.Add("acctest", float64(101), map[string]string{"acc": "test"}, now) + testm := <-a.metrics + actual := testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test value=101 %d", int64(1139572800000000000)), + actual) + + a.SetPrecision(0, time.Millisecond) + a.Add("acctest", float64(101), map[string]string{"acc": "test"}, now) + testm = <-a.metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test value=101 %d", int64(1139572800083000000)), + actual) + + a.SetPrecision(0, time.Microsecond) + a.Add("acctest", float64(101), map[string]string{"acc": "test"}, now) + testm = <-a.metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test value=101 %d", int64(1139572800082913000)), + actual) + + a.SetPrecision(0, time.Nanosecond) + a.Add("acctest", float64(101), map[string]string{"acc": "test"}, now) + testm = <-a.metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test value=101 %d", int64(1139572800082912748)), + actual) +} + func TestAddDefaultTags(t *testing.T) { a := accumulator{} a.addDefaultTag("default", "tag") diff --git a/agent/agent.go b/agent/agent.go index 1423ef773eeee..d1d36186ea968 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -118,6 +118,8 @@ func (a *Agent) gatherer( acc := NewAccumulator(input.Config, metricC) acc.SetDebug(a.Config.Agent.Debug) + acc.SetPrecision(a.Config.Agent.Precision.Duration, + a.Config.Agent.Interval.Duration) acc.setDefaultTags(a.Config.Tags) internal.RandomSleep(a.Config.Agent.CollectionJitter.Duration, shutdown) @@ -201,6 +203,8 @@ func (a *Agent) Test() error { for _, input := range a.Config.Inputs { acc := NewAccumulator(input.Config, metricC) acc.SetTrace(true) + acc.SetPrecision(a.Config.Agent.Precision.Duration, + a.Config.Agent.Interval.Duration) acc.setDefaultTags(a.Config.Tags) fmt.Printf("* Plugin: %s, Collection 1\n", input.Name) @@ -289,6 +293,9 @@ func (a *Agent) Run(shutdown chan struct{}) error { case telegraf.ServiceInput: acc := NewAccumulator(input.Config, metricC) acc.SetDebug(a.Config.Agent.Debug) + // Service input plugins should set their own precision of their + // metrics. + acc.DisablePrecision() acc.setDefaultTags(a.Config.Tags) if err := p.Start(acc); err != nil { log.Printf("Service for input %s failed to start, exiting\n%s\n", diff --git a/etc/telegraf.conf b/etc/telegraf.conf index 2519255894104..8192bd12e5666 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -52,6 +52,11 @@ ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s flush_jitter = "0s" + ## By default, precision will be set to the same timestamp order as the + ## collection interval, with the maximum being 1s. + ## Precision will NOT be used for service inputs, such as logparser and statsd. + ## Valid values are "Nns", "Nus" (or "Nµs"), "Nms", "Ns". + precision = "" ## Run telegraf in debug mode debug = false ## Run telegraf in quiet mode @@ -75,9 +80,6 @@ urls = ["http://localhost:8086"] # required ## The target database for metrics (telegraf will create it if not exists). database = "telegraf" # required - ## Precision of writes, valid values are "ns", "us" (or "µs"), "ms", "s", "m", "h". - ## note: using "s" precision greatly improves InfluxDB compression. - precision = "s" ## Retention policy to write to. retention_policy = "default" diff --git a/internal/config/config.go b/internal/config/config.go index fdc9a8753e283..99db2e30def9f 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -77,6 +77,14 @@ type AgentConfig struct { // ie, if Interval=10s then always collect on :00, :10, :20, etc. RoundInterval bool + // By default, precision will be set to the same timestamp order as the + // collection interval, with the maximum being 1s. + // ie, when interval = "10s", precision will be "1s" + // when interval = "250ms", precision will be "1ms" + // Precision will NOT be used for service inputs. It is up to each individual + // service input to set the timestamp at the appropriate precision. + Precision internal.Duration + // CollectionJitter is used to jitter the collection by a random amount. // Each plugin will sleep for a random time within jitter before collecting. // This can be used to avoid many plugins querying things like sysfs at the @@ -108,11 +116,10 @@ type AgentConfig struct { // does _not_ deactivate FlushInterval. FlushBufferWhenFull bool - // TODO(cam): Remove UTC and Precision parameters, they are no longer + // TODO(cam): Remove UTC and parameter, they are no longer // valid for the agent config. Leaving them here for now for backwards- // compatability - UTC bool `toml:"utc"` - Precision string + UTC bool `toml:"utc"` // Debug is the option for running in debug mode Debug bool @@ -209,6 +216,11 @@ var header = `# Telegraf Configuration ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s flush_jitter = "0s" + ## By default, precision will be set to the same timestamp order as the + ## collection interval, with the maximum being 1s. + ## Precision will NOT be used for service inputs, such as logparser and statsd. + ## Valid values are "Nns", "Nus" (or "Nµs"), "Nms", "Ns". + precision = "" ## Run telegraf in debug mode debug = false ## Run telegraf in quiet mode diff --git a/metric.go b/metric.go index 574565c22bb57..0d186784ae20f 100644 --- a/metric.go +++ b/metric.go @@ -45,14 +45,9 @@ func NewMetric( name string, tags map[string]string, fields map[string]interface{}, - t ...time.Time, + t time.Time, ) (Metric, error) { - var T time.Time - if len(t) > 0 { - T = t[0] - } - - pt, err := client.NewPoint(name, tags, fields, T) + pt, err := client.NewPoint(name, tags, fields, t) if err != nil { return nil, err } diff --git a/metric_test.go b/metric_test.go index 1177ab494a683..4182c9cc1d263 100644 --- a/metric_test.go +++ b/metric_test.go @@ -51,23 +51,6 @@ func TestNewMetricString(t *testing.T) { assert.Equal(t, lineProtoPrecision, m.PrecisionString("s")) } -func TestNewMetricStringNoTime(t *testing.T) { - tags := map[string]string{ - "host": "localhost", - } - fields := map[string]interface{}{ - "usage_idle": float64(99), - } - m, err := NewMetric("cpu", tags, fields) - assert.NoError(t, err) - - lineProto := fmt.Sprintf("cpu,host=localhost usage_idle=99") - assert.Equal(t, lineProto, m.String()) - - lineProtoPrecision := fmt.Sprintf("cpu,host=localhost usage_idle=99") - assert.Equal(t, lineProtoPrecision, m.PrecisionString("s")) -} - func TestNewMetricFailNaN(t *testing.T) { now := time.Now() diff --git a/plugins/inputs/prometheus/parser.go b/plugins/inputs/prometheus/parser.go index babd257536775..e8a7c0892534e 100644 --- a/plugins/inputs/prometheus/parser.go +++ b/plugins/inputs/prometheus/parser.go @@ -10,6 +10,7 @@ import ( "io" "math" "mime" + "time" "github.com/influxdata/telegraf" @@ -88,7 +89,13 @@ func (p *PrometheusParser) Parse(buf []byte) ([]telegraf.Metric, error) { } // converting to telegraf metric if len(fields) > 0 { - metric, err := telegraf.NewMetric(metricName, tags, fields) + var t time.Time + if m.TimestampMs != nil && *m.TimestampMs > 0 { + t = time.Unix(0, *m.TimestampMs*1000000) + } else { + t = time.Now() + } + metric, err := telegraf.NewMetric(metricName, tags, fields, t) if err == nil { metrics = append(metrics, metric) } diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go index f359b8fab840e..d2c0523c709af 100644 --- a/plugins/outputs/influxdb/influxdb.go +++ b/plugins/outputs/influxdb/influxdb.go @@ -24,7 +24,6 @@ type InfluxDB struct { Password string Database string UserAgent string - Precision string RetentionPolicy string WriteConsistency string Timeout internal.Duration @@ -39,6 +38,9 @@ type InfluxDB struct { // Use SSL but skip chain & host verification InsecureSkipVerify bool + // Precision is only here for legacy support. It will be ignored. + Precision string + conns []client.Client } @@ -50,9 +52,6 @@ var sampleConfig = ` urls = ["http://localhost:8086"] # required ## The target database for metrics (telegraf will create it if not exists). database = "telegraf" # required - ## Precision of writes, valid values are "ns", "us" (or "µs"), "ms", "s", "m", "h". - ## note: using "s" precision greatly improves InfluxDB compression. - precision = "s" ## Retention policy to write to. retention_policy = "default" @@ -184,7 +183,6 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error { } bp, err := client.NewBatchPoints(client.BatchPointsConfig{ Database: i.Database, - Precision: i.Precision, RetentionPolicy: i.RetentionPolicy, WriteConsistency: i.WriteConsistency, }) diff --git a/plugins/outputs/prometheus_client/prometheus_client_test.go b/plugins/outputs/prometheus_client/prometheus_client_test.go index 15ed7b7e451f8..14aee13d998a1 100644 --- a/plugins/outputs/prometheus_client/prometheus_client_test.go +++ b/plugins/outputs/prometheus_client/prometheus_client_test.go @@ -17,6 +17,7 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") } + now := time.Now() pTesting = &PrometheusClient{Listen: "localhost:9127"} err := pTesting.Start() time.Sleep(time.Millisecond * 200) @@ -30,11 +31,13 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) { pt1, _ := telegraf.NewMetric( "test_point_1", tags, - map[string]interface{}{"value": 0.0}) + map[string]interface{}{"value": 0.0}, + now) pt2, _ := telegraf.NewMetric( "test_point_2", tags, - map[string]interface{}{"value": 1.0}) + map[string]interface{}{"value": 1.0}, + now) var metrics = []telegraf.Metric{ pt1, pt2, @@ -63,11 +66,13 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) { pt3, _ := telegraf.NewMetric( "test_point_3", tags, - map[string]interface{}{"value": 0.0}) + map[string]interface{}{"value": 0.0}, + now) pt4, _ := telegraf.NewMetric( "test_point_4", tags, - map[string]interface{}{"value": 1.0}) + map[string]interface{}{"value": 1.0}, + now) metrics = []telegraf.Metric{ pt3, pt4, diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 9b6fb2373b33b..1058faf83dabd 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -84,6 +84,14 @@ func (a *Accumulator) AddFields( a.Metrics = append(a.Metrics, p) } +func (a *Accumulator) SetPrecision(precision, interval time.Duration) { + return +} + +func (a *Accumulator) DisablePrecision() { + return +} + func (a *Accumulator) Debug() bool { // stub for implementing Accumulator interface. return a.debug