Skip to content

Commit

Permalink
Add parser processor (influxdata#4551)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ayrdrie authored and otherpirate committed Mar 15, 2019
1 parent 35f7862 commit 6025f00
Show file tree
Hide file tree
Showing 5 changed files with 841 additions and 1 deletion.
4 changes: 3 additions & 1 deletion metric/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ func (b *Builder) SetTime(tm time.Time) {
}

func (b *Builder) Reset() {
b.metric = &metric{}
b.metric = &metric{
tp: telegraf.Untyped,
}
}

func (b *Builder) Metric() (telegraf.Metric, error) {
Expand Down
1 change: 1 addition & 0 deletions plugins/processors/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/processors/converter"
_ "github.com/influxdata/telegraf/plugins/processors/enum"
_ "github.com/influxdata/telegraf/plugins/processors/override"
_ "github.com/influxdata/telegraf/plugins/processors/parser"
_ "github.com/influxdata/telegraf/plugins/processors/printer"
_ "github.com/influxdata/telegraf/plugins/processors/regex"
_ "github.com/influxdata/telegraf/plugins/processors/rename"
Expand Down
43 changes: 43 additions & 0 deletions plugins/processors/parser/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Parser Processor Plugin

This plugin parses defined fields containing the specified data format and
creates new metrics based on the contents of the field.

## Configuration
```toml
[[processors.parser]]
## The name of the fields whose value will be parsed.
parse_fields = ["message"]

## If true, incoming metrics are not emitted.
drop_original = false

## If set to override, emitted metrics will be merged by overriding the
## original metric using the newly parsed metrics.
merge = "override"

## The dataformat to be read from files
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
```

### Example:

```toml
[[processors.parser]]
parse_fields = ["message"]
merge = "override"
data_format = "logfmt"
```

**Input**:
```
syslog,appname=influxd,facility=daemon,hostname=http://influxdb.example.org (influxdb.example.org),severity=info version=1i,severity_code=6i,facility_code=3i,timestamp=1533848508138040000i,procid="6629",message=" ts=2018-08-09T21:01:48.137963Z lvl=info msg=\"Executing query\" log_id=09p7QbOG000 service=query query=\"SHOW DATABASES\""
```

**Output**:
```
syslog,appname=influxd,facility=daemon,hostname=http://influxdb.example.org (influxdb.example.org),severity=info version=1i,severity_code=6i,facility_code=3i,timestamp=1533848508138040000i,procid="6629",ts="2018-08-09T21:01:48.137963Z",lvl=info msg="Executing query",log_id="09p7QbOG000",service="query",query="SHOW DATABASES"
```
124 changes: 124 additions & 0 deletions plugins/processors/parser/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package parser

import (
"log"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/processors"
)

type Parser struct {
parsers.Config
DropOriginal bool `toml:"drop_original"`
Merge string `toml:"merge"`
ParseFields []string `toml:"parse_fields"`
Parser parsers.Parser
}

var SampleConfig = `
## The name of the fields whose value will be parsed.
parse_fields = []
## If true, incoming metrics are not emitted.
drop_original = false
## If set to override, emitted metrics will be merged by overriding the
## original metric using the newly parsed metrics.
merge = "override"
## The dataformat to be read from files
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
`

func (p *Parser) SampleConfig() string {
return SampleConfig
}

func (p *Parser) Description() string {
return "Parse a value in a specified field/tag(s) and add the result in a new metric"
}

func (p *Parser) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
if p.Parser == nil {
var err error
p.Parser, err = parsers.NewParser(&p.Config)
if err != nil {
log.Printf("E! [processors.parser] could not create parser: %v", err)
return metrics
}
}

results := []telegraf.Metric{}

for _, metric := range metrics {
newMetrics := []telegraf.Metric{}
if !p.DropOriginal {
newMetrics = append(newMetrics, metric)
}

for _, key := range p.ParseFields {
for _, field := range metric.FieldList() {
if field.Key == key {
switch value := field.Value.(type) {
case string:
fromFieldMetric, err := p.parseField(value)
if err != nil {
log.Printf("E! [processors.parser] could not parse field %s: %v", key, err)
}

for _, m := range fromFieldMetric {
if m.Name() == "" {
m.SetName(metric.Name())
}
}

// multiple parsed fields shouldn't create multiple
// metrics so we'll merge tags/fields down into one
// prior to returning.
newMetrics = append(newMetrics, fromFieldMetric...)
default:
log.Printf("E! [processors.parser] field '%s' not a string, skipping", key)
}
}
}
}

if len(newMetrics) == 0 {
continue
}

if p.Merge == "override" {
results = append(results, merge(newMetrics[0], newMetrics[1:]))
} else {
results = append(results, newMetrics...)
}
}
return results
}

func merge(base telegraf.Metric, metrics []telegraf.Metric) telegraf.Metric {
for _, metric := range metrics {
for _, field := range metric.FieldList() {
base.AddField(field.Key, field.Value)
}
for _, tag := range metric.TagList() {
base.AddTag(tag.Key, tag.Value)
}
base.SetName(metric.Name())
}
return base
}

func (p *Parser) parseField(value string) ([]telegraf.Metric, error) {
return p.Parser.Parse([]byte(value))
}

func init() {
processors.Add("parser", func() telegraf.Processor {
return &Parser{DropOriginal: false}
})
}
Loading

0 comments on commit 6025f00

Please sign in to comment.