Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add new binary data parser #7030

Closed
wants to merge 64 commits into from
Closed
Show file tree
Hide file tree
Changes from 44 commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
1a64cbe
Initial binary metric implementation
yolkhovyy Oct 13, 2019
2c1b466
Metric time
yolkhovyy Oct 13, 2019
72591fc
Metric time
yolkhovyy Oct 13, 2019
b76cea3
Added string type
yolkhovyy Oct 16, 2019
746e956
Re-factored and tests
yolkhovyy Oct 17, 2019
1365ce8
More tests
yolkhovyy Oct 17, 2019
bc81a4b
More tests
yolkhovyy Oct 18, 2019
7a511f2
Renamed to BinData
yolkhovyy Oct 18, 2019
5421933
Working bindata parser
yolkhovyy Oct 25, 2019
87ca271
Merge branch 'feature/fix_record_parser' into develop
yolkhovyy Oct 26, 2019
eda274e
Optional field size
yolkhovyy Nov 3, 2019
6f944e9
Added bindata parser description in README.md
yolkhovyy Nov 11, 2019
adfda86
README.md updated
yolkhovyy Dec 14, 2019
3e85c0b
Removed Protocol
yolkhovyy Dec 22, 2019
ec20ac2
Fixed typo
yolkhovyy Dec 22, 2019
c300d4f
Remove unused const
yolkhovyy Feb 2, 2020
7a79c8e
Merge branch 'master' into develop
yolkhovyy Feb 2, 2020
746ea4d
Updated README.md
yolkhovyy Feb 2, 2020
72f8137
Reworking string encoding
yolkhovyy Feb 5, 2020
4a1c812
Unit tests for string encoding
yolkhovyy Feb 5, 2020
7e9003c
Merge branch 'feature/new-string-encoding' into develop
yolkhovyy Feb 5, 2020
7b065d7
Padding test cleaned up
yolkhovyy Feb 6, 2020
e32d8a1
UTF-8 unit test
yolkhovyy Feb 6, 2020
ef749af
Unit tests cleaned up
yolkhovyy Feb 8, 2020
eb77521
Merge branch 'feature/utf8-encoding-test' into develop
yolkhovyy Feb 8, 2020
0a30031
Comments and commented out code
yolkhovyy Feb 8, 2020
c49fc04
README.md updated
yolkhovyy Feb 8, 2020
54a0cc8
../registry.go
yolkhovyy Feb 8, 2020
1ad2998
Added bindata factory, reworked unit tests
yolkhovyy Feb 8, 2020
b040ed3
Comments, etc
yolkhovyy Feb 8, 2020
7bdb452
Added unit test for default tags
yolkhovyy Feb 8, 2020
730f08d
Merge branch 'feature/bindata-factory' into develop
yolkhovyy Feb 8, 2020
5a2aee4
README.md updated
yolkhovyy Feb 8, 2020
e7c23c7
Duplicate field names check
yolkhovyy Feb 15, 2020
5b380cd
Merge branch 'develop'
yolkhovyy Feb 15, 2020
094174c
Removed bindata fields in config.go
yolkhovyy Feb 15, 2020
ccd4320
Merge branch 'master' into develop
yolkhovyy Feb 15, 2020
09622b4
Field removal
yolkhovyy Feb 15, 2020
4d3d355
Removed temporary Dockerfile
yolkhovyy Feb 15, 2020
984d7c6
Removed .vscode, reverted README.md
yolkhovyy Feb 16, 2020
2f222d5
Removed unnecassary change
yolkhovyy Feb 16, 2020
3fc716c
Formatting in bindata/paresr_test.go
yolkhovyy Feb 16, 2020
415e3cd
Merge branch 'master' of https://github.com/influxdata/telegraf
yolkhovyy Jun 20, 2020
499e9c3
Review remarks
yolkhovyy Dec 16, 2021
a31907b
Update plugins/parsers/bindata/README.md
yolkhovyy Jan 9, 2022
715a5d1
Update plugins/parsers/bindata/parser.go
yolkhovyy Jan 9, 2022
52e27b8
Update plugins/parsers/bindata/parser.go
yolkhovyy Jan 9, 2022
2abe97d
Update plugins/parsers/bindata/parser.go
yolkhovyy Jan 9, 2022
7fc1ba1
Removed new line
yolkhovyy Jan 9, 2022
f71923f
Removed new line
yolkhovyy Jan 9, 2022
f0c98e1
Update plugins/parsers/bindata/parser.go
yolkhovyy Jan 9, 2022
275858d
Update plugins/parsers/bindata/parser_test.go
yolkhovyy Jan 9, 2022
313c538
Update plugins/parsers/bindata/parser_test.go
yolkhovyy Jan 9, 2022
6be9cc2
Fixed unit test
yolkhovyy Jan 9, 2022
52a45a6
Removed new lines in tests
yolkhovyy Jan 9, 2022
016372b
Default endiannes
yolkhovyy Jan 9, 2022
013c539
Resolved merge conflicts
yolkhovyy Jan 23, 2022
d110b54
Compilation error fixe
yolkhovyy Jan 23, 2022
9ab9d4f
Reworked getParserConfig in config/config.go
yolkhovyy Jan 23, 2022
a8185c4
Lint errors fixed
yolkhovyy Jan 23, 2022
d127f83
Lint errors fixed
yolkhovyy Jan 23, 2022
0b154c2
Lint errors fixed
yolkhovyy Jan 23, 2022
9a93ace
Review comments addressed
yolkhovyy Jan 23, 2022
e5ad2d1
Review comments addressed
yolkhovyy Jan 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/bindata"
"github.com/influxdata/telegraf/plugins/processors"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/toml"
Expand Down Expand Up @@ -1492,6 +1493,61 @@ func getParserConfig(name string, tbl *ast.Table) (*parsers.Config, error) {
}
}

if node, ok := tbl.Fields["bindata_endiannes"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.BinDataEndiannes = str.Value
}
}
}

if node, ok := tbl.Fields["bindata_time_format"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.BinDataTimeFormat = str.Value
}
}
}

if node, ok := tbl.Fields["bindata_string_encoding"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.BinDataStringEncoding = str.Value
}
}
}

if node, ok := tbl.Fields["bindata_fields"]; ok {
if bindataFields, ok := node.([]*ast.Table); ok {
for _, bindataField := range bindataFields {
var field bindata.Field
for _, prop := range bindataField.Fields {
if kv, ok := prop.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
switch kv.Key {
case "name":
field.Name = str.Value
case "type":
field.Type = str.Value
default:
}
} else if integer, ok := kv.Value.(*ast.Integer); ok {
v, err := strconv.ParseUint(integer.Value, 10, 32)
if err == nil {
switch kv.Key {
case "size":
field.Size = uint(v)
default:
}
}
}
}
}
c.BinDataFields = append(c.BinDataFields, field)
}
}
}

if node, ok := tbl.Fields["json_string_fields"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
Expand Down Expand Up @@ -1854,6 +1910,10 @@ func getParserConfig(name string, tbl *ast.Table) (*parsers.Config, error) {
delete(tbl.Fields, "separator")
delete(tbl.Fields, "templates")
delete(tbl.Fields, "tag_keys")
delete(tbl.Fields, "bindata_endiannes")
delete(tbl.Fields, "bindata_string_encoding")
delete(tbl.Fields, "bindata_time_format")
delete(tbl.Fields, "bindata_fields")
delete(tbl.Fields, "json_name_key")
delete(tbl.Fields, "json_query")
delete(tbl.Fields, "json_string_fields")
Expand Down
55 changes: 55 additions & 0 deletions plugins/parsers/bindata/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# BinData

The "BinData" parser translates binary records consisting of multiple fields into Telegraf metrics. It supports:

- Little- and Big-Endian encoding
- bool, int8/uint8, int16/uint16, int32/uint32, int64/uint64, float32/float64 field types
- UTF-8 and ASCII-encoded strings
- unix, unix_ms, unix_us and unix_ns timestamp

### Configuration

```toml
[[inputs.mqtt_consumer]]
name_override = "drone_status"

## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "bindata"

## Numeric fields endiannes, "be" or "le", default "be"
bindata_endiannes = "be"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about adding machine and make it the default? That is, by default use the endianess of the machine that we have to detect.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, can be done that way.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But, what if the measurement arrives from a different machine which has different than the endianness of the machine where Telegraf runs?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then you have to specify the endianness (which can be done). However, it is not uncommon to have machines with the same endianness, so a sensible default should be chosen IMO.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"be" is the default, and I commented the line out in README.md but can't see the update here.


## Timestamp format - "unix", "unix_ms", "unix_us", "unix_ns", default "unix"
bindata_time_format = "unix"
yolkhovyy marked this conversation as resolved.
Show resolved Hide resolved

## String encoding, default "UTF-8"
bindata_string_encoding = "UTF-8"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please comment default values.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If so, please also comment the present line in the sample-config...

Suggested change
bindata_string_encoding = "UTF-8"
# bindata_string_encoding = "UTF-8"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's save to assume UTF-8 as a default encoding isn't it? If so, please comment this line as shown above.


## Binary data descriptor
## Fields are described by:
## - name - field name, use Golang naming conventions
yolkhovyy marked this conversation as resolved.
Show resolved Hide resolved
## - type - field type, use Golang type names
srebhan marked this conversation as resolved.
Show resolved Hide resolved
## - size - size in bytes, obligatory for fields with type "string" and "padding", ignored in numeric and bool fields
## Field with case incensitive name "time" has special meaning - it is used as metric time and must be of type
## - int32 for bindata_time_format = "unix", or of type
## - int64 for bindata_time_format = "unix_ms", "unix_us", "unix_ns".
## Use padding when auto-generated metric time is preferred.
bindata_fields = [
{name="Version",type="uint16"},
{name="Time",type="int32"},
{name="Latitude",type="float64"},
{name="Longitude",type="float64"},
{name="Altitude",type="float32"},
{name="Heading",type="float32"},
{name="Elevation",type="float32"},
{name="Bank",type="float32"},
{name="GroundSpeed",type="float32"},
{name="AirSpeed",type="float32"},
{name="None",type="padding", size=16},
{name="Status",type="string",size=7},
{name="StatusOK",type="bool"},
srebhan marked this conversation as resolved.
Show resolved Hide resolved
]
```
216 changes: 216 additions & 0 deletions plugins/parsers/bindata/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
package bindata

import (
"bytes"
"encoding/binary"
"fmt"
"reflect"
"strings"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
)

const timeKey = "time"
const timezone = "UTC"
const defaultStringEncoding = "UTF-8"
const defaultTimeFormat = "unix"

// Field is a binary data field descriptor
type Field struct {
Name string
Type string
Size uint
}

// BinData is a binary data parser
type BinData struct {
metricName string
timeFormat string
endiannes string
byteOrder binary.ByteOrder
stringEncoding string
fields []Field
Comment on lines +30 to +35
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please export these fields. Please also add toml-tags to those options to allow for the new parser format (see PR #8791 and the CSV parser as an example).

DefaultTags map[string]string
}

// Supported field types
var fieldTypes = map[string]reflect.Type{
"bool": reflect.TypeOf((*bool)(nil)).Elem(),
"uint8": reflect.TypeOf((*uint8)(nil)).Elem(),
"int8": reflect.TypeOf((*int8)(nil)).Elem(),
"uint16": reflect.TypeOf((*uint16)(nil)).Elem(),
"int16": reflect.TypeOf((*int16)(nil)).Elem(),
"uint32": reflect.TypeOf((*uint32)(nil)).Elem(),
"int32": reflect.TypeOf((*int32)(nil)).Elem(),
"uint64": reflect.TypeOf((*uint64)(nil)).Elem(),
"int64": reflect.TypeOf((*int64)(nil)).Elem(),
"float32": reflect.TypeOf((*float32)(nil)).Elem(),
"float64": reflect.TypeOf((*float64)(nil)).Elem(),
"string": reflect.TypeOf((*string)(nil)).Elem(),
"padding": reflect.TypeOf((*[]byte)(nil)).Elem(),
}

// NewBinDataParser is BinData factory
func NewBinDataParser(
metricName string,
timeFormat string,
endiannes string,
stringEncoding string,
fields []Field,
defaultTags map[string]string,
) (*BinData, error) {

// Time format
switch timeFormat {
case "":
timeFormat = defaultTimeFormat
case "unix", "unix_ms", "unix_us", "unix_ns":
default:
return nil, fmt.Errorf("invalid time format %s", timeFormat)
}
yolkhovyy marked this conversation as resolved.
Show resolved Hide resolved

// Endiannes
var byteOrder binary.ByteOrder
endiannes = strings.ToLower(endiannes)
switch endiannes {
case "", "be":
byteOrder = binary.BigEndian
case "le":
byteOrder = binary.LittleEndian
default:
return nil, fmt.Errorf("invalid bindata_endiannes %s", endiannes)
yolkhovyy marked this conversation as resolved.
Show resolved Hide resolved
}

// String encoding
if stringEncoding == "" {
stringEncoding = defaultStringEncoding
}
stringEncoding = strings.ToUpper(stringEncoding)
if stringEncoding != defaultStringEncoding {
return nil, fmt.Errorf(`invalid string encoding %s`, stringEncoding)
}

// Field types, names and sizes
knownFields := make(map[string]bool)
for i := 0; i < len(fields); i++ {
fieldType, ok := fieldTypes[strings.ToLower(fields[i].Type)]
yolkhovyy marked this conversation as resolved.
Show resolved Hide resolved
if !ok {
return nil, fmt.Errorf(`invalid field type %s`, fields[i].Type)
}

if fields[i].Type != "padding" {
// Check for duplicate field names
fieldName := fields[i].Name
if _, ok := knownFields[fieldName]; ok {
return nil, fmt.Errorf(`duplicate field name %s`, fieldName)
}
knownFields[fieldName] = true

// Time field type check
if fieldName == "time" {
if timeFormat == "unix" &&
fieldType != reflect.TypeOf((*int32)(nil)).Elem() {
return nil, fmt.Errorf(`invalid time type, must be int32`)
}
if (timeFormat == "unix_ms" || timeFormat == "unix_us" || timeFormat == "unix_ns") &&
fieldType != reflect.TypeOf((*int64)(nil)).Elem() {
return nil, fmt.Errorf(`invalid time type, must be int64`)
}
}

// Overwrite non-string and non-padding field size
if fieldType.Name() != "string" {
fields[i].Size = uint(fieldType.Size())
}
}
yolkhovyy marked this conversation as resolved.
Show resolved Hide resolved
}

return &BinData{
metricName: metricName,
timeFormat: timeFormat,
endiannes: endiannes,
byteOrder: byteOrder,
stringEncoding: stringEncoding,
fields: fields,
DefaultTags: defaultTags,
}, nil
}

// SetDefaultTags implements Parser.SetDefaultTags()
func (binData *BinData) SetDefaultTags(tags map[string]string) {
binData.DefaultTags = tags
}

// Parse implements Parser.Parse()
func (binData *BinData) Parse(data []byte) ([]telegraf.Metric, error) {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this newline as it will not help in structuring the code.

fields := make(map[string]interface{})
var offset uint = 0

for _, field := range binData.fields {
yolkhovyy marked this conversation as resolved.
Show resolved Hide resolved
if offset > uint(len(data)) || offset+field.Size > uint(len(data)) {
return nil, fmt.Errorf("invalid offset/size in field %s", field.Name)
}
if field.Type != "padding" {
fieldBuffer := data[offset : offset+field.Size]
switch field.Type {
case "string":
fields[field.Name] = string(fieldBuffer)
srebhan marked this conversation as resolved.
Show resolved Hide resolved
default:
fieldValue := reflect.New(fieldTypes[field.Type])
byteReader := bytes.NewReader(fieldBuffer)
binary.Read(byteReader, binData.byteOrder, fieldValue.Interface())
fields[field.Name] = fieldValue.Elem().Interface()
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest I'd like to see something like

Suggested change
if field.Type != "padding" {
fieldBuffer := data[offset : offset+field.Size]
switch field.Type {
case "string":
fields[field.Name] = string(fieldBuffer)
default:
fieldValue := reflect.New(fieldTypes[field.Type])
byteReader := bytes.NewReader(fieldBuffer)
binary.Read(byteReader, binData.byteOrder, fieldValue.Interface())
fields[field.Name] = fieldValue.Elem().Interface()
}
}
switch field.Type {
case "padding":
continue
case "bool":
var v bool
r := bytes.NewReader(data[offset : offset+1])
if err := binary.Read(r, binData.byteOrder, &v); err != nil {
return nil, err
}
fields[field.Name] = v
case "uint8":
var v uint8
r := bytes.NewReader(data[offset : offset+1])
if err := binary.Read(r, binData.byteOrder, &v); err != nil {
return nil, err
}
fields[field.Name] = v
case "int8":
...
case "uint16":
...
case "int16":
...
case "uint32":
...
case "int32":
...
case "uint64":
...
case "int64":
...
case "float32":
...
case "float64":
...
case "string":
fields[field.Name] = string(data[offset:offset+field.Size)
}

Copy link
Author

@yolkhovyy yolkhovyy Jan 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Totally agree with padding inside the switch. Regarding reflection - I thought it was cool :) is reflection against IndluxData style/rules? It is compact and solves endianess nicely.
What bothers me here is the explicit size of the string field. I would go for null-terminated strings here - because it's pretty much standard in embedded - usually coded in c/c++.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well it's probably a matter of taste, but I think the switch/case above is much more readable/understandable compared to reflection. There also might be a performance impact, but that's not my primary concern... So please switch to the switch-statement. :-)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding the null-terminated strings. How about, if a length is given, we respect this length, otherwise we go for null-termination. This would allow to read non-null-terminated strings (i.e. fixed length fields) which you sometimes see in embedded devices.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

null-terminated strings - sounds good, good point

offset += field.Size
}

metricTime, err := binData.getTime(fields)
if err != nil {
return nil, err
}

metric, err := metric.New(binData.metricName, binData.DefaultTags, fields, metricTime)
if err != nil {
return nil, err
}

return []telegraf.Metric{metric}, err
}

// ParseLine implements Parser.ParseLine()
func (binData *BinData) ParseLine(line string) (telegraf.Metric, error) {
return nil, fmt.Errorf("BinData.ParseLine() not supported")
}
yolkhovyy marked this conversation as resolved.
Show resolved Hide resolved

func (binData *BinData) getTime(fields map[string]interface{}) (time.Time, error) {
nilTime := new(time.Time)
metricTime := time.Now()
timeValue := fields[timeKey]
if timeValue != nil {
var err error
switch binData.timeFormat {
case "unix":
if _, ok := timeValue.(int32); !ok {
return *nilTime, fmt.Errorf("invalid time type, must be int32")
}
metricTime, err = internal.ParseTimestamp(binData.timeFormat, int64(timeValue.(int32)), timezone)
case "unix_ms", "unix_us", "unix_ns":
if _, ok := timeValue.(int64); !ok {
return *nilTime, fmt.Errorf("invalid time type, must be int64")
}
metricTime, err = internal.ParseTimestamp(binData.timeFormat, int64(timeValue.(int64)), timezone)
default:
return *nilTime, fmt.Errorf("invalid time format %s", binData.timeFormat)
}
if err != nil {
return *nilTime, err
}
delete(fields, timeKey)
}
return metricTime, nil
yolkhovyy marked this conversation as resolved.
Show resolved Hide resolved
}
Loading