Skip to content

Commit

Permalink
Fix race condition in the Wavefront parser (influxdata#5764)
Browse files Browse the repository at this point in the history
  • Loading branch information
prydin authored and bitcharmer committed Oct 18, 2019
1 parent 871c5d9 commit d1b56f2
Showing 1 changed file with 46 additions and 23 deletions.
69 changes: 46 additions & 23 deletions plugins/parsers/wavefront/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"log"
"strconv"
"sync"
"time"

"github.com/influxdata/telegraf"
Expand All @@ -22,18 +23,23 @@ type Point struct {
Tags map[string]string
}

// Parser represents a parser.
type WavefrontParser struct {
parsers *sync.Pool
defaultTags map[string]string
}

// PointParser is a thread-unsafe parser and must be kept in a pool.
type PointParser struct {
s *PointScanner
buf struct {
tok []Token // last read n tokens
lit []string // last read n literals
n int // unscanned buffer size (max=2)
}
scanBuf bytes.Buffer // buffer reused for scanning tokens
writeBuf bytes.Buffer // buffer reused for parsing elements
Elements []ElementParser
defaultTags map[string]string
scanBuf bytes.Buffer // buffer reused for scanning tokens
writeBuf bytes.Buffer // buffer reused for parsing elements
Elements []ElementParser
parent *WavefrontParser
}

// Returns a slice of ElementParser's for the Graphite format
Expand All @@ -47,9 +53,40 @@ func NewWavefrontElements() []ElementParser {
return elements
}

func NewWavefrontParser(defaultTags map[string]string) *PointParser {
func NewWavefrontParser(defaultTags map[string]string) *WavefrontParser {
wp := &WavefrontParser{defaultTags: defaultTags}
wp.parsers = &sync.Pool{
New: func() interface{} {
return NewPointParser(wp)
},
}
return wp
}

func NewPointParser(parent *WavefrontParser) *PointParser {
elements := NewWavefrontElements()
return &PointParser{Elements: elements, defaultTags: defaultTags}
return &PointParser{Elements: elements, parent: parent}
}

func (p *WavefrontParser) ParseLine(line string) (telegraf.Metric, error) {
buf := []byte(line)

metrics, err := p.Parse(buf)
if err != nil {
return nil, err
}

if len(metrics) > 0 {
return metrics[0], nil
}

return nil, nil
}

func (p *WavefrontParser) Parse(buf []byte) ([]telegraf.Metric, error) {
pp := p.parsers.Get().(*PointParser)
defer p.parsers.Put(pp)
return pp.Parse(buf)
}

func (p *PointParser) Parse(buf []byte) ([]telegraf.Metric, error) {
Expand Down Expand Up @@ -91,21 +128,7 @@ func (p *PointParser) Parse(buf []byte) ([]telegraf.Metric, error) {
return metrics, nil
}

func (p *PointParser) ParseLine(line string) (telegraf.Metric, error) {
buf := []byte(line)
metrics, err := p.Parse(buf)
if err != nil {
return nil, err
}

if len(metrics) > 0 {
return metrics[0], nil
}

return nil, nil
}

func (p *PointParser) SetDefaultTags(tags map[string]string) {
func (p *WavefrontParser) SetDefaultTags(tags map[string]string) {
p.defaultTags = tags
}

Expand All @@ -119,7 +142,7 @@ func (p *PointParser) convertPointToTelegrafMetric(points []Point) ([]telegraf.M
tags[k] = v
}
// apply default tags after parsed tags
for k, v := range p.defaultTags {
for k, v := range p.parent.defaultTags {
tags[k] = v
}

Expand Down

0 comments on commit d1b56f2

Please sign in to comment.