Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(processors.parser): Add option to parse tags #11228

Merged
12 changes: 8 additions & 4 deletions plugins/processors/parser/README.md
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
# Parser Processor Plugin

This plugin parses defined fields containing the specified data format and
creates new metrics based on the contents of the field.
This plugin parses defined fields or tags containing the specified data format
and creates new metrics based on the contents of the field or tag.

## Configuration

```toml @sample.conf
# Parse a value in a specified field/tag(s) and add the result in a new metric
# Parse a value in a specified field(s)/tag(s) and add the result in a new metric
[[processors.parser]]
## The name of the fields whose value will be parsed.
parse_fields = ["message"]

## The name of the tags whose value will be parsed.
# parse_tags = []

## If true, incoming metrics are not emitted.
drop_original = false
# drop_original = false

## If set to override, emitted metrics will be merged by overriding the
## original metric using the newly parsed metrics.
## Only has effect when drop_original is set to false.
merge = "override"

## The dataformat to be read from files
Expand Down
24 changes: 22 additions & 2 deletions plugins/processors/parser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type Parser struct {
DropOriginal bool `toml:"drop_original"`
Merge string `toml:"merge"`
ParseFields []string `toml:"parse_fields"`
ParseTags []string `toml:"parse_tags"`
Log telegraf.Logger `toml:"-"`
parser telegraf.Parser
}
Expand Down Expand Up @@ -47,12 +48,13 @@ func (p *Parser) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
newMetrics = append(newMetrics, metric)
}

// parse fields
for _, key := range p.ParseFields {
for _, field := range metric.FieldList() {
if field.Key == key {
switch value := field.Value.(type) {
case string:
fromFieldMetric, err := p.parseField(value)
fromFieldMetric, err := p.parseValue(value)
if err != nil {
p.Log.Errorf("could not parse field %s: %v", key, err)
}
Expand All @@ -74,6 +76,24 @@ func (p *Parser) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
}
}

// parse tags
for _, key := range p.ParseTags {
if value, ok := metric.GetTag(key); ok {
fromTagMetric, err := p.parseValue(value)
if err != nil {
p.Log.Errorf("could not parse tag %s: %v", key, err)
}

for _, m := range fromTagMetric {
if m.Name() == "" {
m.SetName(metric.Name())
}
}

newMetrics = append(newMetrics, fromTagMetric...)
}
}

if len(newMetrics) == 0 {
continue
}
Expand All @@ -100,7 +120,7 @@ func merge(base telegraf.Metric, metrics []telegraf.Metric) telegraf.Metric {
return base
}

func (p *Parser) parseField(value string) ([]telegraf.Metric, error) {
func (p *Parser) parseValue(value string) ([]telegraf.Metric, error) {
return p.parser.Parse([]byte(value))
}

Expand Down
105 changes: 91 additions & 14 deletions plugins/processors/parser/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,13 @@ import (
_ "github.com/influxdata/telegraf/plugins/parsers/all"

"github.com/influxdata/telegraf/testutil"

"github.com/stretchr/testify/require"
)

// compares metrics without comparing time
func compareMetrics(t *testing.T, expected, actual []telegraf.Metric) {
require.Equal(t, len(expected), len(actual))
for i, m := range actual {
require.Equal(t, expected[i].Name(), m.Name())
require.Equal(t, expected[i].Fields(), m.Fields())
require.Equal(t, expected[i].Tags(), m.Tags())
}
}

func TestApply(t *testing.T) {
tests := []struct {
name string
parseFields []string
parseTags []string
config parsers.Config
dropOriginal bool
merge string
Expand Down Expand Up @@ -361,6 +350,93 @@ func TestApply(t *testing.T) {
time.Unix(0, 0)),
},
},
{
name: "parse one tag drop original",
parseTags: []string{"sample"},
dropOriginal: true,
config: parsers.Config{
DataFormat: "logfmt",
},
input: metric.New(
"singleTag",
map[string]string{
"some": "tag",
"sample": `ts=2018-07-24T19:43:40.275Z`,
},
map[string]interface{}{},
time.Unix(0, 0)),
expected: []telegraf.Metric{
metric.New(
"singleTag",
map[string]string{},
map[string]interface{}{
"ts": "2018-07-24T19:43:40.275Z",
},
time.Unix(0, 0)),
},
},
{
name: "parse one tag with merge",
parseTags: []string{"sample"},
dropOriginal: false,
merge: "override",
config: parsers.Config{
DataFormat: "logfmt",
},
input: metric.New(
"singleTag",
map[string]string{
"some": "tag",
"sample": `ts=2018-07-24T19:43:40.275Z`,
},
map[string]interface{}{},
time.Unix(0, 0)),
expected: []telegraf.Metric{
metric.New(
"singleTag",
map[string]string{
"some": "tag",
"sample": `ts=2018-07-24T19:43:40.275Z`,
},
map[string]interface{}{
"ts": "2018-07-24T19:43:40.275Z",
},
time.Unix(0, 0)),
},
},
{
name: "parse one tag keep",
parseTags: []string{"sample"},
dropOriginal: false,
config: parsers.Config{
DataFormat: "logfmt",
},
input: metric.New(
"singleTag",
map[string]string{
"some": "tag",
"sample": `ts=2018-07-24T19:43:40.275Z`,
},
map[string]interface{}{},
time.Unix(0, 0)),
expected: []telegraf.Metric{
metric.New(
"singleTag",
map[string]string{
"some": "tag",
"sample": `ts=2018-07-24T19:43:40.275Z`,
},
map[string]interface{}{},
time.Unix(0, 0)),
metric.New(
"singleTag",
map[string]string{},
map[string]interface{}{
"ts": "2018-07-24T19:43:40.275Z",
},
time.Unix(0, 0)),
},
},
{
name: "Fail to parse one field but parses other [keep]",
parseFields: []string{"good", "bad"},
Expand Down Expand Up @@ -506,14 +582,15 @@ func TestApply(t *testing.T) {
parser := Parser{
Config: tt.config,
ParseFields: tt.parseFields,
ParseTags: tt.parseTags,
DropOriginal: tt.dropOriginal,
Merge: tt.merge,
Log: testutil.Logger{Name: "processor.parser"},
}

output := parser.Apply(tt.input)
t.Logf("Testing: %s", tt.name)
compareMetrics(t, tt.expected, output)
testutil.RequireMetricsEqual(t, tt.expected, output, testutil.IgnoreTime())
})
}
}
Expand Down Expand Up @@ -584,7 +661,7 @@ func TestBadApply(t *testing.T) {

output := parser.Apply(tt.input)

compareMetrics(t, output, tt.expected)
testutil.RequireMetricsEqual(t, tt.expected, output, testutil.IgnoreTime())
})
}
}
Expand Down
8 changes: 6 additions & 2 deletions plugins/processors/parser/sample.conf
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
# Parse a value in a specified field/tag(s) and add the result in a new metric
# Parse a value in a specified field(s)/tag(s) and add the result in a new metric
[[processors.parser]]
## The name of the fields whose value will be parsed.
parse_fields = ["message"]

## The name of the tags whose value will be parsed.
# parse_tags = []

## If true, incoming metrics are not emitted.
drop_original = false
# drop_original = false

## If set to override, emitted metrics will be merged by overriding the
## original metric using the newly parsed metrics.
## Only has effect when drop_original is set to false.
merge = "override"

## The dataformat to be read from files
Expand Down