-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
wavefront/serializer: improve performance by ~30% #5842
Changes from 2 commits
ec4b147
96f2a2d
6fc8173
f63ab04
0b6a6b7
cb722eb
f4f19c0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,7 @@ | ||
package wavefront | ||
|
||
import ( | ||
"bytes" | ||
"errors" | ||
"fmt" | ||
"log" | ||
"strconv" | ||
|
@@ -16,6 +16,7 @@ type WavefrontSerializer struct { | |
Prefix string | ||
UseStrict bool | ||
SourceOverride []string | ||
scratch buffer | ||
} | ||
|
||
// catch many of the invalid chars that could appear in a metric or tag name | ||
|
@@ -48,18 +49,16 @@ func NewSerializer(prefix string, useStrict bool, sourceOverride []string) (*Wav | |
return s, nil | ||
} | ||
|
||
// Serialize : Serialize based on Wavefront format | ||
func (s *WavefrontSerializer) Serialize(m telegraf.Metric) ([]byte, error) { | ||
out := []byte{} | ||
metricSeparator := "." | ||
func (s *WavefrontSerializer) serialize(buf *buffer, m telegraf.Metric) { | ||
const metricSeparator = "." | ||
|
||
for fieldName, value := range m.Fields() { | ||
var name string | ||
|
||
if fieldName == "value" { | ||
name = fmt.Sprintf("%s%s", s.Prefix, m.Name()) | ||
name = s.Prefix + m.Name() | ||
} else { | ||
name = fmt.Sprintf("%s%s%s%s", s.Prefix, m.Name(), metricSeparator, fieldName) | ||
name = s.Prefix + m.Name() + metricSeparator + fieldName | ||
} | ||
|
||
if s.UseStrict { | ||
|
@@ -70,80 +69,62 @@ func (s *WavefrontSerializer) Serialize(m telegraf.Metric) ([]byte, error) { | |
|
||
name = pathReplacer.Replace(name) | ||
|
||
metric := &wavefront.MetricPoint{ | ||
Metric: name, | ||
Timestamp: m.Time().Unix(), | ||
} | ||
|
||
metricValue, buildError := buildValue(value, metric.Metric) | ||
metricValue, buildError := buildValue(value, name) | ||
if buildError != nil { | ||
// bad value continue to next metric | ||
continue | ||
} | ||
metric.Value = metricValue | ||
|
||
source, tags := buildTags(m.Tags(), s) | ||
metric.Source = source | ||
metric.Tags = tags | ||
|
||
out = append(out, formatMetricPoint(metric, s)...) | ||
metric := wavefront.MetricPoint{ | ||
Metric: name, | ||
Timestamp: m.Time().Unix(), | ||
Value: metricValue, | ||
Source: source, | ||
Tags: tags, | ||
} | ||
formatMetricPoint(&s.scratch, &metric, s) | ||
} | ||
return out, nil | ||
} | ||
|
||
// Serialize : Serialize based on Wavefront format | ||
func (s *WavefrontSerializer) Serialize(m telegraf.Metric) ([]byte, error) { | ||
s.scratch.Reset() | ||
s.serialize(&s.scratch, m) | ||
return s.scratch.Copy(), nil | ||
} | ||
|
||
func (s *WavefrontSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { | ||
var batch bytes.Buffer | ||
s.scratch.Reset() | ||
for _, m := range metrics { | ||
buf, err := s.Serialize(m) | ||
if err != nil { | ||
return nil, err | ||
} | ||
_, err = batch.Write(buf) | ||
if err != nil { | ||
return nil, err | ||
s.serialize(&s.scratch, m) | ||
} | ||
return s.scratch.Copy(), nil | ||
} | ||
|
||
func findSourceTag(mTags map[string]string, s *WavefrontSerializer) string { | ||
if src, ok := mTags["source"]; ok { | ||
delete(mTags, "source") | ||
return src | ||
} | ||
for _, src := range s.SourceOverride { | ||
if source, ok := mTags[src]; ok { | ||
delete(mTags, src) | ||
mTags["telegraf_host"] = mTags["host"] | ||
return source | ||
} | ||
} | ||
return batch.Bytes(), nil | ||
return mTags["host"] | ||
} | ||
|
||
func buildTags(mTags map[string]string, s *WavefrontSerializer) (string, map[string]string) { | ||
|
||
// Remove all empty tags. | ||
for k, v := range mTags { | ||
if v == "" { | ||
delete(mTags, k) | ||
} | ||
} | ||
|
||
var source string | ||
|
||
if src, ok := mTags["source"]; ok { | ||
source = src | ||
delete(mTags, "source") | ||
} else { | ||
sourceTagFound := false | ||
for _, src := range s.SourceOverride { | ||
for k, v := range mTags { | ||
if k == src { | ||
source = v | ||
mTags["telegraf_host"] = mTags["host"] | ||
sourceTagFound = true | ||
delete(mTags, k) | ||
break | ||
} | ||
} | ||
if sourceTagFound { | ||
break | ||
} | ||
} | ||
|
||
if !sourceTagFound { | ||
source = mTags["host"] | ||
} | ||
} | ||
|
||
source := findSourceTag(mTags, s) | ||
delete(mTags, "host") | ||
|
||
return tagValueReplacer.Replace(source), mTags | ||
} | ||
|
||
|
@@ -156,14 +137,14 @@ func buildValue(v interface{}, name string) (float64, error) { | |
return 0, nil | ||
} | ||
case int64: | ||
return float64(v.(int64)), nil | ||
return float64(p), nil | ||
case uint64: | ||
return float64(v.(uint64)), nil | ||
return float64(p), nil | ||
case float64: | ||
return v.(float64), nil | ||
return p, nil | ||
case string: | ||
// return an error but don't log | ||
return 0, fmt.Errorf("string type not supported") | ||
return 0, errors.New("string type not supported") | ||
default: | ||
// return an error and log a debug message | ||
err := fmt.Errorf("unexpected type: %T, with value: %v, for :%s", v, v, name) | ||
|
@@ -172,31 +153,66 @@ func buildValue(v interface{}, name string) (float64, error) { | |
} | ||
} | ||
|
||
func formatMetricPoint(metricPoint *wavefront.MetricPoint, s *WavefrontSerializer) []byte { | ||
var buffer bytes.Buffer | ||
buffer.WriteString("\"") | ||
buffer.WriteString(metricPoint.Metric) | ||
buffer.WriteString("\" ") | ||
buffer.WriteString(strconv.FormatFloat(metricPoint.Value, 'f', 6, 64)) | ||
buffer.WriteString(" ") | ||
buffer.WriteString(strconv.FormatInt(metricPoint.Timestamp, 10)) | ||
buffer.WriteString(" source=\"") | ||
buffer.WriteString(metricPoint.Source) | ||
buffer.WriteString("\"") | ||
func formatMetricPoint(b *buffer, metricPoint *wavefront.MetricPoint, s *WavefrontSerializer) []byte { | ||
b.WriteChar('"') | ||
b.WriteString(metricPoint.Metric) | ||
b.WriteString(`" `) | ||
b.WriteFloat64(metricPoint.Value) | ||
b.WriteChar(' ') | ||
b.WriteUnit64(uint64(metricPoint.Timestamp)) | ||
b.WriteString(` source="`) | ||
b.WriteString(metricPoint.Source) | ||
b.WriteChar('"') | ||
|
||
for k, v := range metricPoint.Tags { | ||
buffer.WriteString(" \"") | ||
b.WriteString(` "`) | ||
if s.UseStrict { | ||
buffer.WriteString(strictSanitizedChars.Replace(k)) | ||
b.WriteString(strictSanitizedChars.Replace(k)) | ||
} else { | ||
buffer.WriteString(sanitizedChars.Replace(k)) | ||
b.WriteString(sanitizedChars.Replace(k)) | ||
} | ||
buffer.WriteString("\"=\"") | ||
buffer.WriteString(tagValueReplacer.Replace(v)) | ||
buffer.WriteString("\"") | ||
b.WriteString(`"="`) | ||
b.WriteString(tagValueReplacer.Replace(v)) | ||
b.WriteChar('"') | ||
} | ||
|
||
buffer.WriteString("\n") | ||
b.WriteChar('\n') | ||
|
||
return *b | ||
} | ||
|
||
// Use a fast and simple buffer for constructing statsd messages | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm guessing this comment is a left over from something else? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. kinda, and good catch - will remove. |
||
type buffer []byte | ||
|
||
func (b *buffer) Reset() { *b = (*b)[:0] } | ||
|
||
func (b *buffer) Copy() []byte { | ||
p := make([]byte, len(*b)) | ||
copy(p, *b) | ||
return p | ||
} | ||
|
||
func (b *buffer) Write(p []byte) { | ||
*b = append(*b, p...) | ||
} | ||
|
||
func (b *buffer) WriteString(s string) { | ||
*b = append(*b, s...) | ||
} | ||
|
||
// This is named WriteChar instead of WriteByte because the 'stdmethods' check | ||
// of 'go vet' wants WriteByte to have the signature: | ||
// | ||
// func (b *buffer) WriteByte(c byte) error { ... } | ||
// | ||
func (b *buffer) WriteChar(c byte) { | ||
*b = append(*b, c) | ||
} | ||
|
||
func (b *buffer) WriteUnit64(val uint64) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. function name has a typo in it, which can be misleading (Unit vs Uint) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤦♂ thanks - fixed |
||
*b = strconv.AppendUint(*b, val, 10) | ||
} | ||
|
||
return buffer.Bytes() | ||
func (b *buffer) WriteFloat64(val float64) { | ||
*b = strconv.AppendFloat(*b, val, 'f', 6, 64) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We always ignore this error - is there any point returning it from
buildValue()
? This was the case before this PR was made.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems to be used to determine if the value should be skipped or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typically you log on error from the caller, but we log inside the buildValue function instead to avoid noisy log outputs when the value is of type string (which is expected to not work).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the insight, I just changed this to return a
bool
instead of anerror
since we used theerror
as a bool (nil / not-nil) and this saves an alloc.