Skip to content

Commit

Permalink
Add support for pass/drop/tagpass/tagdrop for outputs
Browse files Browse the repository at this point in the history
Reuses a lot of the logic for the plugin configuration, and changed
the ConfiguredPlugin struct to be ConfiguredFilter and a generic
function to parse the filter (called parseFilter).

X-Github-Closes influxdata#398
  • Loading branch information
oldmantaiter committed Nov 29, 2015
1 parent 13ccf42 commit f1556db
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 74 deletions.
4 changes: 2 additions & 2 deletions accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type Accumulator interface {
}

func NewAccumulator(
plugin *ConfiguredPlugin,
plugin *ConfiguredFilter,
points chan *client.Point,
) Accumulator {
acc := accumulator{}
Expand All @@ -45,7 +45,7 @@ type accumulator struct {

debug bool

plugin *ConfiguredPlugin
plugin *ConfiguredFilter

prefix string
}
Expand Down
19 changes: 15 additions & 4 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import (
type runningOutput struct {
name string
output outputs.Output
config *ConfiguredFilter
}

type runningPlugin struct {
name string
plugin plugins.Plugin
config *ConfiguredPlugin
config *ConfiguredFilter
}

// Agent runs telegraf and collects data based on the given config
Expand Down Expand Up @@ -160,8 +161,8 @@ func (a *Agent) LoadOutputs(filters []string, config *Config) ([]string, error)
if err != nil {
return nil, err
}

a.outputs = append(a.outputs, &runningOutput{name, output})
config := config.GetOutputConfig(name)
a.outputs = append(a.outputs, &runningOutput{name, output, config})
names = append(names, name)
}
}
Expand Down Expand Up @@ -368,8 +369,18 @@ func (a *Agent) flush(
) {
var wg sync.WaitGroup
for _, o := range a.outputs {
filteredPoints := []*client.Point{}
for ix := range points {
if o.config.ShouldPass(points[ix].Name(), points[ix].Tags()) {
filteredPoints = append(filteredPoints, points[ix])
}
}
// If we have filtered everything for this output, keep moving
if len(filteredPoints) == 0 {
continue
}
wg.Add(1)
go a.writeOutput(points, o, shutdown, &wg)
go a.writeOutput(filteredPoints, o, shutdown, &wg)
}
if wait {
wg.Wait()
Expand Down
165 changes: 102 additions & 63 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ type Config struct {

agent *Agent
plugins map[string]plugins.Plugin
pluginConfigurations map[string]*ConfiguredPlugin
pluginConfigurations map[string]*ConfiguredFilter
outputs map[string]outputs.Output
outputConfigurations map[string]*ConfiguredFilter

agentFieldsSet []string
pluginFieldsSet map[string][]string
pluginConfigurationFieldsSet map[string][]string
outputFieldsSet map[string][]string
outputConfigurationFieldsSet map[string][]string
}

// Plugins returns the configured plugins as a map of name -> plugins.Plugin
Expand All @@ -52,9 +54,9 @@ type TagFilter struct {
Filter []string
}

// ConfiguredPlugin containing a name, interval, and drop/pass prefix lists
// ConfiguredFilter containing a name, interval, and drop/pass prefix lists
// Also lists the tags to filter
type ConfiguredPlugin struct {
type ConfiguredFilter struct {
Name string

Drop []string
Expand All @@ -67,9 +69,9 @@ type ConfiguredPlugin struct {
}

// ShouldPass returns true if the metric should pass, false if should drop
func (cp *ConfiguredPlugin) ShouldPass(measurement string, tags map[string]string) bool {
if cp.Pass != nil {
for _, pat := range cp.Pass {
func (cf *ConfiguredFilter) ShouldPass(measurement string, tags map[string]string) bool {
if cf.Pass != nil {
for _, pat := range cf.Pass {
if strings.HasPrefix(measurement, pat) {
return true
}
Expand All @@ -78,8 +80,8 @@ func (cp *ConfiguredPlugin) ShouldPass(measurement string, tags map[string]strin
return false
}

if cp.Drop != nil {
for _, pat := range cp.Drop {
if cf.Drop != nil {
for _, pat := range cf.Drop {
if strings.HasPrefix(measurement, pat) {
return false
}
Expand All @@ -88,8 +90,8 @@ func (cp *ConfiguredPlugin) ShouldPass(measurement string, tags map[string]strin
return true
}

if cp.TagPass != nil {
for _, pat := range cp.TagPass {
if cf.TagPass != nil {
for _, pat := range cf.TagPass {
if tagval, ok := tags[pat.Name]; ok {
for _, filter := range pat.Filter {
if filter == tagval {
Expand All @@ -101,8 +103,8 @@ func (cp *ConfiguredPlugin) ShouldPass(measurement string, tags map[string]strin
return false
}

if cp.TagDrop != nil {
for _, pat := range cp.TagDrop {
if cf.TagDrop != nil {
for _, pat := range cf.TagDrop {
if tagval, ok := tags[pat.Name]; ok {
for _, filter := range pat.Filter {
if filter == tagval {
Expand Down Expand Up @@ -139,10 +141,14 @@ func (c *Config) ApplyAgent(a *Agent) error {
return nil
}

func (c *Config) GetPluginConfig(name string) *ConfiguredPlugin {
func (c *Config) GetPluginConfig(name string) *ConfiguredFilter {
return c.pluginConfigurations[name]
}

func (c *Config) GetOutputConfig(name string) *ConfiguredFilter {
return c.outputConfigurations[name]
}

// Couldn't figure out how to get this to work with the declared function.

// PluginsDeclared returns the name of all plugins declared in the config.
Expand Down Expand Up @@ -469,6 +475,15 @@ func (c *Config) LoadDirectory(path string) error {
c.outputFieldsSet[outputName] = append(c.outputFieldsSet[outputName], field)
}
}
err = mergeStruct(c.outputConfigurations[outputName], subConfig.outputConfigurations[outputName], subConfig.outputConfigurationFieldsSet[outputName])
if err != nil {
return err
}
for _, field := range subConfig.outputConfigurationFieldsSet[outputName] {
if !sliceContains(field, c.outputConfigurationFieldsSet[outputName]) {
c.outputConfigurationFieldsSet[outputName] = append(c.outputConfigurationFieldsSet[outputName], field)
}
}
}
}
return nil
Expand All @@ -491,10 +506,12 @@ func LoadConfig(path string) (*Config, error) {
c := &Config{
Tags: make(map[string]string),
plugins: make(map[string]plugins.Plugin),
pluginConfigurations: make(map[string]*ConfiguredPlugin),
pluginConfigurations: make(map[string]*ConfiguredFilter),
outputs: make(map[string]outputs.Output),
outputConfigurations: make(map[string]*ConfiguredFilter),
pluginFieldsSet: make(map[string][]string),
pluginConfigurationFieldsSet: make(map[string][]string),
outputConfigurationFieldsSet: make(map[string][]string),
outputFieldsSet: make(map[string][]string),
}

Expand Down Expand Up @@ -573,73 +590,52 @@ func (c *Config) parseAgent(agentAst *ast.Table) error {
return nil
}

// Parse an output config out of the given *ast.Table.
func (c *Config) parseOutput(name string, outputAst *ast.Table, id int) error {
c.outputFieldsSet[name] = extractFieldNames(outputAst)
creator, ok := outputs.Outputs[name]
if !ok {
return fmt.Errorf("Undefined but requested output: %s", name)
}
output := creator()
err := toml.UnmarshalTable(outputAst, output)
if err != nil {
return err
}
c.outputs[fmt.Sprintf("%s-%d", name, id)] = output
return nil
}

// Parse a plugin config, plus plugin meta-config, out of the given *ast.Table.
func (c *Config) parsePlugin(name string, pluginAst *ast.Table) error {
creator, ok := plugins.Plugins[name]
if !ok {
return fmt.Errorf("Undefined but requested plugin: %s", name)
}
plugin := creator()
cp := &ConfiguredPlugin{Name: name}
cpFields := make([]string, 0, 5)
// Parse the given ast.Table for the filters
func (c *Config) parseFilter(name string, astTable *ast.Table) (*ConfiguredFilter, []string, error) {
cf := &ConfiguredFilter{Name: name}
cfFields := make([]string, 0, 5)

if node, ok := pluginAst.Fields["pass"]; ok {
if node, ok := astTable.Fields["pass"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
cp.Pass = append(cp.Pass, str.Value)
cf.Pass = append(cf.Pass, str.Value)
}
}
cpFields = append(cpFields, "pass")
cfFields = append(cfFields, "pass")
}
}
}

if node, ok := pluginAst.Fields["drop"]; ok {
if node, ok := astTable.Fields["drop"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
cp.Drop = append(cp.Drop, str.Value)
cf.Drop = append(cf.Drop, str.Value)
}
}
cpFields = append(cpFields, "drop")
cfFields = append(cfFields, "drop")
}
}
}

if node, ok := pluginAst.Fields["interval"]; ok {
if node, ok := astTable.Fields["interval"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return err
return nil, cfFields, err
}

cp.Interval = dur
cpFields = append(cpFields, "interval")
cf.Interval = dur
cfFields = append(cfFields, "interval")
}
}
}

if node, ok := pluginAst.Fields["tagpass"]; ok {
if node, ok := astTable.Fields["tagpass"]; ok {
if subtbl, ok := node.(*ast.Table); ok {
for name, val := range subtbl.Fields {
if kv, ok := val.(*ast.KeyValue); ok {
Expand All @@ -651,14 +647,14 @@ func (c *Config) parsePlugin(name string, pluginAst *ast.Table) error {
}
}
}
cp.TagPass = append(cp.TagPass, *tagfilter)
cpFields = append(cpFields, "tagpass")
cf.TagPass = append(cf.TagPass, *tagfilter)
cfFields = append(cfFields, "tagpass")
}
}
}
}

if node, ok := pluginAst.Fields["tagdrop"]; ok {
if node, ok := astTable.Fields["tagdrop"]; ok {
if subtbl, ok := node.(*ast.Table); ok {
for name, val := range subtbl.Fields {
if kv, ok := val.(*ast.KeyValue); ok {
Expand All @@ -670,25 +666,68 @@ func (c *Config) parsePlugin(name string, pluginAst *ast.Table) error {
}
}
}
cp.TagDrop = append(cp.TagDrop, *tagfilter)
cpFields = append(cpFields, "tagdrop")
cf.TagDrop = append(cf.TagDrop, *tagfilter)
cfFields = append(cfFields, "tagdrop")
}
}
}
}

delete(pluginAst.Fields, "drop")
delete(pluginAst.Fields, "pass")
delete(pluginAst.Fields, "interval")
delete(pluginAst.Fields, "tagdrop")
delete(pluginAst.Fields, "tagpass")
delete(astTable.Fields, "drop")
delete(astTable.Fields, "pass")
delete(astTable.Fields, "interval")
delete(astTable.Fields, "tagdrop")
delete(astTable.Fields, "tagpass")
return cf, cfFields, nil

}

// Parse an output config out of the given *ast.Table.
func (c *Config) parseOutput(name string, outputAst *ast.Table, id int) error {
c.outputFieldsSet[name] = extractFieldNames(outputAst)
creator, ok := outputs.Outputs[name]
if !ok {
return fmt.Errorf("Undefined but requested output: %s", name)
}
output := creator()

// parseFilter modifies outputAst
outputName := fmt.Sprintf("%s-%d", name, id)
cf, cfFields, err := c.parseFilter(outputName, outputAst)
if err != nil {
return err
}
c.outputFieldsSet[outputName] = extractFieldNames(outputAst)
c.outputConfigurationFieldsSet[outputName] = cfFields
err = toml.UnmarshalTable(outputAst, output)
if err != nil {
return err
}
c.outputs[outputName] = output
c.outputConfigurations[outputName] = cf
return nil
}

// Parse a plugin config, plus plugin meta-config, out of the given *ast.Table.
func (c *Config) parsePlugin(name string, pluginAst *ast.Table) error {
creator, ok := plugins.Plugins[name]
if !ok {
return fmt.Errorf("Undefined but requested plugin: %s", name)
}
plugin := creator()

// parseFilter modifies pluginAst
cf, cfFields, err := c.parseFilter(name, pluginAst)
if err != nil {
return err
}
c.pluginFieldsSet[name] = extractFieldNames(pluginAst)
c.pluginConfigurationFieldsSet[name] = cpFields
err := toml.UnmarshalTable(pluginAst, plugin)
c.pluginConfigurationFieldsSet[name] = cfFields
err = toml.UnmarshalTable(pluginAst, plugin)
if err != nil {
return err
}
c.plugins[name] = plugin
c.pluginConfigurations[name] = cp
c.pluginConfigurations[name] = cf
return nil
}
Loading

0 comments on commit f1556db

Please sign in to comment.