Skip to content

Commit

Permalink
Add support for pass/drop/tagpass/tagdrop for outputs
Browse files Browse the repository at this point in the history
Reuses same logic as the plugins for filtering points, should be only
a marginal performance decrease to check all the points before writing
to the output.

Added examples to the README as well (for generic pass/drop as well as
output pass/drop/tagpass/tagdrop).

X-Github-Closes #398
  • Loading branch information
oldmantaiter committed Dec 1, 2015
1 parent b705608 commit 6aaa856
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 42 deletions.
34 changes: 34 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,19 @@ Below is how to configure `tagpass` and `tagdrop` parameters (added in 0.1.5)
path = [ "/opt", "/home" ]
```

Below is how to configure `pass` and `drop` parameters (added in 0.1.5)

```
# Drop all metrics for guest CPU usage
[[plugins.cpu]]
drop = [ "cpu_usage_guest" ]
# Only store inode related metrics for disks
[[plugins.disk]]
pass = [ "disk_inodes" ]
```


Additional plugins (or outputs) of the same type can be specified,
just define another instance in the config file:

Expand Down Expand Up @@ -224,6 +237,27 @@ Telegraf also supports specifying multiple output sinks to send data to,
configuring each output sink is different, but examples can be
found by running `telegraf -sample-config`.

Outputs also support the same configurable options as plugins (pass, drop, tagpass, tagdrop)

```
[[outputs.influxdb]]
urls = [ "http://localhost:8086" ]
database = "telegraf"
# Drop all measurements that start with "aerospike"
drop = ["aerospike"]
# Send to a different database
[[outputs.influxdb]]
urls = [ "http://localhost:8086" ]
database = "mydb"
precision = "s"
# Only store measurements where the tag "mytag" matches the value "B"
[outputs.influxdb.tagpass]
mytag = ["B"]
```


## Supported Outputs

* influxdb
Expand Down
10 changes: 5 additions & 5 deletions accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ type Accumulator interface {
}

func NewAccumulator(
pluginConfig *config.PluginConfig,
filterConfig *config.PrefixFilterConfig,
points chan *client.Point,
) Accumulator {
acc := accumulator{}
acc.points = points
acc.pluginConfig = pluginConfig
acc.filterConfig = filterConfig
return &acc
}

Expand All @@ -47,7 +47,7 @@ type accumulator struct {

debug bool

pluginConfig *config.PluginConfig
filterConfig *config.PrefixFilterConfig

prefix string
}
Expand Down Expand Up @@ -106,8 +106,8 @@ func (ac *accumulator) AddFields(
measurement = ac.prefix + measurement
}

if ac.pluginConfig != nil {
if !ac.pluginConfig.ShouldPass(measurement) || !ac.pluginConfig.ShouldTagsPass(tags) {
if ac.filterConfig != nil {
if !ac.filterConfig.ShouldPass(measurement) || !ac.filterConfig.ShouldTagsPass(tags) {
return
}
}
Expand Down
15 changes: 13 additions & 2 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func (a *Agent) writeOutput(
shutdown chan struct{},
wg *sync.WaitGroup,
) {
var filteredPoints []*client.Point
defer wg.Done()
if len(points) == 0 {
return
Expand All @@ -226,12 +227,22 @@ func (a *Agent) writeOutput(
start := time.Now()

for {
err := ro.Output.Write(points)
if (len(ro.Config.Pass) + len(ro.Config.Drop) + len(ro.Config.TagDrop) + len(ro.Config.TagPass)) > 0 {
for i := range points {
if !ro.Config.ShouldPass(points[i].Name()) || !ro.Config.ShouldTagsPass(points[i].Tags()) {
continue
}
filteredPoints = append(filteredPoints, points[i])
}
} else {
filteredPoints = points
}
err := ro.Output.Write(filteredPoints)
if err == nil {
// Write successful
elapsed := time.Since(start)
log.Printf("Flushed %d metrics to output %s in %s\n",
len(points), ro.Name, elapsed)
len(filteredPoints), ro.Name, elapsed)
return
}

Expand Down
74 changes: 43 additions & 31 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,18 @@ type TagFilter struct {
type RunningOutput struct {
Name string
Output outputs.Output
Config *PrefixFilterConfig
}

type RunningPlugin struct {
Name string
Plugin plugins.Plugin
Config *PluginConfig
Config *PrefixFilterConfig
}

// PluginConfig containing a name, interval, and drop/pass prefix lists
// PrefixFilterConfig contains the name, interval (for plugin) and drop/pass prefix lists
// Also lists the tags to filter
type PluginConfig struct {
type PrefixFilterConfig struct {
Name string

Drop []string
Expand All @@ -111,19 +112,19 @@ type PluginConfig struct {
}

// ShouldPass returns true if the metric should pass, false if should drop
// based on the drop/pass plugin parameters
func (cp *PluginConfig) ShouldPass(measurement string) bool {
if cp.Pass != nil {
for _, pat := range cp.Pass {
// based on the drop/pass filter parameters
func (pf *PrefixFilterConfig) ShouldPass(measurement string) bool {
if pf.Pass != nil {
for _, pat := range pf.Pass {
if strings.HasPrefix(measurement, pat) {
return true
}
}
return false
}

if cp.Drop != nil {
for _, pat := range cp.Drop {
if pf.Drop != nil {
for _, pat := range pf.Drop {
if strings.HasPrefix(measurement, pat) {
return false
}
Expand All @@ -135,10 +136,10 @@ func (cp *PluginConfig) ShouldPass(measurement string) bool {
}

// ShouldTagsPass returns true if the metric should pass, false if should drop
// based on the tagdrop/tagpass plugin parameters
func (cp *PluginConfig) ShouldTagsPass(tags map[string]string) bool {
if cp.TagPass != nil {
for _, pat := range cp.TagPass {
// based on the tagdrop/tagpass filter parameters
func (pf *PrefixFilterConfig) ShouldTagsPass(tags map[string]string) bool {
if pf.TagPass != nil {
for _, pat := range pf.TagPass {
if tagval, ok := tags[pat.Name]; ok {
for _, filter := range pat.Filter {
if filter == tagval {
Expand All @@ -150,8 +151,8 @@ func (cp *PluginConfig) ShouldTagsPass(tags map[string]string) bool {
return false
}

if cp.TagDrop != nil {
for _, pat := range cp.TagDrop {
if pf.TagDrop != nil {
for _, pat := range pf.TagDrop {
if tagval, ok := tags[pat.Name]; ok {
for _, filter := range pat.Filter {
if filter == tagval {
Expand Down Expand Up @@ -469,15 +470,21 @@ func (c *Config) addOutput(name string, table *ast.Table) error {
if !ok {
return fmt.Errorf("Undefined but requested output: %s", name)
}
o := creator()
output := creator()

if err := toml.UnmarshalTable(table, o); err != nil {
filterConfig, err := buildFilters(name, table)
if err != nil {
return err
}

if err := toml.UnmarshalTable(table, output); err != nil {
return err
}

ro := &RunningOutput{
Name: name,
Output: o,
Output: output,
Config: filterConfig,
}
c.Outputs = append(c.Outputs, ro)
return nil
Expand All @@ -493,31 +500,36 @@ func (c *Config) addPlugin(name string, table *ast.Table) error {
}
plugin := creator()

pluginConfig, err := applyPlugin(name, table, plugin)
filterConfig, err := buildFilters(name, table)
if err != nil {
return err
}

if err := toml.UnmarshalTable(table, plugin); err != nil {
return err
}

rp := &RunningPlugin{
Name: name,
Plugin: plugin,
Config: pluginConfig,
Config: filterConfig,
}
c.Plugins = append(c.Plugins, rp)
return nil
}

// applyPlugin takes defined plugin names and applies them to the given
// interface, returning a PluginConfig object in the end that can
// be inserted into a runningPlugin by the agent.
func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig, error) {
cp := &PluginConfig{Name: name}
// buildFilters builds a PrefixFilterConfig (tagpass/tagdrop/pass/drop) to
// be inserted into the RunningPlugin/RunningOutput to be used for prefix
// filtering on both plugins and outputs
func buildFilters(name string, tbl *ast.Table) (*PrefixFilterConfig, error) {
pf := &PrefixFilterConfig{Name: name}

if node, ok := tbl.Fields["pass"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
cp.Pass = append(cp.Pass, str.Value)
pf.Pass = append(pf.Pass, str.Value)
}
}
}
Expand All @@ -529,7 +541,7 @@ func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig,
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
cp.Drop = append(cp.Drop, str.Value)
pf.Drop = append(pf.Drop, str.Value)
}
}
}
Expand All @@ -544,7 +556,7 @@ func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig,
return nil, err
}

cp.Interval = dur
pf.Interval = dur
}
}
}
Expand All @@ -561,7 +573,7 @@ func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig,
}
}
}
cp.TagPass = append(cp.TagPass, *tagfilter)
pf.TagPass = append(pf.TagPass, *tagfilter)
}
}
}
Expand All @@ -579,7 +591,7 @@ func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig,
}
}
}
cp.TagDrop = append(cp.TagDrop, *tagfilter)
pf.TagDrop = append(pf.TagDrop, *tagfilter)
}
}
}
Expand All @@ -590,5 +602,5 @@ func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig,
delete(tbl.Fields, "interval")
delete(tbl.Fields, "tagdrop")
delete(tbl.Fields, "tagpass")
return cp, toml.UnmarshalTable(tbl, p)
return pf, nil
}
8 changes: 4 additions & 4 deletions internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestConfig_LoadSinglePlugin(t *testing.T) {
memcached := plugins.Plugins["memcached"]().(*memcached.Memcached)
memcached.Servers = []string{"localhost"}

mConfig := &PluginConfig{
mConfig := &PrefixFilterConfig{
Name: "memcached",
Drop: []string{"other", "stuff"},
Pass: []string{"some", "strings"},
Expand Down Expand Up @@ -57,7 +57,7 @@ func TestConfig_LoadDirectory(t *testing.T) {
memcached := plugins.Plugins["memcached"]().(*memcached.Memcached)
memcached.Servers = []string{"localhost"}

mConfig := &PluginConfig{
mConfig := &PrefixFilterConfig{
Name: "memcached",
Drop: []string{"other", "stuff"},
Pass: []string{"some", "strings"},
Expand Down Expand Up @@ -87,7 +87,7 @@ func TestConfig_LoadDirectory(t *testing.T) {
Name: "myothercollector",
},
}
eConfig := &PluginConfig{Name: "exec"}
eConfig := &PrefixFilterConfig{Name: "exec"}
assert.Equal(t, ex, c.Plugins[1].Plugin,
"Merged Testdata did not produce a correct exec struct.")
assert.Equal(t, eConfig, c.Plugins[1].Config,
Expand All @@ -109,7 +109,7 @@ func TestConfig_LoadDirectory(t *testing.T) {
},
}

pConfig := &PluginConfig{Name: "procstat"}
pConfig := &PrefixFilterConfig{Name: "procstat"}

assert.Equal(t, pstat, c.Plugins[3].Plugin,
"Merged Testdata did not produce a correct procstat struct.")
Expand Down

0 comments on commit 6aaa856

Please sign in to comment.