diff --git a/plugins/outputs/loki/README.md b/plugins/outputs/loki/README.md index 9c48f95bae805..681d3b207c1af 100644 --- a/plugins/outputs/loki/README.md +++ b/plugins/outputs/loki/README.md @@ -3,6 +3,8 @@ This plugin sends logs to Loki, using tags as labels, log line will content all fields in `key="value"` format which is easily parsable with `logfmt` parser in Loki. +Logs within each stream are sorted by timestamp before being sent to Loki. + ### Configuration: ```toml diff --git a/plugins/outputs/loki/loki.go b/plugins/outputs/loki/loki.go index 21cc66776d682..2f920ec829e3b 100644 --- a/plugins/outputs/loki/loki.go +++ b/plugins/outputs/loki/loki.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "net/http" + "sort" "strings" "time" @@ -137,6 +138,10 @@ func (l *Loki) Close() error { func (l *Loki) Write(metrics []telegraf.Metric) error { s := Streams{} + sort.SliceStable(metrics, func(i, j int) bool { + return metrics[i].Time().Before(metrics[j].Time()) + }) + for _, m := range metrics { tags := m.TagList() var line string diff --git a/plugins/outputs/loki/loki_test.go b/plugins/outputs/loki/loki_test.go index 1b8b61e34e48e..efe31728218d7 100644 --- a/plugins/outputs/loki/loki_test.go +++ b/plugins/outputs/loki/loki_test.go @@ -31,6 +31,33 @@ func getMetric() telegraf.Metric { ) } +func getOutOfOrderMetrics() []telegraf.Metric { + return []telegraf.Metric{ + testutil.MustMetric( + "log", + map[string]string{ + "key1": "value1", + }, + map[string]interface{}{ + "line": "newer log", + "field": 3.14, + }, + time.Unix(1230, 0), + ), + testutil.MustMetric( + "log", + map[string]string{ + "key1": "value1", + }, + map[string]interface{}{ + "line": "older log", + "field": 3.14, + }, + time.Unix(456, 0), + ), + } +} + func TestStatusCode(t *testing.T) { ts := httptest.NewServer(http.NotFoundHandler()) defer ts.Close() @@ -354,3 +381,47 @@ func TestDefaultUserAgent(t *testing.T) { require.NoError(t, err) }) } + +func TestMetricSorting(t *testing.T) { + ts := httptest.NewServer(http.NotFoundHandler()) + defer ts.Close() + + u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String())) + require.NoError(t, err) + + t.Run("out of order metrics", func(t *testing.T) { + ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body := r.Body + var err error + + payload, err := ioutil.ReadAll(body) + require.NoError(t, err) + + var s Request + err = json.Unmarshal(payload, &s) + require.NoError(t, err) + require.Len(t, s.Streams, 1) + require.Len(t, s.Streams[0].Logs, 2) + require.Len(t, s.Streams[0].Logs[0], 2) + require.Equal(t, map[string]string{"key1": "value1"}, s.Streams[0].Labels) + require.Equal(t, "456000000000", s.Streams[0].Logs[0][0]) + require.Contains(t, s.Streams[0].Logs[0][1], "line=\"older log\"") + require.Contains(t, s.Streams[0].Logs[0][1], "field=\"3.14\"") + require.Equal(t, "1230000000000", s.Streams[0].Logs[1][0]) + require.Contains(t, s.Streams[0].Logs[1][1], "line=\"newer log\"") + require.Contains(t, s.Streams[0].Logs[1][1], "field=\"3.14\"") + + w.WriteHeader(http.StatusNoContent) + }) + + client := &Loki{ + Domain: u.String(), + } + + err = client.Connect() + require.NoError(t, err) + + err = client.Write(getOutOfOrderMetrics()) + require.NoError(t, err) + }) +}