Add SerializeBatch method to the Serializer interface (influxdata#4107)
danielnelson authored and arkady-emelyanov committed May 18, 2018
1 parent 66f8c14 commit f1a8b96
Showing 9 changed files with 301 additions and 83 deletions.
129 changes: 67 additions & 62 deletions docs/DATA_FORMATS_OUTPUT.md
@@ -1,35 +1,15 @@
# Telegraf Output Data Formats
# Output Data Formats

Telegraf is able to serialize metrics into the following output data formats:
In addition to output specific data formats, Telegraf supports a set of
standard data formats that may be selected from when configuring many output
plugins.

1. [InfluxDB Line Protocol](#influx)
1. [JSON](#json)
1. [Graphite](#graphite)

Telegraf metrics, like InfluxDB
[points](https://docs.influxdata.com/influxdb/latest/concepts/glossary/#point),
are a combination of four basic parts:

1. Measurement Name
1. Tags
1. Fields
1. Timestamp

In InfluxDB line protocol, these 4 parts are easily defined in textual form:

```
measurement_name[,tag1=val1,...] field1=val1[,field2=val2,...] [timestamp]
```

For Telegraf outputs that write textual data (such as `kafka`, `mqtt`, and `file`),
InfluxDB line protocol was originally the only available output format. But now
we are normalizing telegraf metric "serializers" into a
[plugin-like interface](https://github.com/influxdata/telegraf/tree/master/plugins/serializers)
across all output plugins that can support it.
You will be able to identify a plugin that supports different data formats
by the presence of a `data_format`
config option, for example, in the `file` output plugin:

You will be able to identify the plugins with support by the presence of a
`data_format` config option, for example, in the `file` output plugin:
```toml
[[outputs.file]]
## Files to write to, "stdout" is a specially handled file.
@@ -40,22 +20,16 @@ config option, for example, in the `file` output plugin:
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"

## Additional configuration options go here
```

Each data_format has an additional set of configuration options available, which
I'll go over below.
## Influx

# Influx:

The `influx` format outputs data as
The `influx` data format outputs metrics using
[InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/latest/write_protocols/line_protocol_tutorial/).
This is the recommended format to use unless another format is required for
This is the recommended format unless another format is required for
interoperability.

### Influx Configuration:

### Influx Configuration
```toml
[[outputs.file]]
## Files to write to, "stdout" is a specially handled file.
@@ -82,7 +56,7 @@ interoperability.
# influx_uint_support = false
```

# Graphite:
## Graphite

The Graphite data format translates Telegraf metrics into _dot_ buckets. A
template can be specified for the output of Telegraf metrics into Graphite
@@ -115,7 +89,7 @@ tars.cpu-total.us-east-1.cpu.usage_idle 98.09 1455320690
Fields with string values will be skipped. Boolean fields will be converted
to 1 (true) or 0 (false).
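
The conversion rule above can be sketched in Go. `graphiteValue` is a hypothetical helper written for illustration, not the function the Graphite serializer itself uses. Only the string and boolean handling comes from the text; the numeric formatting is an assumption.

```go
package main

import (
	"fmt"
	"strconv"
)

// graphiteValue is a hypothetical helper (not Telegraf's own code). It returns
// the text to emit for a field value and false when the field should be skipped.
func graphiteValue(v interface{}) (string, bool) {
	switch x := v.(type) {
	case string:
		// Fields with string values are skipped.
		return "", false
	case bool:
		// Booleans become 1 (true) or 0 (false).
		if x {
			return "1", true
		}
		return "0", true
	case int64:
		// Assumption: integers are written in base 10.
		return strconv.FormatInt(x, 10), true
	case float64:
		// Assumption: floats use the shortest decimal representation.
		return strconv.FormatFloat(x, 'f', -1, 64), true
	default:
		// Assumption: any other type is skipped in this sketch.
		return "", false
	}
}

func main() {
	for _, v := range []interface{}{"up", true, false, int64(4), 98.09} {
		if s, ok := graphiteValue(v); ok {
			fmt.Printf("%v -> %s\n", v, s)
		} else {
			fmt.Printf("%v -> skipped\n", v)
		}
	}
}
```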

### Graphite Configuration:
### Graphite Configuration

```toml
[[outputs.file]]
@@ -134,27 +108,63 @@ to 1 (true) or 0 (false).
template = "host.tags.measurement.field"
```

# JSON:
## JSON

The JSON data format serialized Telegraf metrics in json format. The format is:
The JSON data format output for a single metric has the form:
```json
{
"fields": {
"field_1": 30,
"field_2": 4,
"field_N": 59,
"n_images": 660
},
"name": "docker",
"tags": {
"host": "raynor"
},
"timestamp": 1458229140
}
```

When an output plugin needs to emit multiple metrics at one time, it may use
the batch format. Whether the batch format is used is determined by the
plugin; refer to the documentation for the specific plugin.
```json
{
"fields":{
"field_1":30,
"field_2":4,
"field_N":59,
"n_images":660
},
"name":"docker",
"tags":{
"host":"raynor"
},
"timestamp":1458229140
"metrics": [
{
"fields": {
"field_1": 30,
"field_2": 4,
"field_N": 59,
"n_images": 660
},
"name": "docker",
"tags": {
"host": "raynor"
},
"timestamp": 1458229140
},
{
"fields": {
"field_1": 30,
"field_2": 4,
"field_N": 59,
"n_images": 660
},
"name": "docker",
"tags": {
"host": "raynor"
},
"timestamp": 1458229140
}
]
}
```

### JSON Configuration:
### JSON Configuration

```toml
[[outputs.file]]
@@ -166,14 +176,9 @@ The JSON data format serialized Telegraf metrics in json format. The format is:
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "json"
json_timestamp_units = "1ns"
```

By default, the timestamp that is output in JSON data format serialized Telegraf
metrics is in seconds. The precision of this timestamp can be adjusted for any output
by adding the optional `json_timestamp_units` parameter to the configuration for
that output. This parameter can be used to set the timestamp units to nanoseconds (`ns`),
microseconds (`us` or `µs`), milliseconds (`ms`), or seconds (`s`). Note that this
parameter will be truncated to the nearest power of 10 that, so if the `json_timestamp_units`
are set to `15ms` the timestamps for the JSON format serialized Telegraf metrics will be
output in hundredths of a second (`10ms`).
## The resolution to use for the metric timestamp. Must be a duration string
## such as "1ns", "1us", "1ms", "10ms", "1s". Durations are truncated to
## the power of 10 less than the specified units.
json_timestamp_units = "1s"
```
7 changes: 6 additions & 1 deletion plugins/outputs/influxdb/udp_test.go
@@ -66,13 +66,18 @@ func (d *MockDialer) DialContext(ctx context.Context, network string, address st
}

type MockSerializer struct {
SerializeF func(metric telegraf.Metric) ([]byte, error)
SerializeF func(metric telegraf.Metric) ([]byte, error)
SerializeBatchF func(metrics []telegraf.Metric) ([]byte, error)
}

func (s *MockSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
return s.SerializeF(metric)
}

func (s *MockSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
return s.SerializeBatchF(metrics)
}

func TestUDP_NewUDPClientNoURL(t *testing.T) {
config := &influxdb.UDPConfig{}
_, err := influxdb.NewUDPClient(config)
16 changes: 16 additions & 0 deletions plugins/serializers/graphite/graphite.go
@@ -1,6 +1,7 @@
package graphite

import (
"bytes"
"fmt"
"math"
"regexp"
@@ -60,6 +61,21 @@ func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
return out, nil
}

func (s *GraphiteSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
	var batch bytes.Buffer
	for _, m := range metrics {
		buf, err := s.Serialize(m)
		if err != nil {
			return nil, err
		}
		_, err = batch.Write(buf)
		if err != nil {
			return nil, err
		}
	}
	return batch.Bytes(), nil
}

func formatValue(value interface{}) string {
switch v := value.(type) {
case string:
30 changes: 30 additions & 0 deletions plugins/serializers/graphite/graphite_test.go
@@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)

@@ -577,3 +578,32 @@ func TestClean(t *testing.T) {
})
}
}

func TestSerializeBatch(t *testing.T) {
	now := time.Unix(1234567890, 0)
	tests := []struct {
		name        string
		metric_name string
		tags        map[string]string
		fields      map[string]interface{}
		expected    string
	}{
		{
			"Base metric",
			"cpu",
			map[string]string{"host": "localhost"},
			map[string]interface{}{"usage_busy": float64(8.5)},
			"localhost.cpu.usage_busy 8.5 1234567890\nlocalhost.cpu.usage_busy 8.5 1234567890\n",
		},
	}

	s := GraphiteSerializer{}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			m, err := metric.New(tt.metric_name, tt.tags, tt.fields, now)
			assert.NoError(t, err)
			actual, _ := s.SerializeBatch([]telegraf.Metric{m, m})
			require.Equal(t, tt.expected, string(actual))
		})
	}
}
11 changes: 11 additions & 0 deletions plugins/serializers/influx/influx.go
@@ -102,6 +102,17 @@ func (s *Serializer) Serialize(m telegraf.Metric) ([]byte, error) {
return out, nil
}

func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
	var batch bytes.Buffer
	for _, m := range metrics {
		_, err := s.Write(&batch, m)
		if err != nil {
			return nil, err
		}
	}
	return batch.Bytes(), nil
}

func (s *Serializer) Write(w io.Writer, m telegraf.Metric) (int, error) {
err := s.writeMetric(w, m)
return s.bytesWritten, err
21 changes: 21 additions & 0 deletions plugins/serializers/influx/influx_test.go
@@ -447,3 +447,24 @@ func BenchmarkSerializer(b *testing.B) {
})
}
}

func TestSerialize_SerializeBatch(t *testing.T) {
	m := MustMetric(
		metric.New(
			"cpu",
			map[string]string{},
			map[string]interface{}{
				"value": 42.0,
			},
			time.Unix(0, 0),
		),
	)

	metrics := []telegraf.Metric{m, m}

	serializer := NewSerializer()
	serializer.SetFieldSortOrder(SortFields)
	output, err := serializer.SerializeBatch(metrics)
	require.NoError(t, err)
	require.Equal(t, []byte("cpu value=42 0\ncpu value=42 0\n"), output)
}