Skip to content

Commit

Permalink
Add support for pass/drop/tagpass/tagdrop for outputs
Browse files Browse the repository at this point in the history
Reuses a lot of the logic for the plugin configuration, and changed
the ConfiguredPlugin struct to be ConfiguredFilter and a generic
function to parse the filter (called parseFilter).

X-Github-Closes influxdata#398
  • Loading branch information
oldmantaiter committed Nov 29, 2015
1 parent 13ccf42 commit 363b1c7
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 74 deletions.
33 changes: 33 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,18 @@ Below is how to configure `tagpass` and `tagdrop` parameters (added in 0.1.5)
path = [ "/opt", "/home" ]
```

Below is how to configure `pass` and `drop` parameters (added in 0.1.5)

```
# Drop all metrics for guest CPU usage
[cpu]
drop = [ "cpu_usage_guest" ]
# Only store inode related metrics for disks
[disk]
pass = [ "disk_inodes" ]
```

## Supported Plugins

**You can view usage instructions for each plugin by running**
Expand Down Expand Up @@ -207,6 +219,27 @@ Telegraf also supports specifying multiple output sinks to send data to,
configuring each output sink is different, but examples can be
found by running `telegraf -sample-config`.

Outputs also support the same configurable options as plugins (pass, drop, tagpass, tagdrop)

```
[[outputs.influxdb]]
urls = [ "http://localhost:8086" ]
database = "telegraf"
# Drop all measurements that start with "aerospike"
drop = ["aerospike"]
# Send to a different database
[[outputs.influxdb]]
urls = [ "http://localhost:8086" ]
database = "mydb"
precision = "s"
# Only store measurements where the tag "mytag" matches the value "B"
[outputs.influxdb.tagpass]
mytag = ["B"]
```

## Supported Outputs

* influxdb
Expand Down
4 changes: 2 additions & 2 deletions accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type Accumulator interface {
}

func NewAccumulator(
plugin *ConfiguredPlugin,
plugin *ConfiguredFilter,
points chan *client.Point,
) Accumulator {
acc := accumulator{}
Expand All @@ -45,7 +45,7 @@ type accumulator struct {

debug bool

plugin *ConfiguredPlugin
plugin *ConfiguredFilter

prefix string
}
Expand Down
19 changes: 15 additions & 4 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import (
type runningOutput struct {
name string
output outputs.Output
config *ConfiguredFilter
}

type runningPlugin struct {
name string
plugin plugins.Plugin
config *ConfiguredPlugin
config *ConfiguredFilter
}

// Agent runs telegraf and collects data based on the given config
Expand Down Expand Up @@ -160,8 +161,8 @@ func (a *Agent) LoadOutputs(filters []string, config *Config) ([]string, error)
if err != nil {
return nil, err
}

a.outputs = append(a.outputs, &runningOutput{name, output})
config := config.GetOutputConfig(name)
a.outputs = append(a.outputs, &runningOutput{name, output, config})
names = append(names, name)
}
}
Expand Down Expand Up @@ -368,8 +369,18 @@ func (a *Agent) flush(
) {
var wg sync.WaitGroup
for _, o := range a.outputs {
filteredPoints := []*client.Point{}
for ix := range points {
if o.config.ShouldPass(points[ix].Name(), points[ix].Tags()) {
filteredPoints = append(filteredPoints, points[ix])
}
}
// If we have filtered everything for this output, keep moving
if len(filteredPoints) == 0 {
continue
}
wg.Add(1)
go a.writeOutput(points, o, shutdown, &wg)
go a.writeOutput(filteredPoints, o, shutdown, &wg)
}
if wait {
wg.Wait()
Expand Down
Loading

0 comments on commit 363b1c7

Please sign in to comment.