From 95680e1d7f54f1389ef04a5f65891ec435038600 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Fri, 16 Oct 2015 16:13:32 -0600 Subject: [PATCH] Utilizing new client and overhauling Accumulator interface Fixes #280 Fixes #281 --- accumulator.go | 191 +++++++++++++++------- agent.go | 193 +++++++++++------------ agent_test.go | 4 +- config.go | 2 + outputs/amqp/amqp.go | 30 +--- outputs/datadog/datadog.go | 46 +++--- outputs/influxdb/influxdb.go | 28 ++-- outputs/kafka/kafka.go | 32 +--- outputs/mqtt/mqtt.go | 39 ++--- outputs/opentsdb/opentsdb.go | 27 ++-- outputs/registry.go | 4 +- plugins/kafka_consumer/kafka_consumer.go | 2 +- plugins/mongodb/mongodb_data.go | 2 +- plugins/registry.go | 16 +- testutil/accumulator.go | 21 ++- 15 files changed, 326 insertions(+), 311 deletions(-) diff --git a/accumulator.go b/accumulator.go index d1f8adf614e56..fb187f966f7a4 100644 --- a/accumulator.go +++ b/accumulator.go @@ -7,15 +7,146 @@ import ( "sync" "time" - "github.com/influxdb/influxdb/client" + oldclient "github.com/influxdb/influxdb/client" + "github.com/influxdb/influxdb/client/v2" ) +type Accumulator interface { + Add(measurement string, value interface{}, + tags map[string]string, t ...time.Time) + AddFields(measurement string, fields map[string]interface{}, + tags map[string]string, t ...time.Time) + + SetDefaultTags(tags map[string]string) + AddDefaultTag(key, value string) + + Prefix() string + SetPrefix(prefix string) + + Debug() bool + SetDebug(enabled bool) +} + +func NewAccumulator( + plugin *ConfiguredPlugin, + points chan *client.Point, +) Accumulator { + acc := accumulator{} + acc.points = points + acc.plugin = plugin + return &acc +} + +type accumulator struct { + sync.Mutex + + points chan *client.Point + + defaultTags map[string]string + + debug bool + + plugin *ConfiguredPlugin + + prefix string +} + +func (ac *accumulator) Add( + measurement string, + value interface{}, + tags map[string]string, + t ...time.Time, +) { + fields := make(map[string]interface{}) + fields["value"] = value + ac.AddFields(measurement, fields, tags, t...) +} + +func (ac *accumulator) AddFields( + measurement string, + fields map[string]interface{}, + tags map[string]string, + t ...time.Time, +) { + + if tags == nil { + tags = make(map[string]string) + } + + // InfluxDB client/points does not support writing uint64 + // TODO fix when it does + // https://github.com/influxdb/influxdb/pull/4508 + for k, v := range fields { + switch val := v.(type) { + case uint64: + if val < uint64(9223372036854775808) { + fields[k] = int64(val) + } else { + fields[k] = int64(9223372036854775807) + } + } + } + + var timestamp time.Time + if len(t) > 0 { + timestamp = t[0] + } else { + timestamp = time.Now() + } + + if ac.plugin != nil { + if !ac.plugin.ShouldPass(measurement, tags) { + return + } + } + + for k, v := range ac.defaultTags { + if _, ok := tags[k]; !ok { + tags[k] = v + } + } + + if ac.prefix != "" { + measurement = ac.prefix + measurement + } + + pt := client.NewPoint(measurement, tags, fields, timestamp) + if ac.debug { + fmt.Println("> " + pt.String()) + } + ac.points <- pt +} + +func (ac *accumulator) SetDefaultTags(tags map[string]string) { + ac.defaultTags = tags +} + +func (ac *accumulator) AddDefaultTag(key, value string) { + ac.defaultTags[key] = value +} + +func (ac *accumulator) Prefix() string { + return ac.prefix +} + +func (ac *accumulator) SetPrefix(prefix string) { + ac.prefix = prefix +} + +func (ac *accumulator) Debug() bool { + return ac.debug +} + +func (ac *accumulator) SetDebug(debug bool) { + ac.debug = debug +} + // BatchPoints is used to send a batch of data in a single write from telegraf // to influx type BatchPoints struct { sync.Mutex - client.BatchPoints + oldclient.BatchPoints Debug bool @@ -39,9 +170,9 @@ func (bp *BatchPoints) deepcopy() *BatchPoints { bpc.Tags[k] = v } - var pts []client.Point + var pts []oldclient.Point for _, pt := range bp.Points { - var ptc client.Point + var ptc oldclient.Point ptc.Measurement = pt.Measurement ptc.Time = pt.Time @@ -70,66 +201,20 @@ func (bp *BatchPoints) Add( measurement string, val interface{}, tags map[string]string, + timestamp ...time.Time, ) { fields := make(map[string]interface{}) fields["value"] = val bp.AddFields(measurement, fields, tags) } -// AddFieldsWithTime adds a measurement with a provided timestamp -func (bp *BatchPoints) AddFieldsWithTime( - measurement string, - fields map[string]interface{}, - tags map[string]string, - timestamp time.Time, -) { - // TODO this function should add the fields with the timestamp, but that will - // need to wait for the InfluxDB point precision/unit to be fixed - bp.AddFields(measurement, fields, tags) - // bp.Lock() - // defer bp.Unlock() - - // measurement = bp.Prefix + measurement - - // if bp.Config != nil { - // if !bp.Config.ShouldPass(measurement, tags) { - // return - // } - // } - - // if bp.Debug { - // var tg []string - - // for k, v := range tags { - // tg = append(tg, fmt.Sprintf("%s=\"%s\"", k, v)) - // } - - // var vals []string - - // for k, v := range fields { - // vals = append(vals, fmt.Sprintf("%s=%v", k, v)) - // } - - // sort.Strings(tg) - // sort.Strings(vals) - - // fmt.Printf("> [%s] %s %s\n", strings.Join(tg, " "), measurement, strings.Join(vals, " ")) - // } - - // bp.Points = append(bp.Points, client.Point{ - // Measurement: measurement, - // Tags: tags, - // Fields: fields, - // Time: timestamp, - // }) -} - // AddFields will eventually replace the Add function, once we move to having a // single plugin as a single measurement with multiple fields func (bp *BatchPoints) AddFields( measurement string, fields map[string]interface{}, tags map[string]string, + timestamp ...time.Time, ) { bp.Lock() defer bp.Unlock() @@ -181,7 +266,7 @@ func (bp *BatchPoints) AddFields( fmt.Printf("> [%s] %s %s\n", strings.Join(tg, " "), measurement, strings.Join(vals, " ")) } - bp.Points = append(bp.Points, client.Point{ + bp.Points = append(bp.Points, oldclient.Point{ Measurement: measurement, Tags: tags, Fields: fields, diff --git a/agent.go b/agent.go index 1ae7e0f943808..9a23a02c49947 100644 --- a/agent.go +++ b/agent.go @@ -11,6 +11,8 @@ import ( "github.com/influxdb/telegraf/outputs" "github.com/influxdb/telegraf/plugins" + + "github.com/influxdb/influxdb/client/v2" ) type runningOutput struct { @@ -30,6 +32,9 @@ type Agent struct { // Interval at which to gather information Interval Duration + // Interval at which to flush data + FlushInterval Duration + // Option for outputting data in UTC UTC bool `toml:"utc"` @@ -50,10 +55,11 @@ type Agent struct { // NewAgent returns an Agent struct based off the given Config func NewAgent(config *Config) (*Agent, error) { agent := &Agent{ - Config: config, - Interval: Duration{10 * time.Second}, - UTC: true, - Precision: "s", + Config: config, + Interval: Duration{10 * time.Second}, + FlushInterval: Duration{10 * time.Second}, + UTC: true, + Precision: "s", } // Apply the toml table to the agent config, overriding defaults @@ -170,11 +176,9 @@ func (a *Agent) LoadPlugins(filters []string) ([]string, error) { return names, nil } -// crankParallel runs the plugins that are using the same reporting interval +// gatherParallel runs the plugins that are using the same reporting interval // as the telegraf agent. -func (a *Agent) crankParallel() error { - points := make(chan *BatchPoints, len(a.plugins)) - +func (a *Agent) gatherParallel(pointChan chan *client.Point) error { var wg sync.WaitGroup start := time.Now() @@ -189,70 +193,33 @@ func (a *Agent) crankParallel() error { go func(plugin *runningPlugin) { defer wg.Done() - var bp BatchPoints - bp.Debug = a.Debug - bp.Prefix = plugin.name + "_" - bp.Config = plugin.config - bp.Precision = a.Precision - bp.Tags = a.Config.Tags + acc := NewAccumulator(plugin.config, pointChan) + acc.SetDebug(a.Debug) + acc.SetPrefix(plugin.name + "_") + acc.SetDefaultTags(a.Config.Tags) - if err := plugin.plugin.Gather(&bp); err != nil { + if err := plugin.plugin.Gather(acc); err != nil { log.Printf("Error in plugin [%s]: %s", plugin.name, err) } - points <- &bp }(plugin) } wg.Wait() - close(points) - - var bp BatchPoints - bp.Time = time.Now() - if a.UTC { - bp.Time = bp.Time.UTC() - } - bp.Precision = a.Precision - - for sub := range points { - bp.Points = append(bp.Points, sub.Points...) - } - elapsed := time.Since(start) - log.Printf("Cranking default (%s) interval, gathered %d metrics from %d plugins in %s\n", - a.Interval, len(bp.Points), counter, elapsed) - return a.flush(&bp) -} - -// crank is mostly for test purposes. -func (a *Agent) crank() error { - var bp BatchPoints - - bp.Debug = a.Debug - bp.Precision = a.Precision - - for _, plugin := range a.plugins { - bp.Prefix = plugin.name + "_" - bp.Config = plugin.config - err := plugin.plugin.Gather(&bp) - if err != nil { - return err - } - } - - bp.Tags = a.Config.Tags - bp.Time = time.Now() - if a.UTC { - bp.Time = bp.Time.UTC() - } - - return a.flush(&bp) + log.Printf("Default (%s) interval, gathered metrics from %d plugins in %s\n", + a.Interval, counter, elapsed) + return nil } -// crankSeparate runs the plugins that have been configured with their own +// gatherSeparate runs the plugins that have been configured with their own // reporting interval. -func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) error { +func (a *Agent) gatherSeparate( + shutdown chan struct{}, + plugin *runningPlugin, + pointChan chan *client.Point, +) error { ticker := time.NewTicker(plugin.config.Interval) for { @@ -260,29 +227,18 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err var outerr error start := time.Now() - bp.Debug = a.Debug - - bp.Prefix = plugin.name + "_" - bp.Config = plugin.config - bp.Precision = a.Precision - bp.Tags = a.Config.Tags + acc := NewAccumulator(plugin.config, pointChan) + acc.SetDebug(a.Debug) + acc.SetPrefix(plugin.name + "_") + acc.SetDefaultTags(a.Config.Tags) - if err := plugin.plugin.Gather(&bp); err != nil { + if err := plugin.plugin.Gather(acc); err != nil { log.Printf("Error in plugin [%s]: %s", plugin.name, err) - outerr = errors.New("Error encountered processing plugins & outputs") - } - - bp.Time = time.Now() - if a.UTC { - bp.Time = bp.Time.UTC() } elapsed := time.Since(start) - log.Printf("Cranking separate (%s) interval, gathered %d metrics from %s in %s\n", + log.Printf("Separate (%s) interval, gathered %d metrics from %s in %s\n", plugin.config.Interval, len(bp.Points), plugin.name, elapsed) - if err := a.flush(&bp); err != nil { - outerr = errors.New("Error encountered processing plugins & outputs") - } if outerr != nil { return outerr @@ -297,30 +253,6 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err } } -func (a *Agent) flush(bp *BatchPoints) error { - var wg sync.WaitGroup - var outerr error - - for _, o := range a.outputs { - wg.Add(1) - - // Copy BatchPoints - bpc := bp.deepcopy() - - go func(ro *runningOutput) { - defer wg.Done() - // Log all output errors: - if err := ro.output.Write(bpc.BatchPoints); err != nil { - log.Printf("Error in output [%s]: %s", ro.name, err) - outerr = errors.New("Error encountered flushing outputs") - } - }(o) - } - - wg.Wait() - return outerr -} - // Test verifies that we can 'Gather' from all plugins with their configured // Config struct func (a *Agent) Test() error { @@ -356,10 +288,65 @@ func (a *Agent) Test() error { return nil } +func (a *Agent) flush(points []*client.Point) error { + var wg sync.WaitGroup + var outerr error + + for _, o := range a.outputs { + wg.Add(1) + + go func(ro *runningOutput) { + defer wg.Done() + // Log all output errors: + if err := ro.output.Write(points); err != nil { + log.Printf("Error in output [%s]: %s", ro.name, err) + outerr = errors.New("Error encountered flushing outputs") + } + }(o) + } + + wg.Wait() + return outerr +} + +// flusher monitors the points input channel and flushes on the minimum interval +func (a *Agent) flusher(shutdown chan struct{}, pointChan chan *client.Point) error { + ticker := time.NewTicker(a.FlushInterval.Duration) + points := make([]*client.Point, 0) + for { + select { + case <-shutdown: + return nil + case <-ticker.C: + start := time.Now() + if err := a.flush(points); err != nil { + log.Printf(err.Error()) + } + elapsed := time.Since(start) + log.Printf("Flushed %d metrics in %s\n", len(points), elapsed) + points = make([]*client.Point, 0) + case pt := <-pointChan: + points = append(points, pt) + } + } +} + // Run runs the agent daemon, gathering every Interval func (a *Agent) Run(shutdown chan struct{}) error { var wg sync.WaitGroup + // channel shared between all plugin threads for accumulating points + pointChan := make(chan *client.Point, 1000) + + wg.Add(1) + go func() { + defer wg.Done() + if err := a.flusher(shutdown, pointChan); err != nil { + log.Printf("Flusher routine failed, exiting: %s\n", err.Error()) + close(shutdown) + } + }() + for _, plugin := range a.plugins { // Start service of any ServicePlugins @@ -374,12 +361,12 @@ func (a *Agent) Run(shutdown chan struct{}) error { } // Special handling for plugins that have their own collection interval - // configured. Default intervals are handled below with crankParallel + // configured. Default intervals are handled below with gatherParallel if plugin.config.Interval != 0 { wg.Add(1) go func(plugin *runningPlugin) { defer wg.Done() - if err := a.crankSeparate(shutdown, plugin); err != nil { + if err := a.gatherSeparate(shutdown, plugin, pointChan); err != nil { log.Printf(err.Error()) } }(plugin) @@ -391,7 +378,7 @@ func (a *Agent) Run(shutdown chan struct{}) error { ticker := time.NewTicker(a.Interval.Duration) for { - if err := a.crankParallel(); err != nil { + if err := a.gatherParallel(pointChan); err != nil { log.Printf(err.Error()) } diff --git a/agent_test.go b/agent_test.go index 3fc384d7e4e0e..cb9847bb0cec8 100644 --- a/agent_test.go +++ b/agent_test.go @@ -74,7 +74,7 @@ func TestAgent_DrivesMetrics(t *testing.T) { plugin.On("Add", "foo", 1.2, nil).Return(nil) plugin.On("Add", "bar", 888, nil).Return(nil) - err := a.crank() + err := a.gather() require.NoError(t, err) } @@ -112,7 +112,7 @@ func TestAgent_AppliesTags(t *testing.T) { plugin.On("Read").Return(msgs, nil) metrics.On("Receive", m2).Return(nil) - err := a.crank() + err := a.gather() require.NoError(t, err) } */ diff --git a/config.go b/config.go index e930c950586d9..f452a989908cf 100644 --- a/config.go +++ b/config.go @@ -355,6 +355,8 @@ var header = `# Telegraf configuration [agent] # Default data collection interval for all plugins interval = "10s" + # Default data flushing interval + flush_interval = "10s" # If utc = false, uses local time (utc is highly recommended) utc = true # Precision of writes, valid values are n, u, ms, s, m, and h diff --git a/outputs/amqp/amqp.go b/outputs/amqp/amqp.go index ec7e7332dc350..b8ae0501ddf4e 100644 --- a/outputs/amqp/amqp.go +++ b/outputs/amqp/amqp.go @@ -6,7 +6,7 @@ import ( "sync" "time" - "github.com/influxdb/influxdb/client" + "github.com/influxdb/influxdb/client/v2" "github.com/influxdb/telegraf/outputs" "github.com/streadway/amqp" ) @@ -82,39 +82,21 @@ func (q *AMQP) Description() string { return "Configuration for the AMQP server to send metrics to" } -func (q *AMQP) Write(bp client.BatchPoints) error { +func (q *AMQP) Write(points []*client.Point) error { q.Lock() defer q.Unlock() - if len(bp.Points) == 0 { + if len(points) == 0 { return nil } - var zero_time time.Time - for _, p := range bp.Points { + for _, p := range points { // Combine tags from Point and BatchPoints and grab the resulting // line-protocol output string to write to AMQP var value, key string - if p.Raw != "" { - value = p.Raw - } else { - for k, v := range bp.Tags { - if p.Tags == nil { - p.Tags = make(map[string]string, len(bp.Tags)) - } - p.Tags[k] = v - } - if p.Time == zero_time { - if bp.Time == zero_time { - p.Time = time.Now() - } else { - p.Time = bp.Time - } - } - value = p.MarshalString() - } + value = p.String() if q.RoutingTag != "" { - if h, ok := p.Tags[q.RoutingTag]; ok { + if h, ok := p.Tags()[q.RoutingTag]; ok { key = h } } diff --git a/outputs/datadog/datadog.go b/outputs/datadog/datadog.go index 2190ea4b326ed..97e4e5c3cf8ec 100644 --- a/outputs/datadog/datadog.go +++ b/outputs/datadog/datadog.go @@ -8,7 +8,7 @@ import ( "net/url" "sort" - "github.com/influxdb/influxdb/client" + "github.com/influxdb/influxdb/client/v2" t "github.com/influxdb/telegraf" "github.com/influxdb/telegraf/outputs" ) @@ -59,19 +59,19 @@ func (d *Datadog) Connect() error { return nil } -func (d *Datadog) Write(bp client.BatchPoints) error { - if len(bp.Points) == 0 { +func (d *Datadog) Write(points []*client.Point) error { + if len(points) == 0 { return nil } ts := TimeSeries{ - Series: make([]*Metric, len(bp.Points)), + Series: make([]*Metric, len(points)), } - for index, pt := range bp.Points { + for index, pt := range points { metric := &Metric{ - Metric: pt.Measurement, - Tags: buildTags(bp.Tags, pt.Tags), + Metric: pt.Name(), + Tags: buildTags(pt.Tags()), } - if p, err := buildPoint(bp, pt); err == nil { + if p, err := buildPoint(pt); err == nil { metric.Points[0] = p } ts.Series[index] = metric @@ -114,13 +114,18 @@ func (d *Datadog) authenticatedUrl() string { return fmt.Sprintf("%s?%s", d.apiUrl, q.Encode()) } -func buildTags(bpTags map[string]string, ptTags map[string]string) []string { - tags := make([]string, (len(bpTags) + len(ptTags))) - index := 0 - for k, v := range bpTags { - tags[index] = fmt.Sprintf("%s:%s", k, v) - index += 1 +func buildPoint(pt *client.Point) (Point, error) { + var p Point + if err := p.setValue(pt.Fields()["value"]); err != nil { + return p, fmt.Errorf("unable to extract value from Fields, %s", err.Error()) } + p[0] = float64(pt.UnixNano()) + return p, nil +} + +func buildTags(ptTags map[string]string) []string { + tags := make([]string, len(ptTags)) + index := 0 for k, v := range ptTags { tags[index] = fmt.Sprintf("%s:%s", k, v) index += 1 @@ -129,19 +134,6 @@ func buildTags(bpTags map[string]string, ptTags map[string]string) []string { return tags } -func buildPoint(bp client.BatchPoints, pt client.Point) (Point, error) { - var p Point - if err := p.setValue(pt.Fields["value"]); err != nil { - return p, fmt.Errorf("unable to extract value from Fields, %s", err.Error()) - } - if pt.Time.IsZero() { - p[0] = float64(bp.Time.Unix()) - } else { - p[0] = float64(pt.Time.Unix()) - } - return p, nil -} - func (p *Point) setValue(v interface{}) error { switch d := v.(type) { case int: diff --git a/outputs/influxdb/influxdb.go b/outputs/influxdb/influxdb.go index f49b58c458ad3..0cd3ca6e0eb9c 100644 --- a/outputs/influxdb/influxdb.go +++ b/outputs/influxdb/influxdb.go @@ -8,7 +8,7 @@ import ( "net/url" "strings" - "github.com/influxdb/influxdb/client" + "github.com/influxdb/influxdb/client/v2" t "github.com/influxdb/telegraf" "github.com/influxdb/telegraf/outputs" ) @@ -21,9 +21,10 @@ type InfluxDB struct { Password string Database string UserAgent string + Precision string Timeout t.Duration - conns []*client.Client + conns []client.Client } var sampleConfig = ` @@ -32,6 +33,7 @@ var sampleConfig = ` urls = ["http://localhost:8086"] # required # The target database for metrics (telegraf will create it if not exists) database = "telegraf" # required + precision = "s" # Connection timeout (for the connection with InfluxDB), formatted as a string. # Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". @@ -63,18 +65,15 @@ func (i *InfluxDB) Connect() error { urls = append(urls, u) } - var conns []*client.Client + var conns []client.Client for _, parsed_url := range urls { - c, err := client.NewClient(client.Config{ - URL: *parsed_url, + c := client.NewClient(client.Config{ + URL: parsed_url, Username: i.Username, Password: i.Password, UserAgent: i.UserAgent, Timeout: i.Timeout.Duration, }) - if err != nil { - return err - } conns = append(conns, c) } @@ -113,15 +112,22 @@ func (i *InfluxDB) Description() string { // Choose a random server in the cluster to write to until a successful write // occurs, logging each unsuccessful. If all servers fail, return error. -func (i *InfluxDB) Write(bp client.BatchPoints) error { - bp.Database = i.Database +func (i *InfluxDB) Write(points []*client.Point) error { + bp, _ := client.NewBatchPoints(client.BatchPointsConfig{ + Database: i.Database, + Precision: i.Precision, + }) + + for _, point := range points { + bp.AddPoint(point) + } // This will get set to nil if a successful write occurs err := errors.New("Could not write to any InfluxDB server in cluster") p := rand.Perm(len(i.conns)) for _, n := range p { - if _, e := i.conns[n].Write(bp); e != nil { + if e := i.conns[n].Write(bp); e != nil { log.Println("ERROR: " + e.Error()) } else { err = nil diff --git a/outputs/kafka/kafka.go b/outputs/kafka/kafka.go index d0b98c42d78e3..fae9552104a67 100644 --- a/outputs/kafka/kafka.go +++ b/outputs/kafka/kafka.go @@ -3,10 +3,9 @@ package kafka import ( "errors" "fmt" - "time" "github.com/Shopify/sarama" - "github.com/influxdb/influxdb/client" + "github.com/influxdb/influxdb/client/v2" "github.com/influxdb/telegraf/outputs" ) @@ -52,40 +51,21 @@ func (k *Kafka) Description() string { return "Configuration for the Kafka server to send metrics to" } -func (k *Kafka) Write(bp client.BatchPoints) error { - if len(bp.Points) == 0 { +func (k *Kafka) Write(points []*client.Point) error { + if len(points) == 0 { return nil } - var zero_time time.Time - for _, p := range bp.Points { + for _, p := range points { // Combine tags from Point and BatchPoints and grab the resulting // line-protocol output string to write to Kafka - var value string - if p.Raw != "" { - value = p.Raw - } else { - for k, v := range bp.Tags { - if p.Tags == nil { - p.Tags = make(map[string]string, len(bp.Tags)) - } - p.Tags[k] = v - } - if p.Time == zero_time { - if bp.Time == zero_time { - p.Time = time.Now() - } else { - p.Time = bp.Time - } - } - value = p.MarshalString() - } + value := p.String() m := &sarama.ProducerMessage{ Topic: k.Topic, Value: sarama.StringEncoder(value), } - if h, ok := p.Tags[k.RoutingTag]; ok { + if h, ok := p.Tags()[k.RoutingTag]; ok { m.Key = sarama.StringEncoder(h) } diff --git a/outputs/mqtt/mqtt.go b/outputs/mqtt/mqtt.go index 6d00cab68f738..e7b7f9ac87847 100644 --- a/outputs/mqtt/mqtt.go +++ b/outputs/mqtt/mqtt.go @@ -10,7 +10,7 @@ import ( "sync" paho "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git" - "github.com/influxdb/influxdb/client" + "github.com/influxdb/influxdb/client/v2" t "github.com/influxdb/telegraf" "github.com/influxdb/telegraf/outputs" ) @@ -78,35 +78,31 @@ func (m *MQTT) Description() string { return "Configuration for MQTT server to send metrics to" } -func (m *MQTT) Write(bp client.BatchPoints) error { +func (m *MQTT) Write(points []*client.Point) error { m.Lock() defer m.Unlock() - if len(bp.Points) == 0 { + if len(points) == 0 { return nil } - hostname, ok := bp.Tags["host"] + hostname, ok := points[0].Tags()["host"] if !ok { hostname = "" } - for _, p := range bp.Points { + for _, p := range points { var t []string if m.TopicPrefix != "" { t = append(t, m.TopicPrefix) } - tm := strings.Split(p.Measurement, "_") + tm := strings.Split(p.Name(), "_") if len(tm) < 2 { - tm = []string{p.Measurement, "stat"} + tm = []string{p.Name(), "stat"} } + t = append(t, "host", hostname, tm[0], tm[1]) topic := strings.Join(t, "/") - var value string - if p.Raw != "" { - value = p.Raw - } else { - value = getValue(p.Fields["value"]) - } + value := p.String() err := m.publish(topic, value) if err != nil { return fmt.Errorf("Could not write to MQTT server, %s", err) @@ -116,23 +112,6 @@ func (m *MQTT) Write(bp client.BatchPoints) error { return nil } -func getValue(v interface{}) string { - var ret string - switch v.(type) { - default: - ret = fmt.Sprintf("%v", v) - case bool: - ret = fmt.Sprintf("%t", v) - case float32, float64: - ret = fmt.Sprintf("%f", v) - case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64: - ret = fmt.Sprintf("%d", v) - case string, []byte: - ret = fmt.Sprintf("%s", v) - } - return ret -} - func (m *MQTT) publish(topic, body string) error { token := m.Client.Publish(topic, 0, false, body) token.Wait() diff --git a/outputs/opentsdb/opentsdb.go b/outputs/opentsdb/opentsdb.go index a05fd2623b546..edc388f976aa9 100644 --- a/outputs/opentsdb/opentsdb.go +++ b/outputs/opentsdb/opentsdb.go @@ -8,7 +8,7 @@ import ( "strings" "time" - "github.com/influxdb/influxdb/client" + "github.com/influxdb/influxdb/client/v2" "github.com/influxdb/telegraf/outputs" ) @@ -58,8 +58,8 @@ func (o *OpenTSDB) Connect() error { return nil } -func (o *OpenTSDB) Write(bp client.BatchPoints) error { - if len(bp.Points) == 0 { +func (o *OpenTSDB) Write(points []*client.Point) error { + if len(points) == 0 { return nil } var timeNow = time.Now() @@ -70,19 +70,20 @@ func (o *OpenTSDB) Write(bp client.BatchPoints) error { if err != nil { return fmt.Errorf("OpenTSDB: Telnet connect fail") } - for _, pt := range bp.Points { + for _, pt := range points { metric := &MetricLine{ - Metric: fmt.Sprintf("%s%s", o.Prefix, pt.Measurement), + Metric: fmt.Sprintf("%s%s", o.Prefix, pt.Name()), Timestamp: timeNow.Unix(), } - metricValue, buildError := buildValue(bp, pt) + + metricValue, buildError := buildValue(pt) if buildError != nil { fmt.Printf("OpenTSDB: %s\n", buildError.Error()) continue } metric.Value = metricValue - tagsSlice := buildTags(bp.Tags, pt.Tags) + tagsSlice := buildTags(pt.Tags()) metric.Tags = fmt.Sprint(strings.Join(tagsSlice, " ")) messageLine := fmt.Sprintf("put %s %v %s %s\n", metric.Metric, metric.Timestamp, metric.Value, metric.Tags) @@ -99,13 +100,9 @@ func (o *OpenTSDB) Write(bp client.BatchPoints) error { return nil } -func buildTags(bpTags map[string]string, ptTags map[string]string) []string { - tags := make([]string, (len(bpTags) + len(ptTags))) +func buildTags(ptTags map[string]string) []string { + tags := make([]string, len(ptTags)) index := 0 - for k, v := range bpTags { - tags[index] = fmt.Sprintf("%s=%s", k, v) - index += 1 - } for k, v := range ptTags { tags[index] = fmt.Sprintf("%s=%s", k, v) index += 1 @@ -114,9 +111,9 @@ func buildTags(bpTags map[string]string, ptTags map[string]string) []string { return tags } -func buildValue(bp client.BatchPoints, pt client.Point) (string, error) { +func buildValue(pt *client.Point) (string, error) { var retv string - var v = pt.Fields["value"] + var v = pt.Fields()["value"] switch p := v.(type) { case int64: retv = IntToString(int64(p)) diff --git a/outputs/registry.go b/outputs/registry.go index 92ce2b34e5aff..842164e0f1732 100644 --- a/outputs/registry.go +++ b/outputs/registry.go @@ -1,7 +1,7 @@ package outputs import ( - "github.com/influxdb/influxdb/client" + "github.com/influxdb/influxdb/client/v2" ) type Output interface { @@ -9,7 +9,7 @@ type Output interface { Close() error Description() string SampleConfig() string - Write(client.BatchPoints) error + Write(points []*client.Point) error } type Creator func() Output diff --git a/plugins/kafka_consumer/kafka_consumer.go b/plugins/kafka_consumer/kafka_consumer.go index c492352591604..7c1258944d74b 100644 --- a/plugins/kafka_consumer/kafka_consumer.go +++ b/plugins/kafka_consumer/kafka_consumer.go @@ -93,7 +93,7 @@ func emitMetrics(k *Kafka, acc plugins.Accumulator, metricConsumer <-chan []byte } for _, point := range points { - acc.AddFieldsWithTime(point.Name(), point.Fields(), point.Tags(), point.Time()) + acc.AddFields(point.Name(), point.Fields(), point.Tags(), point.Time()) } case <-timeout: return nil diff --git a/plugins/mongodb/mongodb_data.go b/plugins/mongodb/mongodb_data.go index bb9b7b2a4e889..fda1843bb8e15 100644 --- a/plugins/mongodb/mongodb_data.go +++ b/plugins/mongodb/mongodb_data.go @@ -89,7 +89,7 @@ func (d *MongodbData) addStat(acc plugins.Accumulator, statLine reflect.Value, s } func (d *MongodbData) add(acc plugins.Accumulator, key string, val interface{}) { - acc.AddFieldsWithTime( + acc.AddFields( key, map[string]interface{}{ "value": val, diff --git a/plugins/registry.go b/plugins/registry.go index 88a24097b3a87..f061a3e7c20ad 100644 --- a/plugins/registry.go +++ b/plugins/registry.go @@ -6,17 +6,11 @@ type Accumulator interface { // Create a point with a value, decorating it with tags // NOTE: tags is expected to be owned by the caller, don't mutate // it after passing to Add. - Add(measurement string, value interface{}, tags map[string]string) - - // Create a point with a set of values, decorating it with tags - // NOTE: tags and values are expected to be owned by the caller, don't mutate - // them after passing to AddFieldsWithTime. - AddFieldsWithTime( - measurement string, - values map[string]interface{}, - tags map[string]string, - timestamp time.Time, - ) + Add(measurement string, value interface{}, + tags map[string]string, t ...time.Time) + + AddFields(measurement string, fields map[string]interface{}, + tags map[string]string, t ...time.Time) } type Plugin interface { diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 01d9393b43299..06fc54f41921a 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -22,7 +22,12 @@ type Accumulator struct { } // Add adds a measurement point to the accumulator -func (a *Accumulator) Add(measurement string, value interface{}, tags map[string]string) { +func (a *Accumulator) Add( + measurement string, + value interface{}, + tags map[string]string, + t ...time.Time, +) { a.Lock() defer a.Unlock() if tags == nil { @@ -38,20 +43,26 @@ func (a *Accumulator) Add(measurement string, value interface{}, tags map[string ) } -// AddFieldsWithTime adds a measurement point with a specified timestamp. -func (a *Accumulator) AddFieldsWithTime( +// AddFields adds a measurement point with a specified timestamp. +func (a *Accumulator) AddFields( measurement string, values map[string]interface{}, tags map[string]string, - timestamp time.Time, + timestamp ...time.Time, ) { + var t time.Time + if len(timestamp) > 0 { + t = timestamp[0] + } else { + t = time.Now() + } a.Points = append( a.Points, &Point{ Measurement: measurement, Values: values, Tags: tags, - Time: timestamp, + Time: t, }, ) }