-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add new binary data parser #7030
Changes from 1 commit
1a64cbe
2c1b466
72591fc
b76cea3
746e956
1365ce8
bc81a4b
7a511f2
5421933
87ca271
eda274e
6f944e9
adfda86
3e85c0b
ec20ac2
c300d4f
7a79c8e
746ea4d
72f8137
4a1c812
7e9003c
7b065d7
e32d8a1
ef749af
eb77521
0a30031
c49fc04
54a0cc8
1ad2998
b040ed3
7bdb452
730f08d
5a2aee4
e7c23c7
5b380cd
094174c
ccd4320
09622b4
4d3d355
984d7c6
2f222d5
3fc716c
415e3cd
499e9c3
a31907b
715a5d1
52e27b8
2abe97d
7fc1ba1
f71923f
f0c98e1
275858d
313c538
6be9cc2
52a45a6
016372b
013c539
d110b54
9ab9d4f
a8185c4
d127f83
0b154c2
9a93ace
e5ad2d1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -2,10 +2,10 @@ | |||||
|
||||||
The "BinData" parser translates binary records consisting of multiple fields into Telegraf metrics. It supports: | ||||||
|
||||||
* Little- and Big-Endian encoding | ||||||
* bool, int8/uint8, int16/uint16, int32/uint32, int64/uint64, float32/float64 field types | ||||||
* UTF-8 and ASCII-encoded strings | ||||||
* unix, unix_ms, unix_us and unix_ns timestamp | ||||||
- Little- and Big-Endian encoding | ||||||
- bool, int8/uint8, int16/uint16, int32/uint32, int64/uint64, float32/float64 field types | ||||||
- UTF-8 and ASCII-encoded strings | ||||||
- unix, unix_ms, unix_us and unix_ns timestamp | ||||||
|
||||||
### Configuration | ||||||
|
||||||
|
@@ -19,20 +19,24 @@ The "BinData" parser translates binary records consisting of multiple fields int | |||||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md | ||||||
data_format = "bindata" | ||||||
|
||||||
## Numeric fields endiannes, "be" or "le" | ||||||
## Numeric fields endiannes, "be" or "le", default "be" | ||||||
bindata_endiannes = "be" | ||||||
|
||||||
## Timestamp format - "unix", "unix_ms", "unix_us", "unix_ns" | ||||||
## Timestamp format - "unix", "unix_ms", "unix_us", "unix_ns", default "unix" | ||||||
bindata_time_format = "unix" | ||||||
yolkhovyy marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
## String encoding - "UTF-8" is the default | ||||||
## String encoding, default "UTF-8" | ||||||
bindata_string_encoding = "UTF-8" | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please comment default values. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If so, please also comment the present line in the sample-config...
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's save to assume UTF-8 as a default encoding isn't it? If so, please comment this line as shown above. |
||||||
|
||||||
## Binary data descriptor | ||||||
## Fields are described by: | ||||||
## - name - field name, use Golang naming conventions | ||||||
yolkhovyy marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
## - type - field type, use Golang type names | ||||||
srebhan marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
## - size - size in bytes, obligatory for fields with type "string" and "padding", ignored in numeric and bool fields | ||||||
## Field with case incensitive name "time" has special meaning - it is used as metric time and must be of type | ||||||
## - int32 for bindata_time_format = "unix", or of type | ||||||
## - int64 for bindata_time_format = "unix_ms", "unix_us", "unix_ns". | ||||||
## Use padding when auto-generated metric time is preferred. | ||||||
bindata_fields = [ | ||||||
{name="Version",type="uint16"}, | ||||||
{name="Time",type="int32"}, | ||||||
|
@@ -46,5 +50,6 @@ The "BinData" parser translates binary records consisting of multiple fields int | |||||
{name="AirSpeed",type="float32"}, | ||||||
{name="None",type="padding", size=16}, | ||||||
{name="Status",type="string",size=7}, | ||||||
{name="StatusOK",type="bool"}, | ||||||
] | ||||||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,7 +4,6 @@ import ( | |
"bytes" | ||
"encoding/binary" | ||
"fmt" | ||
"io" | ||
"reflect" | ||
"strings" | ||
"time" | ||
|
@@ -17,6 +16,7 @@ import ( | |
const timeKey = "time" | ||
const timezone = "UTC" | ||
const defaultStringEncoding = "UTF-8" | ||
const defaultTimeFormat = "unix" | ||
|
||
// Field is a binary data field descriptor | ||
type Field struct { | ||
|
@@ -27,15 +27,32 @@ type Field struct { | |
|
||
// BinData is a binary data parser | ||
type BinData struct { | ||
MetricName string | ||
TimeFormat string | ||
Endiannes string | ||
metricName string | ||
timeFormat string | ||
endiannes string | ||
byteOrder binary.ByteOrder | ||
StringEncoding string | ||
Fields []Field | ||
stringEncoding string | ||
fields []Field | ||
DefaultTags map[string]string | ||
} | ||
|
||
// Supported field types | ||
var fieldTypes = map[string]reflect.Type{ | ||
"bool": reflect.TypeOf((*bool)(nil)).Elem(), | ||
"uint8": reflect.TypeOf((*uint8)(nil)).Elem(), | ||
"int8": reflect.TypeOf((*int8)(nil)).Elem(), | ||
"uint16": reflect.TypeOf((*uint16)(nil)).Elem(), | ||
"int16": reflect.TypeOf((*int16)(nil)).Elem(), | ||
"uint32": reflect.TypeOf((*uint32)(nil)).Elem(), | ||
"int32": reflect.TypeOf((*int32)(nil)).Elem(), | ||
"uint64": reflect.TypeOf((*uint64)(nil)).Elem(), | ||
"int64": reflect.TypeOf((*int64)(nil)).Elem(), | ||
"float32": reflect.TypeOf((*float32)(nil)).Elem(), | ||
"float64": reflect.TypeOf((*float64)(nil)).Elem(), | ||
"string": reflect.TypeOf((*string)(nil)).Elem(), | ||
"padding": reflect.TypeOf((*[]byte)(nil)).Elem(), | ||
} | ||
|
||
// NewBinDataParser is BinData factory | ||
func NewBinDataParser( | ||
metricName string, | ||
|
@@ -48,6 +65,8 @@ func NewBinDataParser( | |
|
||
// Time format | ||
switch timeFormat { | ||
case "": | ||
timeFormat = defaultTimeFormat | ||
case "unix", "unix_ms", "unix_us", "unix_ns": | ||
default: | ||
return nil, fmt.Errorf("invalid time format %s", timeFormat) | ||
|
@@ -75,18 +94,30 @@ func NewBinDataParser( | |
} | ||
|
||
// Field types, names and sizes | ||
knownFields := make(map[string]bool) | ||
for i := 0; i < len(fields); i++ { | ||
fieldType, ok := fieldTypes[fields[i].Type] | ||
fieldType, ok := fieldTypes[strings.ToLower(fields[i].Type)] | ||
if !ok { | ||
return nil, fmt.Errorf(`invalid field type %s`, fields[i].Type) | ||
} | ||
|
||
if fields[i].Type != "padding" { | ||
// Check for duplicate field names | ||
fieldName := fields[i].Name | ||
for j := i + 1; j < len(fields); j++ { | ||
if fieldName == fields[j].Name { | ||
return nil, fmt.Errorf(`duplicate field name %s`, fieldName) | ||
if _, ok := knownFields[fieldName]; ok { | ||
return nil, fmt.Errorf(`duplicate field name %s`, fieldName) | ||
} | ||
knownFields[fieldName] = true | ||
|
||
// Time field type check | ||
if fieldName == "time" { | ||
if timeFormat == "unix" && | ||
fieldType != reflect.TypeOf((*int32)(nil)).Elem() { | ||
return nil, fmt.Errorf(`invalid time type, must be int32`) | ||
} | ||
if (timeFormat == "unix_ms" || timeFormat == "unix_us" || timeFormat == "unix_ns") && | ||
fieldType != reflect.TypeOf((*int64)(nil)).Elem() { | ||
return nil, fmt.Errorf(`invalid time type, must be int64`) | ||
} | ||
} | ||
|
||
|
@@ -98,12 +129,12 @@ func NewBinDataParser( | |
} | ||
|
||
return &BinData{ | ||
MetricName: metricName, | ||
TimeFormat: timeFormat, | ||
Endiannes: endiannes, | ||
metricName: metricName, | ||
timeFormat: timeFormat, | ||
endiannes: endiannes, | ||
byteOrder: byteOrder, | ||
StringEncoding: stringEncoding, | ||
Fields: fields, | ||
stringEncoding: stringEncoding, | ||
fields: fields, | ||
DefaultTags: defaultTags, | ||
}, nil | ||
} | ||
|
@@ -117,42 +148,33 @@ func (binData *BinData) SetDefaultTags(tags map[string]string) { | |
func (binData *BinData) Parse(data []byte) ([]telegraf.Metric, error) { | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please remove this newline as it will not help in structuring the code. |
||
fields := make(map[string]interface{}) | ||
reader := io.NewSectionReader(bytes.NewReader(data), 0, int64(len(data))) | ||
var offset int64 = 0 | ||
var offset uint = 0 | ||
|
||
for _, field := range binData.Fields { | ||
for _, field := range binData.fields { | ||
if offset > uint(len(data)) || offset+field.Size > uint(len(data)) { | ||
return nil, fmt.Errorf("invalid offset/size in field %s", field.Name) | ||
} | ||
if field.Type != "padding" { | ||
fieldBuffer := make([]byte, field.Size) | ||
|
||
if _, err := reader.ReadAt(fieldBuffer, offset); err != nil { | ||
return nil, err | ||
} | ||
|
||
fieldType, ok := fieldTypes[strings.ToLower(field.Type)] | ||
if !ok { | ||
return nil, fmt.Errorf(`invalid field type %s`, field.Type) | ||
} | ||
|
||
switch fieldType.Name() { | ||
fieldBuffer := data[offset : offset+field.Size] | ||
switch field.Type { | ||
case "string": | ||
fields[field.Name] = string(fieldBuffer) | ||
srebhan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
default: | ||
fieldValue := reflect.New(fieldType) | ||
fieldValue := reflect.New(fieldTypes[field.Type]) | ||
byteReader := bytes.NewReader(fieldBuffer) | ||
binary.Read(byteReader, binData.byteOrder, fieldValue.Interface()) | ||
fields[field.Name] = fieldValue.Elem().Interface() | ||
} | ||
} | ||
offset += int64(field.Size) | ||
offset += field.Size | ||
} | ||
|
||
metricTime, err := binData.getTime(fields) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
metric, err := metric.New(binData.MetricName, binData.DefaultTags, | ||
fields, metricTime) | ||
metric, err := metric.New(binData.metricName, binData.DefaultTags, fields, metricTime) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
@@ -165,36 +187,25 @@ func (binData *BinData) ParseLine(line string) (telegraf.Metric, error) { | |
return nil, fmt.Errorf("BinData.ParseLine() not supported") | ||
} | ||
|
||
// Supported field types | ||
var fieldTypes = map[string]reflect.Type{ | ||
"bool": reflect.TypeOf((*bool)(nil)).Elem(), | ||
"uint8": reflect.TypeOf((*uint8)(nil)).Elem(), | ||
"int8": reflect.TypeOf((*int8)(nil)).Elem(), | ||
"uint16": reflect.TypeOf((*uint16)(nil)).Elem(), | ||
"int16": reflect.TypeOf((*int16)(nil)).Elem(), | ||
"uint32": reflect.TypeOf((*uint32)(nil)).Elem(), | ||
"int32": reflect.TypeOf((*int32)(nil)).Elem(), | ||
"uint64": reflect.TypeOf((*uint64)(nil)).Elem(), | ||
"int64": reflect.TypeOf((*int64)(nil)).Elem(), | ||
"float32": reflect.TypeOf((*float32)(nil)).Elem(), | ||
"float64": reflect.TypeOf((*float64)(nil)).Elem(), | ||
"string": reflect.TypeOf((*string)(nil)).Elem(), | ||
"padding": reflect.TypeOf((*[]byte)(nil)).Elem(), | ||
} | ||
|
||
func (binData *BinData) getTime(fields map[string]interface{}) (time.Time, error) { | ||
nilTime := new(time.Time) | ||
metricTime := time.Now() | ||
timeValue := fields[timeKey] | ||
if timeValue != nil { | ||
var err error | ||
switch binData.TimeFormat { | ||
switch binData.timeFormat { | ||
case "unix": | ||
metricTime, err = internal.ParseTimestamp(binData.TimeFormat, int64(timeValue.(int32)), timezone) | ||
if _, ok := timeValue.(int32); !ok { | ||
return *nilTime, fmt.Errorf("invalid time type, must be int32") | ||
} | ||
metricTime, err = internal.ParseTimestamp(binData.timeFormat, int64(timeValue.(int32)), timezone) | ||
case "unix_ms", "unix_us", "unix_ns": | ||
metricTime, err = internal.ParseTimestamp(binData.TimeFormat, int64(timeValue.(int64)), timezone) | ||
if _, ok := timeValue.(int64); !ok { | ||
return *nilTime, fmt.Errorf("invalid time type, must be int64") | ||
} | ||
metricTime, err = internal.ParseTimestamp(binData.timeFormat, int64(timeValue.(int64)), timezone) | ||
default: | ||
return *nilTime, fmt.Errorf("invalid time format %s", binData.TimeFormat) | ||
return *nilTime, fmt.Errorf("invalid time format %s", binData.timeFormat) | ||
} | ||
if err != nil { | ||
return *nilTime, err | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about adding
machine
and make it the default? That is, by default use the endianess of the machine that we have to detect.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, can be done that way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But, what if the measurement arrives from a different machine which has different than the endianness of the machine where Telegraf runs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then you have to specify the endianness (which can be done). However, it is not uncommon to have machines with the same endianness, so a sensible default should be chosen IMO.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"be" is the default, and I commented the line out in README.md but can't see the update here.