Skip to content

Commit

Permalink
feat: Add json_timestamp_layout option (#8229)
Browse files Browse the repository at this point in the history
  • Loading branch information
HeikoSchlittermann authored Sep 21, 2021
1 parent 8133fd8 commit b9aa983
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 27 deletions.
3 changes: 2 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1504,6 +1504,7 @@ func (c *Config) buildSerializer(tbl *ast.Table) (serializers.Serializer, error)
c.getFieldString(tbl, "graphite_separator", &sc.GraphiteSeparator)

c.getFieldDuration(tbl, "json_timestamp_units", &sc.TimestampUnits)
c.getFieldString(tbl, "json_timestamp_format", &sc.TimestampFormat)

c.getFieldBool(tbl, "splunkmetric_hec_routing", &sc.HecRouting)
c.getFieldBool(tbl, "splunkmetric_multimetric", &sc.SplunkmetricMultiMetric)
Expand Down Expand Up @@ -1569,7 +1570,7 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
"grok_custom_pattern_files", "grok_custom_patterns", "grok_named_patterns", "grok_patterns",
"grok_timezone", "grok_unique_timestamp", "influx_max_line_bytes", "influx_sort_fields",
"influx_uint_support", "interval", "json_name_key", "json_query", "json_strict",
"json_string_fields", "json_time_format", "json_time_key", "json_timestamp_units", "json_timezone", "json_v2",
"json_string_fields", "json_time_format", "json_time_key", "json_timestamp_format", "json_timestamp_units", "json_timezone", "json_v2",
"metric_batch_size", "metric_buffer_limit", "name_override", "name_prefix",
"name_suffix", "namedrop", "namepass", "order", "pass", "period", "precision",
"prefix", "prometheus_export_timestamp", "prometheus_sort_metrics", "prometheus_string_as_label",
Expand Down
2 changes: 1 addition & 1 deletion plugins/outputs/azure_data_explorer/azure_data_explorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (adx *AzureDataExplorer) Init() error {
return errors.New("Metrics grouping type is not valid")
}

serializer, err := json.NewSerializer(time.Second)
serializer, err := json.NewSerializer(time.Second, "") // FIXME: get the json.TimestampFormat from the config file
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestWrite(t *testing.T) {

for _, tC := range testCases {
t.Run(tC.name, func(t *testing.T) {
serializer, err := telegrafJson.NewSerializer(time.Second)
serializer, err := telegrafJson.NewSerializer(time.Second, "")
require.NoError(t, err)

plugin := AzureDataExplorer{
Expand Down
7 changes: 7 additions & 0 deletions plugins/serializers/json/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ The `json` output data format converts metrics into JSON documents.
## such as "1ns", "1us", "1ms", "10ms", "1s". Durations are truncated to
## the power of 10 less than the specified units.
json_timestamp_units = "1s"

## The default timestamp format is Unix epoch time, subject to the
# resolution configured in json_timestamp_units.
# Other timestamp layout can be configured using the Go language time
# layout specification from https://golang.org/pkg/time/#Time.Format
# e.g.: json_timestamp_format = "2006-01-02T15:04:05Z07:00"
#json_timestamp_format = ""
```

### Examples:
Expand Down
24 changes: 15 additions & 9 deletions plugins/serializers/json/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,20 @@ import (
"github.com/influxdata/telegraf"
)

type serializer struct {
TimestampUnits time.Duration
type Serializer struct {
TimestampUnits time.Duration
TimestampFormat string
}

func NewSerializer(timestampUnits time.Duration) (*serializer, error) {
s := &serializer{
TimestampUnits: truncateDuration(timestampUnits),
func NewSerializer(timestampUnits time.Duration, timestampformat string) (*Serializer, error) {
s := &Serializer{
TimestampUnits: truncateDuration(timestampUnits),
TimestampFormat: timestampformat,
}
return s, nil
}

func (s *serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
m := s.createObject(metric)
serialized, err := json.Marshal(m)
if err != nil {
Expand All @@ -30,7 +32,7 @@ func (s *serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
return serialized, nil
}

func (s *serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
objects := make([]interface{}, 0, len(metrics))
for _, metric := range metrics {
m := s.createObject(metric)
Expand All @@ -48,7 +50,7 @@ func (s *serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
return serialized, nil
}

func (s *serializer) createObject(metric telegraf.Metric) map[string]interface{} {
func (s *Serializer) createObject(metric telegraf.Metric) map[string]interface{} {
m := make(map[string]interface{}, 4)

tags := make(map[string]string, len(metric.TagList()))
Expand All @@ -71,7 +73,11 @@ func (s *serializer) createObject(metric telegraf.Metric) map[string]interface{}
m["fields"] = fields

m["name"] = metric.Name()
m["timestamp"] = metric.Time().UnixNano() / int64(s.TimestampUnits)
if s.TimestampFormat == "" {
m["timestamp"] = metric.Time().UnixNano() / int64(s.TimestampUnits)
} else {
m["timestamp"] = metric.Time().UTC().Format(s.TimestampFormat)
}
return m
}

Expand Down
30 changes: 18 additions & 12 deletions plugins/serializers/json/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestSerializeMetricFloat(t *testing.T) {
}
m := metric.New("cpu", tags, fields, now)

s, _ := NewSerializer(0)
s, _ := NewSerializer(0, "")
var buf []byte
buf, err := s.Serialize(m)
assert.NoError(t, err)
Expand All @@ -40,9 +40,10 @@ func TestSerializeMetricFloat(t *testing.T) {

func TestSerialize_TimestampUnits(t *testing.T) {
tests := []struct {
name string
timestampUnits time.Duration
expected string
name string
timestampUnits time.Duration
timestampFormat string
expected string
}{
{
name: "default of 1s",
Expand Down Expand Up @@ -74,6 +75,11 @@ func TestSerialize_TimestampUnits(t *testing.T) {
timestampUnits: 65 * time.Millisecond,
expected: `{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":152547879512}`,
},
{
name: "timestamp format",
timestampFormat: "2006-01-02T15:04:05Z07:00",
expected: `{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":"2018-05-05T00:06:35Z"}`,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -85,7 +91,7 @@ func TestSerialize_TimestampUnits(t *testing.T) {
},
time.Unix(1525478795, 123456789),
)
s, _ := NewSerializer(tt.timestampUnits)
s, _ := NewSerializer(tt.timestampUnits, tt.timestampFormat)
actual, err := s.Serialize(m)
require.NoError(t, err)
require.Equal(t, tt.expected+"\n", string(actual))
Expand All @@ -103,7 +109,7 @@ func TestSerializeMetricInt(t *testing.T) {
}
m := metric.New("cpu", tags, fields, now)

s, _ := NewSerializer(0)
s, _ := NewSerializer(0, "")
var buf []byte
buf, err := s.Serialize(m)
assert.NoError(t, err)
Expand All @@ -122,7 +128,7 @@ func TestSerializeMetricString(t *testing.T) {
}
m := metric.New("cpu", tags, fields, now)

s, _ := NewSerializer(0)
s, _ := NewSerializer(0, "")
var buf []byte
buf, err := s.Serialize(m)
assert.NoError(t, err)
Expand All @@ -142,7 +148,7 @@ func TestSerializeMultiFields(t *testing.T) {
}
m := metric.New("cpu", tags, fields, now)

s, _ := NewSerializer(0)
s, _ := NewSerializer(0, "")
var buf []byte
buf, err := s.Serialize(m)
assert.NoError(t, err)
Expand All @@ -161,7 +167,7 @@ func TestSerializeMetricWithEscapes(t *testing.T) {
}
m := metric.New("My CPU", tags, fields, now)

s, _ := NewSerializer(0)
s, _ := NewSerializer(0, "")
buf, err := s.Serialize(m)
assert.NoError(t, err)

Expand All @@ -180,7 +186,7 @@ func TestSerializeBatch(t *testing.T) {
)

metrics := []telegraf.Metric{m, m}
s, _ := NewSerializer(0)
s, _ := NewSerializer(0, "")
buf, err := s.SerializeBatch(metrics)
require.NoError(t, err)
require.Equal(t, []byte(`{"metrics":[{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":0},{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":0}]}`), buf)
Expand All @@ -199,7 +205,7 @@ func TestSerializeBatchSkipInf(t *testing.T) {
),
}

s, err := NewSerializer(0)
s, err := NewSerializer(0, "")
require.NoError(t, err)
buf, err := s.SerializeBatch(metrics)
require.NoError(t, err)
Expand All @@ -218,7 +224,7 @@ func TestSerializeBatchSkipInfAllFields(t *testing.T) {
),
}

s, err := NewSerializer(0)
s, err := NewSerializer(0, "")
require.NoError(t, err)
buf, err := s.SerializeBatch(metrics)
require.NoError(t, err)
Expand Down
9 changes: 6 additions & 3 deletions plugins/serializers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ type Config struct {
// Timestamp units to use for JSON formatted output
TimestampUnits time.Duration `toml:"timestamp_units"`

// Timestamp format to use for JSON formatted output
TimestampFormat string `toml:"timestamp_format"`

// Include HEC routing fields for splunkmetric output
HecRouting bool `toml:"hec_routing"`

Expand Down Expand Up @@ -123,7 +126,7 @@ func NewSerializer(config *Config) (Serializer, error) {
case "graphite":
serializer, err = NewGraphiteSerializer(config.Prefix, config.Template, config.GraphiteTagSupport, config.GraphiteTagSanitizeMode, config.GraphiteSeparator, config.Templates)
case "json":
serializer, err = NewJSONSerializer(config.TimestampUnits)
serializer, err = NewJSONSerializer(config.TimestampUnits, config.TimestampFormat)
case "splunkmetric":
serializer, err = NewSplunkmetricSerializer(config.HecRouting, config.SplunkmetricMultiMetric)
case "nowmetric":
Expand Down Expand Up @@ -188,8 +191,8 @@ func NewWavefrontSerializer(prefix string, useStrict bool, sourceOverride []stri
return wavefront.NewSerializer(prefix, useStrict, sourceOverride)
}

func NewJSONSerializer(timestampUnits time.Duration) (Serializer, error) {
return json.NewSerializer(timestampUnits)
func NewJSONSerializer(timestampUnits time.Duration, timestampFormat string) (Serializer, error) {
return json.NewSerializer(timestampUnits, timestampFormat)
}

func NewCarbon2Serializer(carbon2format string, carbon2SanitizeReplaceChar string) (Serializer, error) {
Expand Down

0 comments on commit b9aa983

Please sign in to comment.