Skip to content

Commit

Permalink
fix: sort logs by timestamp before writing to Loki (#9571)
Browse files Browse the repository at this point in the history
(cherry picked from commit e6abb46)
  • Loading branch information
jhychan authored and reimda committed Aug 18, 2021
1 parent 638c7ae commit 5e060d4
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 0 deletions.
2 changes: 2 additions & 0 deletions plugins/outputs/loki/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
This plugin sends logs to Loki, using tags as labels,
log line will content all fields in `key="value"` format which is easily parsable with `logfmt` parser in Loki.

Logs within each stream are sorted by timestamp before being sent to Loki.

### Configuration:

```toml
Expand Down
5 changes: 5 additions & 0 deletions plugins/outputs/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"net/http"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -137,6 +138,10 @@ func (l *Loki) Close() error {
func (l *Loki) Write(metrics []telegraf.Metric) error {
s := Streams{}

sort.SliceStable(metrics, func(i, j int) bool {
return metrics[i].Time().Before(metrics[j].Time())
})

for _, m := range metrics {
tags := m.TagList()
var line string
Expand Down
71 changes: 71 additions & 0 deletions plugins/outputs/loki/loki_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,33 @@ func getMetric() telegraf.Metric {
)
}

func getOutOfOrderMetrics() []telegraf.Metric {
return []telegraf.Metric{
testutil.MustMetric(
"log",
map[string]string{
"key1": "value1",
},
map[string]interface{}{
"line": "newer log",
"field": 3.14,
},
time.Unix(1230, 0),
),
testutil.MustMetric(
"log",
map[string]string{
"key1": "value1",
},
map[string]interface{}{
"line": "older log",
"field": 3.14,
},
time.Unix(456, 0),
),
}
}

func TestStatusCode(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()
Expand Down Expand Up @@ -354,3 +381,47 @@ func TestDefaultUserAgent(t *testing.T) {
require.NoError(t, err)
})
}

func TestMetricSorting(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()

u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)

t.Run("out of order metrics", func(t *testing.T) {
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body := r.Body
var err error

payload, err := ioutil.ReadAll(body)
require.NoError(t, err)

var s Request
err = json.Unmarshal(payload, &s)
require.NoError(t, err)
require.Len(t, s.Streams, 1)
require.Len(t, s.Streams[0].Logs, 2)
require.Len(t, s.Streams[0].Logs[0], 2)
require.Equal(t, map[string]string{"key1": "value1"}, s.Streams[0].Labels)
require.Equal(t, "456000000000", s.Streams[0].Logs[0][0])
require.Contains(t, s.Streams[0].Logs[0][1], "line=\"older log\"")
require.Contains(t, s.Streams[0].Logs[0][1], "field=\"3.14\"")
require.Equal(t, "1230000000000", s.Streams[0].Logs[1][0])
require.Contains(t, s.Streams[0].Logs[1][1], "line=\"newer log\"")
require.Contains(t, s.Streams[0].Logs[1][1], "field=\"3.14\"")

w.WriteHeader(http.StatusNoContent)
})

client := &Loki{
Domain: u.String(),
}

err = client.Connect()
require.NoError(t, err)

err = client.Write(getOutOfOrderMetrics())
require.NoError(t, err)
})
}

0 comments on commit 5e060d4

Please sign in to comment.