diff --git a/config/config.go b/config/config.go index 56beed8ee4910..b6eed9446162f 100644 --- a/config/config.go +++ b/config/config.go @@ -1504,6 +1504,7 @@ func (c *Config) buildSerializer(tbl *ast.Table) (serializers.Serializer, error) c.getFieldString(tbl, "graphite_separator", &sc.GraphiteSeparator) c.getFieldDuration(tbl, "json_timestamp_units", &sc.TimestampUnits) + c.getFieldString(tbl, "json_timestamp_format", &sc.TimestampFormat) c.getFieldBool(tbl, "splunkmetric_hec_routing", &sc.HecRouting) c.getFieldBool(tbl, "splunkmetric_multimetric", &sc.SplunkmetricMultiMetric) @@ -1569,7 +1570,7 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error { "grok_custom_pattern_files", "grok_custom_patterns", "grok_named_patterns", "grok_patterns", "grok_timezone", "grok_unique_timestamp", "influx_max_line_bytes", "influx_sort_fields", "influx_uint_support", "interval", "json_name_key", "json_query", "json_strict", - "json_string_fields", "json_time_format", "json_time_key", "json_timestamp_units", "json_timezone", "json_v2", + "json_string_fields", "json_time_format", "json_time_key", "json_timestamp_format", "json_timestamp_units", "json_timezone", "json_v2", "metric_batch_size", "metric_buffer_limit", "name_override", "name_prefix", "name_suffix", "namedrop", "namepass", "order", "pass", "period", "precision", "prefix", "prometheus_export_timestamp", "prometheus_sort_metrics", "prometheus_string_as_label", diff --git a/plugins/outputs/azure_data_explorer/azure_data_explorer.go b/plugins/outputs/azure_data_explorer/azure_data_explorer.go index 6d411fd05c3b9..b4c2054d3c22e 100644 --- a/plugins/outputs/azure_data_explorer/azure_data_explorer.go +++ b/plugins/outputs/azure_data_explorer/azure_data_explorer.go @@ -230,7 +230,7 @@ func (adx *AzureDataExplorer) Init() error { return errors.New("Metrics grouping type is not valid") } - serializer, err := json.NewSerializer(time.Second) + serializer, err := json.NewSerializer(time.Second, "") // FIXME: get the json.TimestampFormat from the config file if err != nil { return err } diff --git a/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go b/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go index f85d074cb1f6f..b8d30d66ce28b 100644 --- a/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go +++ b/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go @@ -105,7 +105,7 @@ func TestWrite(t *testing.T) { for _, tC := range testCases { t.Run(tC.name, func(t *testing.T) { - serializer, err := telegrafJson.NewSerializer(time.Second) + serializer, err := telegrafJson.NewSerializer(time.Second, "") require.NoError(t, err) plugin := AzureDataExplorer{ diff --git a/plugins/serializers/json/README.md b/plugins/serializers/json/README.md index 08bb9d4f7c904..b33875578272a 100644 --- a/plugins/serializers/json/README.md +++ b/plugins/serializers/json/README.md @@ -19,6 +19,13 @@ The `json` output data format converts metrics into JSON documents. ## such as "1ns", "1us", "1ms", "10ms", "1s". Durations are truncated to ## the power of 10 less than the specified units. json_timestamp_units = "1s" + + ## The default timestamp format is Unix epoch time, subject to the + # resolution configured in json_timestamp_units. + # Other timestamp layout can be configured using the Go language time + # layout specification from https://golang.org/pkg/time/#Time.Format + # e.g.: json_timestamp_format = "2006-01-02T15:04:05Z07:00" + #json_timestamp_format = "" ``` ### Examples: diff --git a/plugins/serializers/json/json.go b/plugins/serializers/json/json.go index e2d7af3305117..6db2a43ee231a 100644 --- a/plugins/serializers/json/json.go +++ b/plugins/serializers/json/json.go @@ -8,18 +8,20 @@ import ( "github.com/influxdata/telegraf" ) -type serializer struct { - TimestampUnits time.Duration +type Serializer struct { + TimestampUnits time.Duration + TimestampFormat string } -func NewSerializer(timestampUnits time.Duration) (*serializer, error) { - s := &serializer{ - TimestampUnits: truncateDuration(timestampUnits), +func NewSerializer(timestampUnits time.Duration, timestampformat string) (*Serializer, error) { + s := &Serializer{ + TimestampUnits: truncateDuration(timestampUnits), + TimestampFormat: timestampformat, } return s, nil } -func (s *serializer) Serialize(metric telegraf.Metric) ([]byte, error) { +func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) { m := s.createObject(metric) serialized, err := json.Marshal(m) if err != nil { @@ -30,7 +32,7 @@ func (s *serializer) Serialize(metric telegraf.Metric) ([]byte, error) { return serialized, nil } -func (s *serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { +func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { objects := make([]interface{}, 0, len(metrics)) for _, metric := range metrics { m := s.createObject(metric) @@ -48,7 +50,7 @@ func (s *serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { return serialized, nil } -func (s *serializer) createObject(metric telegraf.Metric) map[string]interface{} { +func (s *Serializer) createObject(metric telegraf.Metric) map[string]interface{} { m := make(map[string]interface{}, 4) tags := make(map[string]string, len(metric.TagList())) @@ -71,7 +73,11 @@ func (s *serializer) createObject(metric telegraf.Metric) map[string]interface{} m["fields"] = fields m["name"] = metric.Name() - m["timestamp"] = metric.Time().UnixNano() / int64(s.TimestampUnits) + if s.TimestampFormat == "" { + m["timestamp"] = metric.Time().UnixNano() / int64(s.TimestampUnits) + } else { + m["timestamp"] = metric.Time().UTC().Format(s.TimestampFormat) + } return m } diff --git a/plugins/serializers/json/json_test.go b/plugins/serializers/json/json_test.go index 74d7f94166621..be939243904eb 100644 --- a/plugins/serializers/json/json_test.go +++ b/plugins/serializers/json/json_test.go @@ -30,7 +30,7 @@ func TestSerializeMetricFloat(t *testing.T) { } m := metric.New("cpu", tags, fields, now) - s, _ := NewSerializer(0) + s, _ := NewSerializer(0, "") var buf []byte buf, err := s.Serialize(m) assert.NoError(t, err) @@ -40,9 +40,10 @@ func TestSerializeMetricFloat(t *testing.T) { func TestSerialize_TimestampUnits(t *testing.T) { tests := []struct { - name string - timestampUnits time.Duration - expected string + name string + timestampUnits time.Duration + timestampFormat string + expected string }{ { name: "default of 1s", @@ -74,6 +75,11 @@ func TestSerialize_TimestampUnits(t *testing.T) { timestampUnits: 65 * time.Millisecond, expected: `{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":152547879512}`, }, + { + name: "timestamp format", + timestampFormat: "2006-01-02T15:04:05Z07:00", + expected: `{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":"2018-05-05T00:06:35Z"}`, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -85,7 +91,7 @@ func TestSerialize_TimestampUnits(t *testing.T) { }, time.Unix(1525478795, 123456789), ) - s, _ := NewSerializer(tt.timestampUnits) + s, _ := NewSerializer(tt.timestampUnits, tt.timestampFormat) actual, err := s.Serialize(m) require.NoError(t, err) require.Equal(t, tt.expected+"\n", string(actual)) @@ -103,7 +109,7 @@ func TestSerializeMetricInt(t *testing.T) { } m := metric.New("cpu", tags, fields, now) - s, _ := NewSerializer(0) + s, _ := NewSerializer(0, "") var buf []byte buf, err := s.Serialize(m) assert.NoError(t, err) @@ -122,7 +128,7 @@ func TestSerializeMetricString(t *testing.T) { } m := metric.New("cpu", tags, fields, now) - s, _ := NewSerializer(0) + s, _ := NewSerializer(0, "") var buf []byte buf, err := s.Serialize(m) assert.NoError(t, err) @@ -142,7 +148,7 @@ func TestSerializeMultiFields(t *testing.T) { } m := metric.New("cpu", tags, fields, now) - s, _ := NewSerializer(0) + s, _ := NewSerializer(0, "") var buf []byte buf, err := s.Serialize(m) assert.NoError(t, err) @@ -161,7 +167,7 @@ func TestSerializeMetricWithEscapes(t *testing.T) { } m := metric.New("My CPU", tags, fields, now) - s, _ := NewSerializer(0) + s, _ := NewSerializer(0, "") buf, err := s.Serialize(m) assert.NoError(t, err) @@ -180,7 +186,7 @@ func TestSerializeBatch(t *testing.T) { ) metrics := []telegraf.Metric{m, m} - s, _ := NewSerializer(0) + s, _ := NewSerializer(0, "") buf, err := s.SerializeBatch(metrics) require.NoError(t, err) require.Equal(t, []byte(`{"metrics":[{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":0},{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":0}]}`), buf) @@ -199,7 +205,7 @@ func TestSerializeBatchSkipInf(t *testing.T) { ), } - s, err := NewSerializer(0) + s, err := NewSerializer(0, "") require.NoError(t, err) buf, err := s.SerializeBatch(metrics) require.NoError(t, err) @@ -218,7 +224,7 @@ func TestSerializeBatchSkipInfAllFields(t *testing.T) { ), } - s, err := NewSerializer(0) + s, err := NewSerializer(0, "") require.NoError(t, err) buf, err := s.SerializeBatch(metrics) require.NoError(t, err) diff --git a/plugins/serializers/registry.go b/plugins/serializers/registry.go index e67a9594dda73..b17364e66f0a6 100644 --- a/plugins/serializers/registry.go +++ b/plugins/serializers/registry.go @@ -88,6 +88,9 @@ type Config struct { // Timestamp units to use for JSON formatted output TimestampUnits time.Duration `toml:"timestamp_units"` + // Timestamp format to use for JSON formatted output + TimestampFormat string `toml:"timestamp_format"` + // Include HEC routing fields for splunkmetric output HecRouting bool `toml:"hec_routing"` @@ -123,7 +126,7 @@ func NewSerializer(config *Config) (Serializer, error) { case "graphite": serializer, err = NewGraphiteSerializer(config.Prefix, config.Template, config.GraphiteTagSupport, config.GraphiteTagSanitizeMode, config.GraphiteSeparator, config.Templates) case "json": - serializer, err = NewJSONSerializer(config.TimestampUnits) + serializer, err = NewJSONSerializer(config.TimestampUnits, config.TimestampFormat) case "splunkmetric": serializer, err = NewSplunkmetricSerializer(config.HecRouting, config.SplunkmetricMultiMetric) case "nowmetric": @@ -188,8 +191,8 @@ func NewWavefrontSerializer(prefix string, useStrict bool, sourceOverride []stri return wavefront.NewSerializer(prefix, useStrict, sourceOverride) } -func NewJSONSerializer(timestampUnits time.Duration) (Serializer, error) { - return json.NewSerializer(timestampUnits) +func NewJSONSerializer(timestampUnits time.Duration, timestampFormat string) (Serializer, error) { + return json.NewSerializer(timestampUnits, timestampFormat) } func NewCarbon2Serializer(carbon2format string, carbon2SanitizeReplaceChar string) (Serializer, error) {