Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Carbon2 serializer: sanitize metric name #9026

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1390,6 +1390,7 @@ func (c *Config) buildSerializer(name string, tbl *ast.Table) (serializers.Seria
c.getFieldString(tbl, "template", &sc.Template)
c.getFieldStringSlice(tbl, "templates", &sc.Templates)
c.getFieldString(tbl, "carbon2_format", &sc.Carbon2Format)
c.getFieldString(tbl, "carbon2_sanitize_replace_char", &sc.Carbon2SanitizeReplaceChar)
c.getFieldInt(tbl, "influx_max_line_bytes", &sc.InfluxMaxLineBytes)

c.getFieldBool(tbl, "influx_sort_fields", &sc.InfluxSortFields)
Expand Down Expand Up @@ -1451,9 +1452,9 @@ func (c *Config) buildOutput(name string, tbl *ast.Table) (*models.OutputConfig,

func (c *Config) missingTomlField(typ reflect.Type, key string) error {
switch key {
case "alias", "carbon2_format", "collectd_auth_file", "collectd_parse_multivalue",
"collectd_security_level", "collectd_typesdb", "collection_jitter", "csv_column_names",
"csv_column_types", "csv_comment", "csv_delimiter", "csv_header_row_count",
case "alias", "carbon2_format", "carbon2_sanitize_replace_char", "collectd_auth_file",
"collectd_parse_multivalue", "collectd_security_level", "collectd_typesdb", "collection_jitter",
"csv_column_names", "csv_column_types", "csv_comment", "csv_delimiter", "csv_header_row_count",
"csv_measurement_column", "csv_skip_columns", "csv_skip_rows", "csv_tag_columns",
"csv_timestamp_column", "csv_timestamp_format", "csv_timezone", "csv_trim_space", "csv_skip_values",
"data_format", "data_type", "delay", "drop", "drop_original", "dropwizard_metric_registry_path",
Expand Down
16 changes: 8 additions & 8 deletions plugins/outputs/sumologic/sumologic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestMethod(t *testing.T) {
w.WriteHeader(http.StatusOK)
})

serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
require.NoError(t, err)

plugin := tt.plugin()
Expand Down Expand Up @@ -172,7 +172,7 @@ func TestStatusCode(t *testing.T) {
w.WriteHeader(tt.statusCode)
})

serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
require.NoError(t, err)

tt.plugin.SetSerializer(serializer)
Expand All @@ -198,7 +198,7 @@ func TestContentType(t *testing.T) {
s.headers = map[string]string{
contentTypeHeader: carbon2ContentType,
}
sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
require.NoError(t, err)
s.SetSerializer(sr)
return s
Expand All @@ -212,7 +212,7 @@ func TestContentType(t *testing.T) {
s.headers = map[string]string{
contentTypeHeader: carbon2ContentType,
}
sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatMetricIncludesField))
sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatMetricIncludesField), carbon2.DefaultSanitizeReplaceChar)
require.NoError(t, err)
s.SetSerializer(sr)
return s
Expand Down Expand Up @@ -309,7 +309,7 @@ func TestContentEncodingGzip(t *testing.T) {
w.WriteHeader(http.StatusNoContent)
})

serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
require.NoError(t, err)

plugin := tt.plugin()
Expand Down Expand Up @@ -344,7 +344,7 @@ func TestDefaultUserAgent(t *testing.T) {
MaxRequstBodySize: Default().MaxRequstBodySize,
}

serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
require.NoError(t, err)

plugin.SetSerializer(serializer)
Expand Down Expand Up @@ -595,7 +595,7 @@ func TestMaxRequestBodySize(t *testing.T) {
w.WriteHeader(http.StatusOK)
})

serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
require.NoError(t, err)

plugin := tt.plugin()
Expand Down Expand Up @@ -627,7 +627,7 @@ func TestTryingToSendEmptyMetricsDoesntFail(t *testing.T) {
plugin := Default()
plugin.URL = u.String()

serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
require.NoError(t, err)
plugin.SetSerializer(serializer)

Expand Down
16 changes: 16 additions & 0 deletions plugins/serializers/carbon2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ The `carbon2` serializer translates the Telegraf metric format to the [Carbon2 f
## * "metric_includes_field"
## * "" - defaults to "field_separate"
# carbon2_format = "field_separate"

## Character used for replacing sanitized characters. By default ":" is used.
## The following character set is being replaced with sanitize replace char:
## !@#$%^&*()+`'\"[]{};<>,?/\\|=
# carbon2_sanitize_replace_char = ":"
```

Standard form:
Expand Down Expand Up @@ -52,6 +57,17 @@ metric=name_field_2 host=foo 4 1234567890
metric=name_field_N host=foo 59 1234567890
```

### Metric name sanitization

In order to sanitize the metric name one can specify `carbon2_sanitize_replace_char`
in order to replace the following characters in the metric name:

```
!@#$%^&*()+`'\"[]{};<>,?/\\|=
```

By default they will be replaced with `:`.

## Metrics

The serializer converts the metrics by creating `intrinsic_tags` using the combination of metric name and fields.
Expand Down
36 changes: 31 additions & 5 deletions plugins/serializers/carbon2/carbon2.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package carbon2

import (
"bytes"
"errors"
"fmt"
"strconv"
"strings"
Expand All @@ -23,11 +24,23 @@ var formats = map[format]struct{}{
Carbon2FormatMetricIncludesField: {},
}

const (
DefaultSanitizeReplaceChar = ":"
sanitizedChars = "!@#$%^&*()+`'\"[]{};<>,?/\\|="
)

type Serializer struct {
metricsFormat format
metricsFormat format
sanitizeReplacer *strings.Replacer
}

func NewSerializer(metricsFormat string) (*Serializer, error) {
func NewSerializer(metricsFormat string, sanitizeReplaceChar string) (*Serializer, error) {
if sanitizeReplaceChar == "" {
sanitizeReplaceChar = DefaultSanitizeReplaceChar
} else if len(sanitizeReplaceChar) > 1 {
return nil, errors.New("sanitize replace char has to be a singular character")
}

var f = format(metricsFormat)

if _, ok := formats[f]; !ok {
Expand All @@ -40,7 +53,8 @@ func NewSerializer(metricsFormat string) (*Serializer, error) {
}

return &Serializer{
metricsFormat: f,
metricsFormat: f,
sanitizeReplacer: createSanitizeReplacer(sanitizedChars, rune(sanitizeReplaceChar[0])),
}, nil
}

Expand All @@ -65,15 +79,17 @@ func (s *Serializer) createObject(metric telegraf.Metric) []byte {
continue
}

name := s.sanitizeReplacer.Replace(metric.Name())

switch metricsFormat {
case Carbon2FormatFieldSeparate:
m.WriteString(serializeMetricFieldSeparate(
metric.Name(), fieldName,
name, fieldName,
))

case Carbon2FormatMetricIncludesField:
m.WriteString(serializeMetricIncludeField(
metric.Name(), fieldName,
name, fieldName,
))
}

Expand Down Expand Up @@ -152,3 +168,13 @@ func bool2int(b bool) int {
}
return i
}

// createSanitizeReplacer creates string replacer replacing all provided
// characters with the replaceChar.
func createSanitizeReplacer(sanitizedChars string, replaceChar rune) *strings.Replacer {
sanitizeCharPairs := make([]string, 0, 2*len(sanitizedChars))
for _, c := range sanitizedChars {
sanitizeCharPairs = append(sanitizeCharPairs, string(c), string(replaceChar))
}
return strings.NewReplacer(sanitizeCharPairs...)
}
124 changes: 117 additions & 7 deletions plugins/serializers/carbon2/carbon2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestSerializeMetricFloat(t *testing.T) {

for _, tc := range testcases {
t.Run(string(tc.format), func(t *testing.T) {
s, err := NewSerializer(string(tc.format))
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
require.NoError(t, err)

buf, err := s.Serialize(m)
Expand Down Expand Up @@ -84,7 +84,7 @@ func TestSerializeMetricWithEmptyStringTag(t *testing.T) {

for _, tc := range testcases {
t.Run(string(tc.format), func(t *testing.T) {
s, err := NewSerializer(string(tc.format))
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
require.NoError(t, err)

buf, err := s.Serialize(m)
Expand Down Expand Up @@ -122,7 +122,7 @@ func TestSerializeWithSpaces(t *testing.T) {

for _, tc := range testcases {
t.Run(string(tc.format), func(t *testing.T) {
s, err := NewSerializer(string(tc.format))
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
require.NoError(t, err)

buf, err := s.Serialize(m)
Expand Down Expand Up @@ -160,7 +160,7 @@ func TestSerializeMetricInt(t *testing.T) {

for _, tc := range testcases {
t.Run(string(tc.format), func(t *testing.T) {
s, err := NewSerializer(string(tc.format))
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
require.NoError(t, err)

buf, err := s.Serialize(m)
Expand Down Expand Up @@ -198,7 +198,7 @@ func TestSerializeMetricString(t *testing.T) {

for _, tc := range testcases {
t.Run(string(tc.format), func(t *testing.T) {
s, err := NewSerializer(string(tc.format))
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
require.NoError(t, err)

buf, err := s.Serialize(m)
Expand Down Expand Up @@ -255,7 +255,7 @@ func TestSerializeMetricBool(t *testing.T) {

for _, tc := range testcases {
t.Run(string(tc.format), func(t *testing.T) {
s, err := NewSerializer(string(tc.format))
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
require.NoError(t, err)

buf, err := s.Serialize(tc.metric)
Expand Down Expand Up @@ -300,7 +300,7 @@ metric=cpu_value 42 0

for _, tc := range testcases {
t.Run(string(tc.format), func(t *testing.T) {
s, err := NewSerializer(string(tc.format))
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
require.NoError(t, err)

buf, err := s.SerializeBatch(metrics)
Expand All @@ -310,3 +310,113 @@ metric=cpu_value 42 0
})
}
}

func TestSerializeMetricIsProperlySanitized(t *testing.T) {
now := time.Now()

testcases := []struct {
metricFunc func() (telegraf.Metric, error)
format format
expected string
replaceChar string
expectedErr bool
}{
{
metricFunc: func() (telegraf.Metric, error) {
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
return metric.New("cpu=1", nil, fields, now)
},
format: Carbon2FormatFieldSeparate,
expected: fmt.Sprintf("metric=cpu:1 field=usage_idle 91.5 %d\n", now.Unix()),
replaceChar: DefaultSanitizeReplaceChar,
},
{
metricFunc: func() (telegraf.Metric, error) {
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
return metric.New("cpu=1", nil, fields, now)
},
format: Carbon2FormatFieldSeparate,
expected: fmt.Sprintf("metric=cpu_1 field=usage_idle 91.5 %d\n", now.Unix()),
replaceChar: "_",
},
{
metricFunc: func() (telegraf.Metric, error) {
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
return metric.New("cpu=1=tmp$custom", nil, fields, now)
},
format: Carbon2FormatFieldSeparate,
expected: fmt.Sprintf("metric=cpu:1:tmp:custom field=usage_idle 91.5 %d\n", now.Unix()),
replaceChar: DefaultSanitizeReplaceChar,
},
{
metricFunc: func() (telegraf.Metric, error) {
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
return metric.New("cpu=1=tmp$custom%namespace", nil, fields, now)
},
format: Carbon2FormatFieldSeparate,
expected: fmt.Sprintf("metric=cpu:1:tmp:custom:namespace field=usage_idle 91.5 %d\n", now.Unix()),
replaceChar: DefaultSanitizeReplaceChar,
},
{
metricFunc: func() (telegraf.Metric, error) {
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
return metric.New("cpu=1=tmp$custom%namespace", nil, fields, now)
},
format: Carbon2FormatMetricIncludesField,
expected: fmt.Sprintf("metric=cpu:1:tmp:custom:namespace_usage_idle 91.5 %d\n", now.Unix()),
replaceChar: DefaultSanitizeReplaceChar,
},
{
metricFunc: func() (telegraf.Metric, error) {
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
return metric.New("cpu=1=tmp$custom%namespace", nil, fields, now)
},
format: Carbon2FormatMetricIncludesField,
expected: fmt.Sprintf("metric=cpu_1_tmp_custom_namespace_usage_idle 91.5 %d\n", now.Unix()),
replaceChar: "_",
},
{
metricFunc: func() (telegraf.Metric, error) {
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
return metric.New("cpu=1=tmp$custom%namespace", nil, fields, now)
},
format: Carbon2FormatMetricIncludesField,
expectedErr: true,
replaceChar: "___",
},
}

for _, tc := range testcases {
t.Run(string(tc.format), func(t *testing.T) {
m, err := tc.metricFunc()
require.NoError(t, err)

s, err := NewSerializer(string(tc.format), tc.replaceChar)
if tc.expectedErr {
require.Error(t, err)
return
}

require.NoError(t, err)

buf, err := s.Serialize(m)
require.NoError(t, err)

assert.Equal(t, tc.expected, string(buf))
})
}
}
Loading