Skip to content

Commit

Permalink
Add support for pass/drop/tagpass/tagdrop for outputs
Browse files Browse the repository at this point in the history
Reuses same logic as the plugins for filtering points, should be only
a marginal performance decrease to check all the points before writing
to the output.

Added examples to the README as well (for generic pass/drop as well as
output pass/drop/tagpass/tagdrop).

X-Github-Closes #398
  • Loading branch information
oldmantaiter committed Dec 3, 2015
1 parent b705608 commit 63f35fe
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 70 deletions.
34 changes: 34 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,19 @@ Below is how to configure `tagpass` and `tagdrop` parameters (added in 0.1.5)
path = [ "/opt", "/home" ]
```

Below is how to configure `pass` and `drop` parameters (added in 0.1.5)

```
# Drop all metrics for guest CPU usage
[[plugins.cpu]]
drop = [ "cpu_usage_guest" ]
# Only store inode related metrics for disks
[[plugins.disk]]
pass = [ "disk_inodes" ]
```


Additional plugins (or outputs) of the same type can be specified,
just define another instance in the config file:

Expand Down Expand Up @@ -224,6 +237,27 @@ Telegraf also supports specifying multiple output sinks to send data to,
configuring each output sink is different, but examples can be
found by running `telegraf -sample-config`.

Outputs also support the same configurable options as plugins (pass, drop, tagpass, tagdrop)

```
[[outputs.influxdb]]
urls = [ "http://localhost:8086" ]
database = "telegraf"
# Drop all measurements that start with "aerospike"
drop = ["aerospike"]
# Send to a different database
[[outputs.influxdb]]
urls = [ "http://localhost:8086" ]
database = "mydb"
precision = "s"
# Only store measurements where the tag "mytag" matches the value "B"
[outputs.influxdb.tagpass]
mytag = ["B"]
```


## Supported Outputs

* influxdb
Expand Down
2 changes: 1 addition & 1 deletion accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (ac *accumulator) AddFields(
}

if ac.pluginConfig != nil {
if !ac.pluginConfig.ShouldPass(measurement) || !ac.pluginConfig.ShouldTagsPass(tags) {
if !ac.pluginConfig.Filter.ShouldPass(measurement) || !ac.pluginConfig.Filter.ShouldTagsPass(tags) {
return
}
}
Expand Down
5 changes: 3 additions & 2 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,13 @@ func (a *Agent) writeOutput(
start := time.Now()

for {
err := ro.Output.Write(points)
filtered := ro.FilterPoints(points)
err := ro.Output.Write(filtered)
if err == nil {
// Write successful
elapsed := time.Since(start)
log.Printf("Flushed %d metrics to output %s in %s\n",
len(points), ro.Name, elapsed)
len(filtered), ro.Name, elapsed)
return
}

Expand Down
155 changes: 110 additions & 45 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (

"github.com/naoina/toml"
"github.com/naoina/toml/ast"

"github.com/influxdb/influxdb/client/v2"
)

// Config specifies the URL/user/password for the database that telegraf
Expand Down Expand Up @@ -88,6 +90,7 @@ type TagFilter struct {
type RunningOutput struct {
Name string
Output outputs.Output
Config *OutputConfig
}

type RunningPlugin struct {
Expand All @@ -96,34 +99,61 @@ type RunningPlugin struct {
Config *PluginConfig
}

// PluginConfig containing a name, interval, and drop/pass prefix lists
// Also lists the tags to filter
type PluginConfig struct {
Name string

// Filter containing drop/pass and tagdrop/tagpass rules
type Filter struct {
Drop []string
Pass []string

TagDrop []TagFilter
TagPass []TagFilter

IsActive bool
}

// PluginConfig containing a name, interval, and filter
type PluginConfig struct {
Name string
Filter Filter
Interval time.Duration
}

// OutputConfig containing name and filter
type OutputConfig struct {
Name string
Filter Filter
}

// Filter returns filtered slice of client.Points based on whether filters
// are active for this RunningOutput.
func (ro *RunningOutput) FilterPoints(points []*client.Point) []*client.Point {
if !ro.Config.Filter.IsActive {
return points
}

var filteredPoints []*client.Point
for i := range points {
if !ro.Config.Filter.ShouldPass(points[i].Name()) || !ro.Config.Filter.ShouldTagsPass(points[i].Tags()) {
continue
}
filteredPoints = append(filteredPoints, points[i])
}
return filteredPoints
}

// ShouldPass returns true if the metric should pass, false if should drop
// based on the drop/pass plugin parameters
func (cp *PluginConfig) ShouldPass(measurement string) bool {
if cp.Pass != nil {
for _, pat := range cp.Pass {
// based on the drop/pass filter parameters
func (f Filter) ShouldPass(measurement string) bool {
if f.Pass != nil {
for _, pat := range f.Pass {
if strings.HasPrefix(measurement, pat) {
return true
}
}
return false
}

if cp.Drop != nil {
for _, pat := range cp.Drop {
if f.Drop != nil {
for _, pat := range f.Drop {
if strings.HasPrefix(measurement, pat) {
return false
}
Expand All @@ -135,10 +165,10 @@ func (cp *PluginConfig) ShouldPass(measurement string) bool {
}

// ShouldTagsPass returns true if the metric should pass, false if should drop
// based on the tagdrop/tagpass plugin parameters
func (cp *PluginConfig) ShouldTagsPass(tags map[string]string) bool {
if cp.TagPass != nil {
for _, pat := range cp.TagPass {
// based on the tagdrop/tagpass filter parameters
func (f Filter) ShouldTagsPass(tags map[string]string) bool {
if f.TagPass != nil {
for _, pat := range f.TagPass {
if tagval, ok := tags[pat.Name]; ok {
for _, filter := range pat.Filter {
if filter == tagval {
Expand All @@ -150,8 +180,8 @@ func (cp *PluginConfig) ShouldTagsPass(tags map[string]string) bool {
return false
}

if cp.TagDrop != nil {
for _, pat := range cp.TagDrop {
if f.TagDrop != nil {
for _, pat := range f.TagDrop {
if tagval, ok := tags[pat.Name]; ok {
for _, filter := range pat.Filter {
if filter == tagval {
Expand Down Expand Up @@ -469,15 +499,21 @@ func (c *Config) addOutput(name string, table *ast.Table) error {
if !ok {
return fmt.Errorf("Undefined but requested output: %s", name)
}
o := creator()
output := creator()

if err := toml.UnmarshalTable(table, o); err != nil {
outputConfig, err := buildOutput(name, table)
if err != nil {
return err
}

if err := toml.UnmarshalTable(table, output); err != nil {
return err
}

ro := &RunningOutput{
Name: name,
Output: o,
Output: output,
Config: outputConfig,
}
c.Outputs = append(c.Outputs, ro)
return nil
Expand All @@ -493,10 +529,15 @@ func (c *Config) addPlugin(name string, table *ast.Table) error {
}
plugin := creator()

pluginConfig, err := applyPlugin(name, table, plugin)
pluginConfig, err := buildPlugin(name, table)
if err != nil {
return err
}

if err := toml.UnmarshalTable(table, plugin); err != nil {
return err
}

rp := &RunningPlugin{
Name: name,
Plugin: plugin,
Expand All @@ -506,18 +547,19 @@ func (c *Config) addPlugin(name string, table *ast.Table) error {
return nil
}

// applyPlugin takes defined plugin names and applies them to the given
// interface, returning a PluginConfig object in the end that can
// be inserted into a runningPlugin by the agent.
func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig, error) {
cp := &PluginConfig{Name: name}
// buildFilter builds a Filter (tagpass/tagdrop/pass/drop) to
// be inserted into the OutputConfig/PluginConfig to be used for prefix
// filtering on tags and measurements
func buildFilter(tbl *ast.Table) Filter {
f := Filter{}

if node, ok := tbl.Fields["pass"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
cp.Pass = append(cp.Pass, str.Value)
f.Pass = append(f.Pass, str.Value)
f.IsActive = true
}
}
}
Expand All @@ -529,26 +571,14 @@ func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig,
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
cp.Drop = append(cp.Drop, str.Value)
f.Drop = append(f.Drop, str.Value)
f.IsActive = true
}
}
}
}
}

if node, ok := tbl.Fields["interval"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}

cp.Interval = dur
}
}
}

if node, ok := tbl.Fields["tagpass"]; ok {
if subtbl, ok := node.(*ast.Table); ok {
for name, val := range subtbl.Fields {
Expand All @@ -561,7 +591,8 @@ func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig,
}
}
}
cp.TagPass = append(cp.TagPass, *tagfilter)
f.TagPass = append(f.TagPass, *tagfilter)
f.IsActive = true
}
}
}
Expand All @@ -579,16 +610,50 @@ func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig,
}
}
}
cp.TagDrop = append(cp.TagDrop, *tagfilter)
f.TagDrop = append(f.TagDrop, *tagfilter)
f.IsActive = true
}
}
}
}

delete(tbl.Fields, "drop")
delete(tbl.Fields, "pass")
delete(tbl.Fields, "interval")
delete(tbl.Fields, "tagdrop")
delete(tbl.Fields, "tagpass")
return cp, toml.UnmarshalTable(tbl, p)
return f
}

// buildPlugin parses plugin specific items from the ast.Table, builds the filter and returns a
// PluginConfig to be inserted into RunningPlugin
func buildPlugin(name string, tbl *ast.Table) (*PluginConfig, error) {
cp := &PluginConfig{Name: name}
if node, ok := tbl.Fields["interval"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}

cp.Interval = dur
}
}
}
delete(tbl.Fields, "interval")
cp.Filter = buildFilter(tbl)
return cp, nil

}

// buildOutput parses output specific items from the ast.Table, builds the filter and returns an
// OutputConfig to be inserted into RunningPlugin
// Note: error exists in the return for future calls that might require error
func buildOutput(name string, tbl *ast.Table) (*OutputConfig, error) {
oc := &OutputConfig{
Name: name,
Filter: buildFilter(tbl),
}
return oc, nil

}
Loading

0 comments on commit 63f35fe

Please sign in to comment.