Skip to content

Commit

Permalink
feat: use new Influx Line Protocol Parser
Browse files Browse the repository at this point in the history
This switches to using the Influx Line Protocol Parser from
https://github.com/influxdata/line-protocol/

Authored-by: Alex Krantz alex@krantz.dev

Resolves: influxdata#9474
  • Loading branch information
akrantz01 authored and powersj committed Dec 2, 2021
1 parent 52c422a commit 4093dc8
Show file tree
Hide file tree
Showing 25 changed files with 799 additions and 7,510 deletions.
11 changes: 4 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ endif


GOFILES ?= $(shell git ls-files '*.go')
GOFMT ?= $(shell gofmt -l -s $(filter-out plugins/parsers/influx/machine.go, $(GOFILES)))
GOFMT ?= $(shell gofmt -l -s $(GOFILES))

prefix ?= /usr/local
bindir ?= $(prefix)/bin
Expand Down Expand Up @@ -118,7 +118,7 @@ test-integration:

.PHONY: fmt
fmt:
@gofmt -s -w $(filter-out plugins/parsers/influx/machine.go, $(GOFILES))
@gofmt -s -w $(GOFILES)

.PHONY: fmtcheck
fmtcheck:
Expand All @@ -132,8 +132,8 @@ fmtcheck:

.PHONY: vet
vet:
@echo 'go vet $$(go list ./... | grep -v ./plugins/parsers/influx)'
@go vet $$(go list ./... | grep -v ./plugins/parsers/influx) ; if [ $$? -ne 0 ]; then \
@echo 'go vet $$(go list ./...)'
@go vet $$(go list ./...) ; if [ $$? -ne 0 ]; then \
echo ""; \
echo "go vet has found suspicious constructs. Please remediate any reported errors"; \
echo "to fix them before submitting code for review."; \
Expand Down Expand Up @@ -201,9 +201,6 @@ clean:
docker-image:
docker build -f scripts/buster.docker -t "telegraf:$(commit)" .

plugins/parsers/influx/machine.go: plugins/parsers/influx/machine.go.rl
ragel -Z -G2 $^ -o $@

.PHONY: plugin-%
plugin-%:
@echo "Starting dev environment for $${$(@)} input plugin..."
Expand Down
1 change: 1 addition & 0 deletions docs/LICENSE_OF_DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ following works:
- github.com/influxdata/influxdb-observability/common [MIT License](https://github.com/influxdata/influxdb-observability/blob/main/LICENSE)
- github.com/influxdata/influxdb-observability/influx2otel [MIT License](https://github.com/influxdata/influxdb-observability/blob/main/LICENSE)
- github.com/influxdata/influxdb-observability/otel2influx [MIT License](https://github.com/influxdata/influxdb-observability/blob/main/LICENSE)
- github.com/influxdata/line-protocol [MIT License](https://github.com/influxdata/line-protocol/blob/v2/LICENSE)
- github.com/influxdata/tail [MIT License](https://github.com/influxdata/tail/blob/master/LICENSE.txt)
- github.com/influxdata/toml [MIT License](https://github.com/influxdata/toml/blob/master/LICENSE)
- github.com/influxdata/wlog [MIT License](https://github.com/influxdata/wlog/blob/master/LICENSE)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ require (
github.com/influxdata/influxdb-observability/common v0.2.8
github.com/influxdata/influxdb-observability/influx2otel v0.2.8
github.com/influxdata/influxdb-observability/otel2influx v0.2.8
github.com/influxdata/line-protocol/v2 v2.2.1
github.com/influxdata/tail v1.0.1-0.20210707231403-b283181d1fa7
github.com/influxdata/toml v0.0.0-20190415235208-270119a8ce65
github.com/influxdata/wlog v0.0.0-20160411224016-7c63b0a71ef8
Expand Down
13 changes: 12 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -750,8 +750,11 @@ github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHqu
github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20=
github.com/frankban/quicktest v1.7.3/go.mod h1:V1d2J5pfxYH6EjBAgSK7YNXcXlTWxUHdE1sVDXkjnig=
github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebPhedY=
github.com/frankban/quicktest v1.11.0/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s=
github.com/frankban/quicktest v1.11.2/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s=
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
github.com/frankban/quicktest v1.13.0 h1:yNZif1OkDfNoDfb9zZa9aXIpejNR4F23Wely0c+Qdqk=
github.com/frankban/quicktest v1.13.0/go.mod h1:qLE0fzW0VuyUAJgPU19zByoIr0HtCHN/r/VLSOOIySU=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI=
Expand Down Expand Up @@ -1240,7 +1243,15 @@ github.com/influxdata/influxdb-observability/otel2influx v0.2.8 h1:vTamg9mKUXHaX
github.com/influxdata/influxdb-observability/otel2influx v0.2.8/go.mod h1:xKTR9GLOtkSekysDKhAFNrPYpeiFV31Sy6zDqF54axA=
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/influxdata/influxql v1.1.1-0.20200828144457-65d3ef77d385/go.mod h1:gHp9y86a/pxhjJ+zMjNXiQAA197Xk9wLxaz+fGG+kWk=
github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e h1:/o3vQtpWJhvnIbXley4/jwzzqNeigJK9z+LZcJZ9zfM=
github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e/go.mod h1:4kt73NQhadE3daL3WhR5EJ/J2ocX0PZzwxQ0gXJ7oFE=
github.com/influxdata/line-protocol-corpus v0.0.0-20210519164801-ca6fa5da0184/go.mod h1:03nmhxzZ7Xk2pdG+lmMd7mHDfeVOYFyhOgwO61qWU98=
github.com/influxdata/line-protocol-corpus v0.0.0-20210922080147-aa28ccfb8937 h1:MHJNQ+p99hFATQm6ORoLmpUCF7ovjwEFshs/NHzAbig=
github.com/influxdata/line-protocol-corpus v0.0.0-20210922080147-aa28ccfb8937/go.mod h1:BKR9c0uHSmRgM/se9JhFHtTT7JTO67X23MtKMHtZcpo=
github.com/influxdata/line-protocol/v2 v2.0.0-20210312151457-c52fdecb625a/go.mod h1:6+9Xt5Sq1rWx+glMgxhcg2c0DUaehK+5TDcPZ76GypY=
github.com/influxdata/line-protocol/v2 v2.1.0/go.mod h1:QKw43hdUBg3GTk2iC3iyCxksNj7PX9aUSeYOYE/ceHY=
github.com/influxdata/line-protocol/v2 v2.2.1 h1:EAPkqJ9Km4uAxtMRgUubJyqAr6zgWM0dznKMLRauQRE=
github.com/influxdata/line-protocol/v2 v2.2.1/go.mod h1:DmB3Cnh+3oxmG6LOBIxce4oaL4CPj3OmMPgvauXh+tM=
github.com/influxdata/promql/v2 v2.12.0/go.mod h1:fxOPu+DY0bqCTCECchSRtWfc+0X19ybifQhZoQNF5D8=
github.com/influxdata/roaring v0.4.13-0.20180809181101-fc520f41fab6/go.mod h1:bSgUQ7q5ZLSO+bKBGqJiCBGAl+9DxyW63zLTujjUlOE=
github.com/influxdata/tail v1.0.1-0.20210707231403-b283181d1fa7 h1:0rQOs1VHLVFpAAOIR0mJEvVOIaMYFgYdreeVbgI9sII=
Expand Down
2 changes: 1 addition & 1 deletion plugins/common/shim/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (s *Shim) RunProcessor() error {
for {
m, err := parser.Next()
if err != nil {
if err == influx.EOF {
if err == influx.ErrEOF {
break // stream ended
}
if parseErr, isParseError := err.(*influx.ParseError); isParseError {
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/execd/execd.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (e *Execd) cmdReadOutStream(out io.Reader) {
for {
metric, err := parser.Next()
if err != nil {
if err == influx.EOF {
if err == influx.ErrEOF {
break // stream ended
}
if parseErr, isParseError := err.(*influx.ParseError); isParseError {
Expand Down
12 changes: 6 additions & 6 deletions plugins/inputs/influxdb_listener/influxdb_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,10 +295,13 @@ func (h *InfluxDBListener) handleWrite() http.HandlerFunc {
parser.SetTimePrecision(precision)
}

if req.ContentLength >= 0 {
h.bytesRecv.Incr(req.ContentLength)
}

var m telegraf.Metric
var err error
var parseErrorCount int
var lastPos int
var firstParseErrorStr string
for {
select {
Expand All @@ -310,9 +313,6 @@ func (h *InfluxDBListener) handleWrite() http.HandlerFunc {
}

m, err = parser.Next()
pos := parser.Position()
h.bytesRecv.Incr(int64(pos - lastPos))
lastPos = pos

// Continue parsing metrics even if some are malformed
if parseErr, ok := err.(*influx.ParseError); ok {
Expand All @@ -324,7 +324,7 @@ func (h *InfluxDBListener) handleWrite() http.HandlerFunc {
continue
} else if err != nil {
// Either we're exiting cleanly (err ==
// influx.EOF) or there's an unexpected error
// influx.ErrEOF) or there's an unexpected error
break
}

Expand All @@ -338,7 +338,7 @@ func (h *InfluxDBListener) handleWrite() http.HandlerFunc {

h.acc.AddMetric(m)
}
if err != influx.EOF {
if err != influx.ErrEOF {
h.Log.Debugf("Error parsing the request body: %v", err.Error())
if err := badRequest(res, err.Error()); err != nil {
h.Log.Debugf("error in bad-request: %v", err)
Expand Down
6 changes: 3 additions & 3 deletions plugins/inputs/influxdb_listener/influxdb_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,17 +621,17 @@ func TestWriteParseErrors(t *testing.T) {
{
name: "one parse error",
input: "foo value=1.0\nfoo value=2asdf2.0\nfoo value=3.0\nfoo value=4.0",
expected: `metric parse error: expected field at 2:12: "foo value=2"`,
expected: `metric parse error: cannot parse value for field key "value": invalid float value syntax at 2:11`,
},
{
name: "two parse errors",
input: "foo value=1asdf2.0\nfoo value=2.0\nfoo value=3asdf2.0\nfoo value=4.0",
expected: `metric parse error: expected field at 1:12: "foo value=1" (and 1 other parse error)`,
expected: `metric parse error: cannot parse value for field key "value": invalid float value syntax at 1:11 (and 1 other parse error)`,
},
{
name: "three or more parse errors",
input: "foo value=1asdf2.0\nfoo value=2.0\nfoo value=3asdf2.0\nfoo value=4asdf2.0",
expected: `metric parse error: expected field at 1:12: "foo value=1" (and 2 other parse errors)`,
expected: `metric parse error: cannot parse value for field key "value": invalid float value syntax at 1:11 (and 2 other parse errors)`,
},
}

Expand Down
8 changes: 4 additions & 4 deletions plugins/inputs/influxdb_v2_listener/influxdb_v2_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -264,22 +265,21 @@ func (h *InfluxDBV2Listener) handleWrite() http.HandlerFunc {
}
return
}
metricHandler := influx.NewMetricHandler()
parser := influx.NewParser(metricHandler)
parser := influx.NewParser()
parser.SetTimeFunc(h.timeFunc)

precisionStr := req.URL.Query().Get("precision")
if precisionStr != "" {
precision := getPrecisionMultiplier(precisionStr)
metricHandler.SetTimePrecision(precision)
parser.SetTimePrecision(precision)
}

var metrics []telegraf.Metric
var err error

metrics, err = parser.Parse(bytes)

if err != influx.EOF && err != nil {
if !errors.Is(err, influx.ErrEOF) && err != nil {
h.Log.Debugf("Error parsing the request body: %v", err.Error())
if err := badRequest(res, Invalid, err.Error()); err != nil {
h.Log.Debugf("error in bad-request: %v", err)
Expand Down
3 changes: 1 addition & 2 deletions plugins/inputs/tail/tail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,8 +570,7 @@ func TestCharacterEncoding(t *testing.T) {
}

plugin.SetParserFunc(func() (parsers.Parser, error) {
handler := influx.NewMetricHandler()
return influx.NewParser(handler), nil
return influx.NewParser(), nil
})

if tt.offset != 0 {
Expand Down
2 changes: 1 addition & 1 deletion plugins/outputs/execd/execd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func runOutputConsumerProgram() {
for {
m, err := parser.Next()
if err != nil {
if err == influx.EOF {
if err == influx.ErrEOF {
return // stream ended
}
if parseErr, isParseError := err.(*influx.ParseError); isParseError {
Expand Down
3 changes: 1 addition & 2 deletions plugins/parsers/dropwizard/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ type parser struct {
}

func NewParser() *parser {
handler := influx.NewMetricHandler()
seriesParser := influx.NewSeriesParser(handler)
seriesParser := influx.NewSeriesParser()

parser := &parser{
timeFunc: time.Now,
Expand Down
92 changes: 0 additions & 92 deletions plugins/parsers/influx/escape.go

This file was deleted.

Loading

0 comments on commit 4093dc8

Please sign in to comment.