Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parser processor #4551

Merged
merged 24 commits into from
Aug 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion metric/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ func (b *Builder) SetTime(tm time.Time) {
}

func (b *Builder) Reset() {
b.metric = &metric{}
b.metric = &metric{
tp: telegraf.Untyped,
}
}

func (b *Builder) Metric() (telegraf.Metric, error) {
Expand Down
1 change: 1 addition & 0 deletions plugins/processors/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/processors/converter"
_ "github.com/influxdata/telegraf/plugins/processors/enum"
_ "github.com/influxdata/telegraf/plugins/processors/override"
_ "github.com/influxdata/telegraf/plugins/processors/parser"
_ "github.com/influxdata/telegraf/plugins/processors/printer"
_ "github.com/influxdata/telegraf/plugins/processors/regex"
_ "github.com/influxdata/telegraf/plugins/processors/rename"
Expand Down
43 changes: 43 additions & 0 deletions plugins/processors/parser/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Parser Processor Plugin

This plugin parses defined fields containing the specified data format and
creates new metrics based on the contents of the field.

## Configuration
```toml
[[processors.parser]]
## The name of the fields whose value will be parsed.
parse_fields = ["message"]

## If true, incoming metrics are not emitted.
drop_original = false

## If set to override, emitted metrics will be merged by overriding the
## original metric using the newly parsed metrics.
merge = "override"

## The dataformat to be read from files
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
```

### Example:

```toml
[[processors.parser]]
parse_fields = ["message"]
merge = "override"
data_format = "logfmt"
```

**Input**:
```
syslog,appname=influxd,facility=daemon,hostname=http://influxdb.example.org (influxdb.example.org),severity=info version=1i,severity_code=6i,facility_code=3i,timestamp=1533848508138040000i,procid="6629",message=" ts=2018-08-09T21:01:48.137963Z lvl=info msg=\"Executing query\" log_id=09p7QbOG000 service=query query=\"SHOW DATABASES\""
```

**Output**:
```
syslog,appname=influxd,facility=daemon,hostname=http://influxdb.example.org (influxdb.example.org),severity=info version=1i,severity_code=6i,facility_code=3i,timestamp=1533848508138040000i,procid="6629",ts="2018-08-09T21:01:48.137963Z",lvl=info msg="Executing query",log_id="09p7QbOG000",service="query",query="SHOW DATABASES"
```
124 changes: 124 additions & 0 deletions plugins/processors/parser/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package parser

import (
"log"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/processors"
)

type Parser struct {
parsers.Config
DropOriginal bool `toml:"drop_original"`
Merge string `toml:"merge"`
ParseFields []string `toml:"parse_fields"`
Parser parsers.Parser
}

var SampleConfig = `
## The name of the fields whose value will be parsed.
parse_fields = []

## If true, incoming metrics are not emitted.
drop_original = false

## If set to override, emitted metrics will be merged by overriding the
## original metric using the newly parsed metrics.
merge = "override"

## The dataformat to be read from files
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
`

func (p *Parser) SampleConfig() string {
return SampleConfig
}

func (p *Parser) Description() string {
return "Parse a value in a specified field/tag(s) and add the result in a new metric"
}

func (p *Parser) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
if p.Parser == nil {
var err error
p.Parser, err = parsers.NewParser(&p.Config)
if err != nil {
log.Printf("E! [processors.parser] could not create parser: %v", err)
return metrics
}
}

results := []telegraf.Metric{}

for _, metric := range metrics {
newMetrics := []telegraf.Metric{}
if !p.DropOriginal {
newMetrics = append(newMetrics, metric)
}

for _, key := range p.ParseFields {
for _, field := range metric.FieldList() {
if field.Key == key {
switch value := field.Value.(type) {
case string:
fromFieldMetric, err := p.parseField(value)
if err != nil {
log.Printf("E! [processors.parser] could not parse field %s: %v", key, err)
}

for _, m := range fromFieldMetric {
if m.Name() == "" {
m.SetName(metric.Name())
}
}

// multiple parsed fields shouldn't create multiple
// metrics so we'll merge tags/fields down into one
// prior to returning.
newMetrics = append(newMetrics, fromFieldMetric...)
default:
log.Printf("E! [processors.parser] field '%s' not a string, skipping", key)
}
}
}
}

if len(newMetrics) == 0 {
continue
}

if p.Merge == "override" {
results = append(results, merge(newMetrics[0], newMetrics[1:]))
} else {
results = append(results, newMetrics...)
}
}
return results
}

func merge(base telegraf.Metric, metrics []telegraf.Metric) telegraf.Metric {
for _, metric := range metrics {
for _, field := range metric.FieldList() {
base.AddField(field.Key, field.Value)
}
for _, tag := range metric.TagList() {
base.AddTag(tag.Key, tag.Value)
}
base.SetName(metric.Name())
}
return base
}

func (p *Parser) parseField(value string) ([]telegraf.Metric, error) {
return p.Parser.Parse([]byte(value))
}

func init() {
processors.Add("parser", func() telegraf.Processor {
return &Parser{DropOriginal: false}
})
}
Loading