Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wavefront/serializer: improve performance by ~30% #5842

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 98 additions & 82 deletions plugins/serializers/wavefront/wavefront.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package wavefront

import (
"bytes"
"errors"
"fmt"
"log"
"strconv"
Expand All @@ -16,6 +16,7 @@ type WavefrontSerializer struct {
Prefix string
UseStrict bool
SourceOverride []string
scratch buffer
}

// catch many of the invalid chars that could appear in a metric or tag name
Expand Down Expand Up @@ -48,18 +49,16 @@ func NewSerializer(prefix string, useStrict bool, sourceOverride []string) (*Wav
return s, nil
}

// Serialize : Serialize based on Wavefront format
func (s *WavefrontSerializer) Serialize(m telegraf.Metric) ([]byte, error) {
out := []byte{}
metricSeparator := "."
func (s *WavefrontSerializer) serialize(buf *buffer, m telegraf.Metric) {
const metricSeparator = "."

for fieldName, value := range m.Fields() {
var name string

if fieldName == "value" {
name = fmt.Sprintf("%s%s", s.Prefix, m.Name())
name = s.Prefix + m.Name()
} else {
name = fmt.Sprintf("%s%s%s%s", s.Prefix, m.Name(), metricSeparator, fieldName)
name = s.Prefix + m.Name() + metricSeparator + fieldName
}

if s.UseStrict {
Expand All @@ -70,80 +69,62 @@ func (s *WavefrontSerializer) Serialize(m telegraf.Metric) ([]byte, error) {

name = pathReplacer.Replace(name)

metric := &wavefront.MetricPoint{
Metric: name,
Timestamp: m.Time().Unix(),
}

metricValue, buildError := buildValue(value, metric.Metric)
metricValue, buildError := buildValue(value, name)
Copy link
Contributor Author

@charlievieth charlievieth May 17, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We always ignore this error - is there any point returning it from buildValue()? This was the case before this PR was made.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems to be used to determine if the value should be skipped or not.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typically you log on error from the caller, but we log inside the buildValue function instead to avoid noisy log outputs when the value is of type string (which is expected to not work).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the insight, I just changed this to return a bool instead of an error since we used the error as a bool (nil / not-nil) and this saves an alloc.

if buildError != nil {
// bad value continue to next metric
continue
}
metric.Value = metricValue

source, tags := buildTags(m.Tags(), s)
metric.Source = source
metric.Tags = tags

out = append(out, formatMetricPoint(metric, s)...)
metric := wavefront.MetricPoint{
Metric: name,
Timestamp: m.Time().Unix(),
Value: metricValue,
Source: source,
Tags: tags,
}
formatMetricPoint(&s.scratch, &metric, s)
}
return out, nil
}

// Serialize renders a single metric in Wavefront format and returns the
// encoded bytes.
//
// The serializer's scratch buffer is reset, the metric is serialized, and a
// fresh copy of the scratch contents is returned so the caller's slice does
// not alias the buffer that later calls reuse. The returned error is always
// nil.
//
// NOTE(review): the scratch buffer is mutable state stored on the serializer,
// so concurrent calls on the same serializer would presumably race - confirm
// that callers serialize access.
func (s *WavefrontSerializer) Serialize(m telegraf.Metric) ([]byte, error) {
	scratch := &s.scratch
	scratch.Reset()
	s.serialize(scratch, m)
	return scratch.Copy(), nil
}

func (s *WavefrontSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
var batch bytes.Buffer
s.scratch.Reset()
for _, m := range metrics {
buf, err := s.Serialize(m)
if err != nil {
return nil, err
}
_, err = batch.Write(buf)
if err != nil {
return nil, err
s.serialize(&s.scratch, m)
}
return s.scratch.Copy(), nil
}

func findSourceTag(mTags map[string]string, s *WavefrontSerializer) string {
if src, ok := mTags["source"]; ok {
delete(mTags, "source")
return src
}
for _, src := range s.SourceOverride {
if source, ok := mTags[src]; ok {
delete(mTags, src)
mTags["telegraf_host"] = mTags["host"]
return source
}
}
return batch.Bytes(), nil
return mTags["host"]
}

func buildTags(mTags map[string]string, s *WavefrontSerializer) (string, map[string]string) {

// Remove all empty tags.
for k, v := range mTags {
if v == "" {
delete(mTags, k)
}
}

var source string

if src, ok := mTags["source"]; ok {
source = src
delete(mTags, "source")
} else {
sourceTagFound := false
for _, src := range s.SourceOverride {
for k, v := range mTags {
if k == src {
source = v
mTags["telegraf_host"] = mTags["host"]
sourceTagFound = true
delete(mTags, k)
break
}
}
if sourceTagFound {
break
}
}

if !sourceTagFound {
source = mTags["host"]
}
}

source := findSourceTag(mTags, s)
delete(mTags, "host")

return tagValueReplacer.Replace(source), mTags
}

Expand All @@ -156,14 +137,14 @@ func buildValue(v interface{}, name string) (float64, error) {
return 0, nil
}
case int64:
return float64(v.(int64)), nil
return float64(p), nil
case uint64:
return float64(v.(uint64)), nil
return float64(p), nil
case float64:
return v.(float64), nil
return p, nil
case string:
// return an error but don't log
return 0, fmt.Errorf("string type not supported")
return 0, errors.New("string type not supported")
default:
// return an error and log a debug message
err := fmt.Errorf("unexpected type: %T, with value: %v, for :%s", v, v, name)
Expand All @@ -172,31 +153,66 @@ func buildValue(v interface{}, name string) (float64, error) {
}
}

func formatMetricPoint(metricPoint *wavefront.MetricPoint, s *WavefrontSerializer) []byte {
var buffer bytes.Buffer
buffer.WriteString("\"")
buffer.WriteString(metricPoint.Metric)
buffer.WriteString("\" ")
buffer.WriteString(strconv.FormatFloat(metricPoint.Value, 'f', 6, 64))
buffer.WriteString(" ")
buffer.WriteString(strconv.FormatInt(metricPoint.Timestamp, 10))
buffer.WriteString(" source=\"")
buffer.WriteString(metricPoint.Source)
buffer.WriteString("\"")
func formatMetricPoint(b *buffer, metricPoint *wavefront.MetricPoint, s *WavefrontSerializer) []byte {
b.WriteChar('"')
b.WriteString(metricPoint.Metric)
b.WriteString(`" `)
b.WriteFloat64(metricPoint.Value)
b.WriteChar(' ')
b.WriteUnit64(uint64(metricPoint.Timestamp))
b.WriteString(` source="`)
b.WriteString(metricPoint.Source)
b.WriteChar('"')

for k, v := range metricPoint.Tags {
buffer.WriteString(" \"")
b.WriteString(` "`)
if s.UseStrict {
buffer.WriteString(strictSanitizedChars.Replace(k))
b.WriteString(strictSanitizedChars.Replace(k))
} else {
buffer.WriteString(sanitizedChars.Replace(k))
b.WriteString(sanitizedChars.Replace(k))
}
buffer.WriteString("\"=\"")
buffer.WriteString(tagValueReplacer.Replace(v))
buffer.WriteString("\"")
b.WriteString(`"="`)
b.WriteString(tagValueReplacer.Replace(v))
b.WriteChar('"')
}

buffer.WriteString("\n")
b.WriteChar('\n')

return *b
}

// Use a fast and simple buffer for constructing Wavefront messages
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm guessing this comment is a leftover from something else?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kinda, and good catch - will remove.

// buffer is a minimal append-only byte buffer backed directly by a byte
// slice. All methods take a pointer receiver because they replace the slice
// header when the contents change.
type buffer []byte

// Reset truncates the buffer to zero length while keeping the backing array,
// so subsequent writes can reuse the already allocated capacity.
func (b *buffer) Reset() {
	*b = (*b)[:0]
}

// Copy returns a newly allocated slice holding the buffer's current contents.
// The result does not share storage with the buffer.
func (b *buffer) Copy() []byte {
	src := *b
	dup := make([]byte, len(src))
	copy(dup, src)
	return dup
}

// Write appends the bytes of p to the buffer.
func (b *buffer) Write(p []byte) {
	grown := append(*b, p...)
	*b = grown
}

// WriteString appends the bytes of s to the buffer.
func (b *buffer) WriteString(s string) {
	grown := append(*b, s...)
	*b = grown
}

// WriteChar appends the single byte c to the buffer.
//
// It is deliberately not called WriteByte: the 'stdmethods' check of
// 'go vet' requires a method with that name to have the signature
//
//	func (b *buffer) WriteByte(c byte) error { ... }
func (b *buffer) WriteChar(c byte) {
	grown := append(*b, c)
	*b = grown
}

func (b *buffer) WriteUnit64(val uint64) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

function name has a typo in it, which can be misleading (Unit vs Uint)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤦‍♂ thanks - fixed

*b = strconv.AppendUint(*b, val, 10)
}

return buffer.Bytes()
// WriteFloat64 appends the decimal text of val to the buffer, formatted by
// strconv.AppendFloat with the 'f' format, a precision of 6 and a bit size
// of 64.
func (b *buffer) WriteFloat64(val float64) {
	grown := strconv.AppendFloat(*b, val, 'f', 6, 64)
	*b = grown
}
49 changes: 47 additions & 2 deletions plugins/serializers/wavefront/wavefront_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/outputs/wavefront"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -132,7 +133,7 @@ func TestFormatMetricPoint(t *testing.T) {
s := WavefrontSerializer{}

for _, pt := range pointTests {
bout := formatMetricPoint(pt.ptIn, &s)
bout := formatMetricPoint(new(buffer), pt.ptIn, &s)
sout := string(bout[:])
if sout != pt.out {
t.Errorf("\nexpected\t%s\nreceived\t%s\n", pt.out, sout)
Expand Down Expand Up @@ -160,7 +161,7 @@ func TestUseStrict(t *testing.T) {
s := WavefrontSerializer{UseStrict: true}

for _, pt := range pointTests {
bout := formatMetricPoint(pt.ptIn, &s)
bout := formatMetricPoint(new(buffer), pt.ptIn, &s)
sout := string(bout[:])
if sout != pt.out {
t.Errorf("\nexpected\t%s\nreceived\t%s\n", pt.out, sout)
Expand Down Expand Up @@ -293,3 +294,47 @@ func TestSerializeMetricPrefix(t *testing.T) {
expS := []string{fmt.Sprintf("\"telegraf.cpu.usage.idle\" 91.000000 %d source=\"realHost\" \"cpu\"=\"cpu0\"", now.UnixNano()/1000000000)}
assert.Equal(t, expS, mS)
}

// benchmarkMetrics builds the fixed set of four "cpu" metrics used by the
// serializer benchmarks. Every metric shares the same timestamp and the same
// "cpu"/"host" tags and carries a single "usage_idle" field whose value is,
// in order, a float (91.5), an int (91), true and false.
//
// Any error from metric.New aborts the benchmark via b.Fatal.
func benchmarkMetrics(b *testing.B) [4]telegraf.Metric {
	b.Helper()

	ts := time.Now()
	sharedTags := map[string]string{
		"cpu":  "cpu0",
		"host": "realHost",
	}

	var out [4]telegraf.Metric
	for i, v := range []interface{}{91.5, 91, true, false} {
		fields := map[string]interface{}{"usage_idle": v}
		m, err := metric.New("cpu", sharedTags, fields, ts)
		if err != nil {
			b.Fatal(err)
		}
		out[i] = m
	}
	return out
}

// BenchmarkSerialize measures Serialize on a zero-value WavefrontSerializer,
// cycling through the metrics returned by benchmarkMetrics.
//
// Allocation statistics are always reported, and a serialization error aborts
// the benchmark instead of being silently discarded, so a failing serializer
// cannot be mistaken for a fast one.
func BenchmarkSerialize(b *testing.B) {
	var s WavefrontSerializer
	metrics := benchmarkMetrics(b)
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		if _, err := s.Serialize(metrics[i%len(metrics)]); err != nil {
			b.Fatal(err)
		}
	}
}

// BenchmarkSerializeBatch measures SerializeBatch on a zero-value
// WavefrontSerializer, passing the full set of metrics returned by
// benchmarkMetrics on every iteration.
//
// Allocation statistics are always reported, and a serialization error aborts
// the benchmark instead of being silently discarded.
func BenchmarkSerializeBatch(b *testing.B) {
	var s WavefrontSerializer
	m := benchmarkMetrics(b)
	metrics := m[:] // SerializeBatch takes a slice, benchmarkMetrics returns an array
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		if _, err := s.SerializeBatch(metrics); err != nil {
			b.Fatal(err)
		}
	}
}