Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to parse JSON array (JSON Parser and HTTPJson) #1965

Merged
merged 1 commit into from
Nov 15, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ documentation for configuring journald. There is also a [`logfile` config option
available in 1.1, which will allow users to easily configure telegraf to
continue sending logs to /var/log/telegraf/telegraf.log.

- The JSON parser can now parse JSON data where the root object is an array.
The parsing configuration is applied to each element of the array.

### Features

- [#1726](https://github.com/influxdata/telegraf/issues/1726): Processor & Aggregator plugin support.
Expand Down
56 changes: 56 additions & 0 deletions docs/DATA_FORMATS_INPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,62 @@ Your Telegraf metrics would get tagged with "my_tag_1"
exec_mycollector,my_tag_1=foo a=5,b_c=6
```

If the JSON data is an array, then each element of the array is parsed with the configured settings.
Each resulting metric will be output with the same timestamp.

For example, if the following configuration:

```toml
[[inputs.exec]]
## Commands array
commands = ["/usr/bin/mycollector --foo=bar"]

## measurement name suffix (for separating different commands)
name_suffix = "_mycollector"

## Data format to consume.
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "json"

## List of tag names to extract from top-level of JSON server response
tag_keys = [
"my_tag_1",
"my_tag_2"
]
```

with this JSON output from a command:

```json
[
{
"a": 5,
"b": {
"c": 6
},
"my_tag_1": "foo",
"my_tag_2": "baz"
},
{
"a": 7,
"b": {
"c": 8
},
"my_tag_1": "bar",
"my_tag_2": "baz"
}
]
```

Your Telegraf metrics would get tagged with "my_tag_1" and "my_tag_2"

```
exec_mycollector,my_tag_1=foo,my_tag_2=baz a=5,b_c=6
exec_mycollector,my_tag_1=bar,my_tag_2=baz a=7,b_c=8
```

# Value:

The "value" data format translates single values into Telegraf metrics. This
Expand Down
52 changes: 52 additions & 0 deletions plugins/inputs/httpjson/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ You can also specify which keys from server response should be considered tags:
]
```

If the JSON response is an array of objects, then each object will be parsed with the same configuration.

You can also specify additional request parameters for the service:

```
Expand Down Expand Up @@ -150,3 +152,53 @@ httpjson_mycollector1_b_e,server='http://my.service.com/_stats' value=5
httpjson_mycollector2_load,server='http://service.net/json/stats' value=100
httpjson_mycollector2_users,server='http://service.net/json/stats' value=1335
```

# Example 3, Multiple Metrics in Response:

The response JSON can be treated as an array of data points that are all parsed with the same configuration.

```
[[inputs.httpjson]]
name = "mycollector"
servers = [
"http://my.service.com/_stats"
]
# HTTP method to use (case-sensitive)
method = "GET"
tag_keys = ["service"]
```

which responds with the following JSON:

```json
[
{
"service": "service01",
"a": 0.5,
"b": {
"c": "some text",
"d": 0.1,
"e": 5
}
},
{
"service": "service02",
"a": 0.6,
"b": {
"c": "some text",
"d": 0.2,
"e": 6
}
}
]
```

The collected metrics will be:
```
httpjson_mycollector_a,service='service01',server='http://my.service.com/_stats' value=0.5
httpjson_mycollector_b_d,service='service01',server='http://my.service.com/_stats' value=0.1
httpjson_mycollector_b_e,service='service01',server='http://my.service.com/_stats' value=5
httpjson_mycollector_a,service='service02',server='http://my.service.com/_stats' value=0.6
httpjson_mycollector_b_d,service='service02',server='http://my.service.com/_stats' value=0.2
httpjson_mycollector_b_e,service='service02',server='http://my.service.com/_stats' value=6
```
49 changes: 49 additions & 0 deletions plugins/inputs/httpjson/httpjson_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,3 +511,52 @@ func TestHttpJson200Tags(t *testing.T) {
}
}
}

const validJSONArrayTags = `
[
{
"value": 15,
"role": "master",
"build": "123"
},
{
"value": 17,
"role": "slave",
"build": "456"
}
]`

// Test that array data is collected correctly
func TestHttpJsonArray200Tags(t *testing.T) {
httpjson := genMockHttpJson(validJSONArrayTags, 200)

for _, service := range httpjson {
if service.Name == "other_webapp" {
var acc testutil.Accumulator
err := service.Gather(&acc)
// Set responsetime
for _, p := range acc.Metrics {
p.Fields["response_time"] = 1.0
}
require.NoError(t, err)
assert.Equal(t, 8, acc.NFields())
assert.Equal(t, uint64(4), acc.NMetrics())

for _, m := range acc.Metrics {
if m.Tags["role"] == "master" {
assert.Equal(t, "123", m.Tags["build"])
assert.Equal(t, float64(15), m.Fields["value"])
assert.Equal(t, float64(1), m.Fields["response_time"])
assert.Equal(t, "httpjson_"+service.Name, m.Measurement)
} else if m.Tags["role"] == "slave" {
assert.Equal(t, "456", m.Tags["build"])
assert.Equal(t, float64(17), m.Fields["value"])
assert.Equal(t, float64(1), m.Fields["response_time"])
assert.Equal(t, "httpjson_"+service.Name, m.Measurement)
} else {
assert.FailNow(t, "unknown metric")
}
}
}
}
}
41 changes: 37 additions & 4 deletions plugins/parsers/json/parser.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package json

import (
"bytes"
"encoding/json"
"fmt"
"strconv"
Expand All @@ -16,15 +17,22 @@ type JSONParser struct {
DefaultTags map[string]string
}

func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) {
func (p *JSONParser) parseArray(buf []byte) ([]telegraf.Metric, error) {
metrics := make([]telegraf.Metric, 0)

var jsonOut map[string]interface{}
var jsonOut []map[string]interface{}
err := json.Unmarshal(buf, &jsonOut)
if err != nil {
err = fmt.Errorf("unable to parse out as JSON, %s", err)
err = fmt.Errorf("unable to parse out as JSON Array, %s", err)
return nil, err
}
for _, item := range jsonOut {
metrics, err = p.parseObject(metrics, item)
}
return metrics, nil
}

func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]interface{}) ([]telegraf.Metric, error) {

tags := make(map[string]string)
for k, v := range p.DefaultTags {
Expand All @@ -44,7 +52,7 @@ func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) {
}

f := JSONFlattener{}
err = f.FlattenJSON("", jsonOut)
err := f.FlattenJSON("", jsonOut)
if err != nil {
return nil, err
}
Expand All @@ -57,6 +65,21 @@ func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) {
return append(metrics, metric), nil
}

func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) {

if !isarray(buf) {
metrics := make([]telegraf.Metric, 0)
var jsonOut map[string]interface{}
err := json.Unmarshal(buf, &jsonOut)
if err != nil {
err = fmt.Errorf("unable to parse out as JSON, %s", err)
return nil, err
}
return p.parseObject(metrics, jsonOut)
}
return p.parseArray(buf)
}

func (p *JSONParser) ParseLine(line string) (telegraf.Metric, error) {
metrics, err := p.Parse([]byte(line + "\n"))

Expand Down Expand Up @@ -115,3 +138,13 @@ func (f *JSONFlattener) FlattenJSON(
}
return nil
}

func isarray(buf []byte) bool {
ia := bytes.IndexByte(buf, '[')
ib := bytes.IndexByte(buf, '{')
if ia > -1 && ia < ib {
return true
} else {
return false
}
}
Loading