Skip to content

Commit

Permalink
Improve performance of wavefront serializer (influxdata#5842)
Browse files Browse the repository at this point in the history
  • Loading branch information
Charlie Vieth authored and Mathieu Lecarme committed Apr 17, 2020
1 parent b66344d commit b2f130a
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 94 deletions.
200 changes: 108 additions & 92 deletions plugins/serializers/wavefront/wavefront.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package wavefront

import (
"bytes"
"fmt"
"log"
"strconv"
"strings"
"sync"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs/wavefront"
Expand All @@ -16,6 +15,8 @@ type WavefrontSerializer struct {
Prefix string
UseStrict bool
SourceOverride []string
scratch buffer
mu sync.Mutex // buffer mutex
}

// catch many of the invalid chars that could appear in a metric or tag name
Expand Down Expand Up @@ -48,18 +49,16 @@ func NewSerializer(prefix string, useStrict bool, sourceOverride []string) (*Wav
return s, nil
}

// Serialize : Serialize based on Wavefront format
func (s *WavefrontSerializer) Serialize(m telegraf.Metric) ([]byte, error) {
out := []byte{}
metricSeparator := "."
func (s *WavefrontSerializer) serialize(buf *buffer, m telegraf.Metric) {
const metricSeparator = "."

for fieldName, value := range m.Fields() {
var name string

if fieldName == "value" {
name = fmt.Sprintf("%s%s", s.Prefix, m.Name())
name = s.Prefix + m.Name()
} else {
name = fmt.Sprintf("%s%s%s%s", s.Prefix, m.Name(), metricSeparator, fieldName)
name = s.Prefix + m.Name() + metricSeparator + fieldName
}

if s.UseStrict {
Expand All @@ -70,133 +69,150 @@ func (s *WavefrontSerializer) Serialize(m telegraf.Metric) ([]byte, error) {

name = pathReplacer.Replace(name)

metric := &wavefront.MetricPoint{
Metric: name,
Timestamp: m.Time().Unix(),
}

metricValue, buildError := buildValue(value, metric.Metric)
if buildError != nil {
metricValue, valid := buildValue(value, name)
if !valid {
// bad value continue to next metric
continue
}
metric.Value = metricValue

source, tags := buildTags(m.Tags(), s)
metric.Source = source
metric.Tags = tags

out = append(out, formatMetricPoint(metric, s)...)
metric := wavefront.MetricPoint{
Metric: name,
Timestamp: m.Time().Unix(),
Value: metricValue,
Source: source,
Tags: tags,
}
formatMetricPoint(&s.scratch, &metric, s)
}
}

// Serialize : Serialize based on Wavefront format
func (s *WavefrontSerializer) Serialize(m telegraf.Metric) ([]byte, error) {
s.mu.Lock()
s.scratch.Reset()
s.serialize(&s.scratch, m)
out := s.scratch.Copy()
s.mu.Unlock()
return out, nil
}

func (s *WavefrontSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
var batch bytes.Buffer
s.mu.Lock()
s.scratch.Reset()
for _, m := range metrics {
buf, err := s.Serialize(m)
if err != nil {
return nil, err
}
_, err = batch.Write(buf)
if err != nil {
return nil, err
s.serialize(&s.scratch, m)
}
out := s.scratch.Copy()
s.mu.Unlock()
return out, nil
}

func findSourceTag(mTags map[string]string, s *WavefrontSerializer) string {
if src, ok := mTags["source"]; ok {
delete(mTags, "source")
return src
}
for _, src := range s.SourceOverride {
if source, ok := mTags[src]; ok {
delete(mTags, src)
mTags["telegraf_host"] = mTags["host"]
return source
}
}
return batch.Bytes(), nil
return mTags["host"]
}

func buildTags(mTags map[string]string, s *WavefrontSerializer) (string, map[string]string) {

// Remove all empty tags.
for k, v := range mTags {
if v == "" {
delete(mTags, k)
}
}

var source string

if src, ok := mTags["source"]; ok {
source = src
delete(mTags, "source")
} else {
sourceTagFound := false
for _, src := range s.SourceOverride {
for k, v := range mTags {
if k == src {
source = v
mTags["telegraf_host"] = mTags["host"]
sourceTagFound = true
delete(mTags, k)
break
}
}
if sourceTagFound {
break
}
}

if !sourceTagFound {
source = mTags["host"]
}
}

source := findSourceTag(mTags, s)
delete(mTags, "host")

return tagValueReplacer.Replace(source), mTags
}

func buildValue(v interface{}, name string) (float64, error) {
func buildValue(v interface{}, name string) (val float64, valid bool) {
switch p := v.(type) {
case bool:
if p {
return 1, nil
} else {
return 0, nil
return 1, true
}
return 0, true
case int64:
return float64(v.(int64)), nil
return float64(p), true
case uint64:
return float64(v.(uint64)), nil
return float64(p), true
case float64:
return v.(float64), nil
return p, true
case string:
// return an error but don't log
return 0, fmt.Errorf("string type not supported")
// return false but don't log
return 0, false
default:
// return an error and log a debug message
err := fmt.Errorf("unexpected type: %T, with value: %v, for :%s", v, v, name)
log.Printf("D! Serializer [wavefront] %s\n", err.Error())
return 0, err
// log a debug message
log.Printf("D! Serializer [wavefront] unexpected type: %T, with value: %v, for :%s\n",
v, v, name)
return 0, false
}
}

func formatMetricPoint(metricPoint *wavefront.MetricPoint, s *WavefrontSerializer) []byte {
var buffer bytes.Buffer
buffer.WriteString("\"")
buffer.WriteString(metricPoint.Metric)
buffer.WriteString("\" ")
buffer.WriteString(strconv.FormatFloat(metricPoint.Value, 'f', 6, 64))
buffer.WriteString(" ")
buffer.WriteString(strconv.FormatInt(metricPoint.Timestamp, 10))
buffer.WriteString(" source=\"")
buffer.WriteString(metricPoint.Source)
buffer.WriteString("\"")
func formatMetricPoint(b *buffer, metricPoint *wavefront.MetricPoint, s *WavefrontSerializer) []byte {
b.WriteChar('"')
b.WriteString(metricPoint.Metric)
b.WriteString(`" `)
b.WriteFloat64(metricPoint.Value)
b.WriteChar(' ')
b.WriteUint64(uint64(metricPoint.Timestamp))
b.WriteString(` source="`)
b.WriteString(metricPoint.Source)
b.WriteChar('"')

for k, v := range metricPoint.Tags {
buffer.WriteString(" \"")
b.WriteString(` "`)
if s.UseStrict {
buffer.WriteString(strictSanitizedChars.Replace(k))
b.WriteString(strictSanitizedChars.Replace(k))
} else {
buffer.WriteString(sanitizedChars.Replace(k))
b.WriteString(sanitizedChars.Replace(k))
}
buffer.WriteString("\"=\"")
buffer.WriteString(tagValueReplacer.Replace(v))
buffer.WriteString("\"")
b.WriteString(`"="`)
b.WriteString(tagValueReplacer.Replace(v))
b.WriteChar('"')
}

buffer.WriteString("\n")
b.WriteChar('\n')

return *b
}

type buffer []byte

func (b *buffer) Reset() { *b = (*b)[:0] }

func (b *buffer) Copy() []byte {
p := make([]byte, len(*b))
copy(p, *b)
return p
}

func (b *buffer) WriteString(s string) {
*b = append(*b, s...)
}

// This is named WriteChar instead of WriteByte because the 'stdmethods' check
// of 'go vet' wants WriteByte to have the signature:
//
// func (b *buffer) WriteByte(c byte) error { ... }
//
func (b *buffer) WriteChar(c byte) {
*b = append(*b, c)
}

func (b *buffer) WriteUint64(val uint64) {
*b = strconv.AppendUint(*b, val, 10)
}

return buffer.Bytes()
func (b *buffer) WriteFloat64(val float64) {
*b = strconv.AppendFloat(*b, val, 'f', 6, 64)
}
49 changes: 47 additions & 2 deletions plugins/serializers/wavefront/wavefront_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/outputs/wavefront"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -132,7 +133,7 @@ func TestFormatMetricPoint(t *testing.T) {
s := WavefrontSerializer{}

for _, pt := range pointTests {
bout := formatMetricPoint(pt.ptIn, &s)
bout := formatMetricPoint(new(buffer), pt.ptIn, &s)
sout := string(bout[:])
if sout != pt.out {
t.Errorf("\nexpected\t%s\nreceived\t%s\n", pt.out, sout)
Expand Down Expand Up @@ -160,7 +161,7 @@ func TestUseStrict(t *testing.T) {
s := WavefrontSerializer{UseStrict: true}

for _, pt := range pointTests {
bout := formatMetricPoint(pt.ptIn, &s)
bout := formatMetricPoint(new(buffer), pt.ptIn, &s)
sout := string(bout[:])
if sout != pt.out {
t.Errorf("\nexpected\t%s\nreceived\t%s\n", pt.out, sout)
Expand Down Expand Up @@ -293,3 +294,47 @@ func TestSerializeMetricPrefix(t *testing.T) {
expS := []string{fmt.Sprintf("\"telegraf.cpu.usage.idle\" 91.000000 %d source=\"realHost\" \"cpu\"=\"cpu0\"", now.UnixNano()/1000000000)}
assert.Equal(t, expS, mS)
}

func benchmarkMetrics(b *testing.B) [4]telegraf.Metric {
b.Helper()
now := time.Now()
tags := map[string]string{
"cpu": "cpu0",
"host": "realHost",
}
newMetric := func(v interface{}) telegraf.Metric {
fields := map[string]interface{}{
"usage_idle": v,
}
m, err := metric.New("cpu", tags, fields, now)
if err != nil {
b.Fatal(err)
}
return m
}
return [4]telegraf.Metric{
newMetric(91.5),
newMetric(91),
newMetric(true),
newMetric(false),
}
}

func BenchmarkSerialize(b *testing.B) {
var s WavefrontSerializer
metrics := benchmarkMetrics(b)
b.ResetTimer()
for i := 0; i < b.N; i++ {
s.Serialize(metrics[i%len(metrics)])
}
}

func BenchmarkSerializeBatch(b *testing.B) {
var s WavefrontSerializer
m := benchmarkMetrics(b)
metrics := m[:]
b.ResetTimer()
for i := 0; i < b.N; i++ {
s.SerializeBatch(metrics)
}
}

0 comments on commit b2f130a

Please sign in to comment.