forked from influxdata/telegraf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathregistry.go
336 lines (276 loc) · 11 KB
/
registry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
package serializers
import (
"fmt"
"regexp"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/serializers/carbon2"
"github.com/influxdata/telegraf/plugins/serializers/csv"
"github.com/influxdata/telegraf/plugins/serializers/graphite"
"github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/influxdata/telegraf/plugins/serializers/json"
"github.com/influxdata/telegraf/plugins/serializers/msgpack"
"github.com/influxdata/telegraf/plugins/serializers/nowmetric"
"github.com/influxdata/telegraf/plugins/serializers/prometheus"
"github.com/influxdata/telegraf/plugins/serializers/prometheusremotewrite"
"github.com/influxdata/telegraf/plugins/serializers/splunkmetric"
"github.com/influxdata/telegraf/plugins/serializers/wavefront"
)
// SerializerOutput is an interface for output plugins that are able to
// serialize telegraf metrics into arbitrary data formats.
type SerializerOutput interface {
	// SetSerializer sets the Serializer the output plugin should use.
	SetSerializer(serializer Serializer)
}
// Serializer is an interface defining functions that a serializer plugin must
// satisfy.
//
// Implementations of this interface should be reentrant but are not required
// to be thread-safe.
type Serializer interface {
	// Serialize takes a single telegraf metric and turns it into a byte buffer.
	// Separate metrics should be separated by a newline, and there should be
	// a newline at the end of the buffer.
	//
	// New plugins should use SerializeBatch instead to allow for non-line
	// delimited metrics.
	Serialize(metric telegraf.Metric) ([]byte, error)

	// SerializeBatch takes a slice of telegraf metrics and serializes them
	// into a single byte buffer. This method is not required to be suitable
	// for use with line oriented framing.
	SerializeBatch(metrics []telegraf.Metric) ([]byte, error)
}
// Config is a struct that covers the data types needed for all serializer types,
// and can be used to instantiate _any_ of the serializers.
//
// Only the fields relevant to the selected DataFormat are read by
// NewSerializer; the remaining fields are ignored for that format.
type Config struct {
	// DataFormat can be one of the serializer types listed in NewSerializer.
	DataFormat string `toml:"data_format"`

	// Carbon2 metric format.
	Carbon2Format string `toml:"carbon2_format"`

	// Character used for metric name sanitization in Carbon2.
	Carbon2SanitizeReplaceChar string `toml:"carbon2_sanitize_replace_char"`

	// Separator for CSV
	CSVSeparator string `toml:"csv_separator"`

	// Output a CSV header for naming the columns
	CSVHeader bool `toml:"csv_header"`

	// Prefix the tag and field columns for CSV format
	CSVPrefix bool `toml:"csv_column_prefix"`

	// Support tags in graphite protocol
	GraphiteTagSupport bool `toml:"graphite_tag_support"`

	// Sanitization mode applied to Graphite tags; NewGraphiteSerializer
	// falls back to "strict" when this is empty.
	GraphiteTagSanitizeMode string `toml:"graphite_tag_sanitize_mode"`

	// Character for separating metric name and field for Graphite tags;
	// NewGraphiteSerializer falls back to "." when this is empty.
	GraphiteSeparator string `toml:"graphite_separator"`

	// Regular expression that replaces the built-in pattern used for strict
	// sanitization in Graphite; leave empty to keep the built-in pattern.
	GraphiteStrictRegex string `toml:"graphite_strict_sanitize_regex"`

	// Maximum line length in bytes; influx format only
	InfluxMaxLineBytes int `toml:"influx_max_line_bytes"`

	// Sort field keys, set to true only when debugging as it is less
	// performant than unsorted fields; influx format only
	InfluxSortFields bool `toml:"influx_sort_fields"`

	// Support unsigned integer output; influx format only
	InfluxUintSupport bool `toml:"influx_uint_support"`

	// Prefix to add to all measurements; passed to the Graphite and
	// Wavefront serializers only
	Prefix string `toml:"prefix"`

	// Template for converting telegraf metrics into Graphite
	// only supports Graphite
	Template string `toml:"template"`

	// Templates is the same as Template, but allows several templates to be
	// given; only supports Graphite
	Templates []string `toml:"templates"`

	// Timestamp units to use for JSON formatted output
	TimestampUnits time.Duration `toml:"timestamp_units"`

	// Timestamp format to use for JSON and CSV formatted output
	TimestampFormat string `toml:"timestamp_format"`

	// Transformation as JSONata expression to use for JSON formatted output
	Transformation string `toml:"transformation"`

	// Field filters (include / exclude) for interpreting data as nested JSON
	// for the JSON serializer
	JSONNestedFieldInclude []string `toml:"json_nested_fields_include"`
	JSONNestedFieldExclude []string `toml:"json_nested_fields_exclude"`

	// Include HEC routing fields for splunkmetric output
	HecRouting bool `toml:"hec_routing"`

	// Enable Splunk MultiMetric output (Splunk 8.0+)
	SplunkmetricMultiMetric bool `toml:"splunkmetric_multi_metric"`

	// Omit the Splunk Event "metric" tag
	SplunkmetricOmitEventTag bool `toml:"splunkmetric_omit_event_tag"`

	// Point tags to use as the source name for Wavefront (if none found, host will be used).
	WavefrontSourceOverride []string `toml:"wavefront_source_override"`

	// Use Strict rules to sanitize metric and tag names from invalid characters for Wavefront
	// When enabled forward slash (/) and comma (,) will be accepted
	WavefrontUseStrict bool `toml:"wavefront_use_strict"`

	// Disable the conversion of "_" in prefixes to "." for Wavefront
	WavefrontDisablePrefixConversion bool `toml:"wavefront_disable_prefix_conversion"`

	// Include the metric timestamp on each sample.
	PrometheusExportTimestamp bool `toml:"prometheus_export_timestamp"`

	// Sort prometheus metric families and metric samples. Useful for
	// debugging.
	PrometheusSortMetrics bool `toml:"prometheus_sort_metrics"`

	// Output string fields as metric labels; when false string fields are
	// discarded.
	PrometheusStringAsLabel bool `toml:"prometheus_string_as_label"`

	// Encode metrics without HELP metadata. This helps reduce the payload size.
	PrometheusCompactEncoding bool `toml:"prometheus_compact_encoding"`
}
// NewSerializer returns a Serializer for the data format named in
// config.DataFormat.
//
// Recognized formats are "csv", "influx", "graphite", "json", "splunkmetric",
// "nowmetric", "carbon2", "wavefront", "prometheus", "prometheusremotewrite"
// and "msgpack". Any other value yields a nil Serializer together with an
// "invalid data format" error. Errors reported by a format-specific
// constructor are passed through unchanged.
func NewSerializer(config *Config) (Serializer, error) {
	switch format := config.DataFormat; format {
	case "csv":
		return NewCSVSerializer(config)
	case "influx":
		return NewInfluxSerializerConfig(config), nil
	case "graphite":
		return NewGraphiteSerializer(
			config.Prefix,
			config.Template,
			config.GraphiteStrictRegex,
			config.GraphiteTagSupport,
			config.GraphiteTagSanitizeMode,
			config.GraphiteSeparator,
			config.Templates,
		)
	case "json":
		return NewJSONSerializer(config)
	case "splunkmetric":
		return NewSplunkmetricSerializer(
			config.HecRouting,
			config.SplunkmetricMultiMetric,
			config.SplunkmetricOmitEventTag,
		), nil
	case "nowmetric":
		return NewNowSerializer()
	case "carbon2":
		return NewCarbon2Serializer(config.Carbon2Format, config.Carbon2SanitizeReplaceChar)
	case "wavefront":
		return NewWavefrontSerializer(
			config.Prefix,
			config.WavefrontUseStrict,
			config.WavefrontSourceOverride,
			config.WavefrontDisablePrefixConversion,
		), nil
	case "prometheus":
		return NewPrometheusSerializer(config), nil
	case "prometheusremotewrite":
		return NewPrometheusRemoteWriteSerializer(config), nil
	case "msgpack":
		return NewMsgpackSerializer(), nil
	default:
		return nil, fmt.Errorf("invalid data format: %s", format)
	}
}
// NewCSVSerializer creates a CSV serializer from the timestamp format,
// separator, header and column-prefix settings in config. Whatever
// csv.NewSerializer returns, including its error, is handed back as-is.
func NewCSVSerializer(config *Config) (Serializer, error) {
	var (
		timestampFormat = config.TimestampFormat
		separator       = config.CSVSeparator
		withHeader      = config.CSVHeader
		withPrefix      = config.CSVPrefix
	)
	return csv.NewSerializer(timestampFormat, separator, withHeader, withPrefix)
}
// NewPrometheusRemoteWriteSerializer builds a Prometheus remote-write
// serializer from config.
//
// Metric sorting is enabled by config.PrometheusSortMetrics, and string
// fields are emitted as labels when config.PrometheusStringAsLabel is set;
// otherwise sorting is off and string fields are discarded.
func NewPrometheusRemoteWriteSerializer(config *Config) Serializer {
	// Sorting has its own option; it must not be tied to the
	// timestamp-export flag, which this serializer does not use.
	sortMetrics := prometheusremotewrite.NoSortMetrics
	if config.PrometheusSortMetrics {
		sortMetrics = prometheusremotewrite.SortMetrics
	}

	stringAsLabels := prometheusremotewrite.DiscardStrings
	if config.PrometheusStringAsLabel {
		stringAsLabels = prometheusremotewrite.StringAsLabel
	}

	return prometheusremotewrite.NewSerializer(prometheusremotewrite.FormatConfig{
		MetricSortOrder: sortMetrics,
		StringHandling:  stringAsLabels,
	})
}
// NewPrometheusSerializer builds a Prometheus text-format serializer from
// config.
//
// Each behaviour is driven by its own option:
//   - config.PrometheusExportTimestamp: include the metric timestamp on each
//     sample.
//   - config.PrometheusSortMetrics: sort metric families and samples.
//   - config.PrometheusStringAsLabel: emit string fields as labels instead of
//     discarding them.
//   - config.PrometheusCompactEncoding: passed through as CompactEncoding.
func NewPrometheusSerializer(config *Config) Serializer {
	exportTimestamp := prometheus.NoExportTimestamp
	if config.PrometheusExportTimestamp {
		exportTimestamp = prometheus.ExportTimestamp
	}

	// Sorting is controlled by PrometheusSortMetrics, independently of
	// whether timestamps are exported.
	sortMetrics := prometheus.NoSortMetrics
	if config.PrometheusSortMetrics {
		sortMetrics = prometheus.SortMetrics
	}

	stringAsLabels := prometheus.DiscardStrings
	if config.PrometheusStringAsLabel {
		stringAsLabels = prometheus.StringAsLabel
	}

	return prometheus.NewSerializer(prometheus.FormatConfig{
		TimestampExport: exportTimestamp,
		MetricSortOrder: sortMetrics,
		StringHandling:  stringAsLabels,
		CompactEncoding: config.PrometheusCompactEncoding,
	})
}
// NewWavefrontSerializer creates a Wavefront serializer by forwarding the
// prefix, strict-sanitization flag, source-override tags and
// prefix-conversion switch to wavefront.NewSerializer.
func NewWavefrontSerializer(prefix string, useStrict bool, sourceOverride []string, disablePrefixConversions bool) Serializer {
	s := wavefront.NewSerializer(
		prefix,
		useStrict,
		sourceOverride,
		disablePrefixConversions,
	)
	return s
}
// NewJSONSerializer creates a JSON serializer from the timestamp,
// transformation and nested-field settings in config. The result of
// json.NewSerializer, including any error, is returned unchanged.
func NewJSONSerializer(config *Config) (Serializer, error) {
	formatConfig := json.FormatConfig{
		TimestampUnits:      config.TimestampUnits,
		TimestampFormat:     config.TimestampFormat,
		Transformation:      config.Transformation,
		NestedFieldsInclude: config.JSONNestedFieldInclude,
		NestedFieldsExclude: config.JSONNestedFieldExclude,
	}
	return json.NewSerializer(formatConfig)
}
// NewCarbon2Serializer creates a Carbon2 serializer for the given metric
// format and sanitization replacement character. The result of
// carbon2.NewSerializer, including any error, is returned unchanged.
func NewCarbon2Serializer(carbon2format string, carbon2SanitizeReplaceChar string) (Serializer, error) {
	s, err := carbon2.NewSerializer(carbon2format, carbon2SanitizeReplaceChar)
	return s, err
}
// NewSplunkmetricSerializer creates a splunkmetric serializer, forwarding the
// HEC-routing, multi-metric and omit-event-tag flags to
// splunkmetric.NewSerializer.
func NewSplunkmetricSerializer(splunkmetricHecRouting bool, splunkmetricMultimetric bool, splunkmetricOmitEventTag bool) Serializer {
	s := splunkmetric.NewSerializer(
		splunkmetricHecRouting,
		splunkmetricMultimetric,
		splunkmetricOmitEventTag,
	)
	return s
}
// NewNowSerializer creates a nowmetric serializer. The result of
// nowmetric.NewSerializer, including any error, is returned unchanged.
func NewNowSerializer() (Serializer, error) {
	s, err := nowmetric.NewSerializer()
	return s, err
}
// NewInfluxSerializerConfig creates an influx serializer and applies the
// influx-specific settings from config: the maximum line length, the field
// sort order (sorted only when InfluxSortFields is set) and the field type
// support (unsigned integers only when InfluxUintSupport is set).
func NewInfluxSerializerConfig(config *Config) Serializer {
	serializer := influx.NewSerializer()
	serializer.SetMaxLineBytes(config.InfluxMaxLineBytes)

	// The zero value of FieldSortOrder is used unless sorting is requested.
	var order influx.FieldSortOrder
	if config.InfluxSortFields {
		order = influx.SortFields
	}
	serializer.SetFieldSortOrder(order)

	// The zero value of FieldTypeSupport is used unless uint support is
	// requested.
	var support influx.FieldTypeSupport
	if config.InfluxUintSupport {
		support += influx.UintSupport
	}
	serializer.SetFieldTypeSupport(support)

	return serializer
}
// NewInfluxSerializer creates an influx serializer with the settings that
// influx.NewSerializer provides, without applying any Config.
func NewInfluxSerializer() Serializer {
	s := influx.NewSerializer()
	return s
}
// NewGraphiteSerializer builds a Graphite serializer from the given settings.
//
// templates is parsed with graphite.InitGraphiteTemplates; when that yields a
// non-empty default template it replaces template. An empty tagSanitizeMode
// defaults to "strict" and an empty separator defaults to ".". When
// strictRegex is non-empty it is compiled and used as StrictAllowedChars in
// place of the built-in pattern.
//
// An error is returned when the templates cannot be initialized or when
// strictRegex is not a valid regular expression.
//
//nolint:revive //argument-limit conditionally more arguments allowed
func NewGraphiteSerializer(
	prefix,
	template string,
	strictRegex string,
	tagSupport bool,
	tagSanitizeMode string,
	separator string,
	templates []string,
) (Serializer, error) {
	graphiteTemplates, defaultTemplate, err := graphite.InitGraphiteTemplates(templates)
	if err != nil {
		return nil, err
	}

	if defaultTemplate != "" {
		template = defaultTemplate
	}

	if tagSanitizeMode == "" {
		tagSanitizeMode = "strict"
	}

	if separator == "" {
		separator = "."
	}

	// Compile only the pattern that will actually be used, so the built-in
	// expression is not compiled needlessly when the caller overrides it.
	var strictAllowedChars *regexp.Regexp
	if strictRegex == "" {
		strictAllowedChars = regexp.MustCompile(`[^a-zA-Z0-9-:._=\p{L}]`)
	} else {
		strictAllowedChars, err = regexp.Compile(strictRegex)
		if err != nil {
			return nil, fmt.Errorf("invalid regex provided %q: %w", strictRegex, err)
		}
	}

	return &graphite.GraphiteSerializer{
		Prefix:             prefix,
		Template:           template,
		StrictAllowedChars: strictAllowedChars,
		TagSupport:         tagSupport,
		TagSanitizeMode:    tagSanitizeMode,
		Separator:          separator,
		Templates:          graphiteTemplates,
	}, nil
}
// NewMsgpackSerializer creates a MessagePack serializer via
// msgpack.NewSerializer.
func NewMsgpackSerializer() Serializer {
	s := msgpack.NewSerializer()
	return s
}