Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LogQL: Labels and Metrics Extraction #2769

Merged
merged 49 commits into from
Oct 20, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
03279b1
Adds logfmt, regexp and json logql parser
cyriltovena Sep 15, 2020
31c2f3f
hook the ast with parsers.
cyriltovena Sep 16, 2020
eaf72bd
hook parser with memchunk.
cyriltovena Sep 17, 2020
7d1dc7b
hook parser with the storage.
cyriltovena Sep 17, 2020
484afc1
hook parser with ingesters
cyriltovena Sep 17, 2020
0121a3c
fixes all tests
cyriltovena Sep 17, 2020
62f2829
Refactor to pipeline and implement ast parsing.
cyriltovena Sep 21, 2020
89c489c
Fixes the lexer for duration and range
cyriltovena Sep 22, 2020
4238173
Fixes all tests and add some for label filters
cyriltovena Sep 23, 2020
88ad104
Add label and line format.
cyriltovena Sep 23, 2020
c7791a4
Add tests for fmt label and line with validations.
cyriltovena Sep 24, 2020
0651e25
Polishing parsers and add some more test cases
cyriltovena Sep 25, 2020
4c0570d
Finish the unwrap parser, still need to add more tests
cyriltovena Sep 29, 2020
01e93c0
Indent this hell.
cyriltovena Sep 29, 2020
e455c88
Moar tests and it works.
cyriltovena Sep 29, 2020
8bc18e5
Add more tests which lead me to find a bug in the lexer
cyriltovena Sep 30, 2020
08d2cf7
Add more tests and fix all engine tests
cyriltovena Sep 30, 2020
b801417
Fixes match stage in promtail pipelines.
cyriltovena Sep 30, 2020
850b003
Hook Pipeline into ingester, tailer and storage.
cyriltovena Oct 1, 2020
31c26c0
Correctly setup sharding for logqlv2
cyriltovena Oct 1, 2020
b5e11d0
Fixes precedences issue with label filters and add moar tests :v:
cyriltovena Oct 2, 2020
0fd6018
Adds quantile_over_time, grouping for non associate range aggregation…
cyriltovena Oct 2, 2020
2ca6677
Extract with grouping
cyriltovena Oct 3, 2020
4effb67
Adds parsing duration on unwrap
cyriltovena Oct 5, 2020
832a977
Improve the lexer to support more common identifier as functions.
cyriltovena Oct 6, 2020
6563d6e
Fixes the frontend logs to include org_id.
cyriltovena Oct 6, 2020
92f7c39
Merge branch 'fix-orgid-frontend' into logql-parser
cyriltovena Oct 6, 2020
5578dbb
Support byte sizes in label filters.
jeschkies Oct 9, 2020
13132ad
Wip on error handling.
cyriltovena Oct 12, 2020
db07446
Fixes json parser with prometheus label name rules.
cyriltovena Oct 12, 2020
78973cf
fixup! Support byte sizes in label filters.
jeschkies Oct 12, 2020
bbacba7
Merge remote-tracking branch 'cyril/logql-parser' into karsten/bytes-…
jeschkies Oct 12, 2020
25dd730
Merge pull request #5 from jeschkies/karsten/bytes-filter
cyriltovena Oct 12, 2020
c054a5d
Wip error handling, commit before big refactoring.
cyriltovena Oct 13, 2020
e7d8234
Merge branch 'logql-parser' of github.com:cyriltovena/loki into logql…
cyriltovena Oct 13, 2020
5ab8b5c
Refactoring in progress.
cyriltovena Oct 13, 2020
5272d91
Work in progress.
cyriltovena Oct 13, 2020
1aa1609
Got something that builds and throw __error__ labels properly now.
cyriltovena Oct 14, 2020
1af9c14
Add error handling + fixes groupins and post filtering.
cyriltovena Oct 14, 2020
960ef5a
400 on pipeline errors.
cyriltovena Oct 14, 2020
de83465
Fixes a races in the log pipeline.
cyriltovena Oct 15, 2020
87c4f00
Unsure the key is parsable and valid.
cyriltovena Oct 15, 2020
50315b1
Cleanup and code documentation.
cyriltovena Oct 15, 2020
54dd6c2
Lint.
cyriltovena Oct 15, 2020
a50490f
Lint.
cyriltovena Oct 15, 2020
cbbc37c
Merge remote-tracking branch 'upstream/master' into logql-parser
cyriltovena Oct 15, 2020
801b721
Fixes frontend handler.
cyriltovena Oct 15, 2020
1aee415
Fixes old test.
cyriltovena Oct 15, 2020
1ea917f
Fix go1.15 local failing test.
cyriltovena Oct 15, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add more tests and fix all engine tests
Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena committed Oct 1, 2020
commit 08d2cf7811c2c990dce50cac4ba45697fb11732d
3 changes: 2 additions & 1 deletion pkg/logql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"math"
"strings"

// "math"
"testing"
Expand Down Expand Up @@ -1817,7 +1818,7 @@ func paramsID(p interface{}) string {
if err != nil {
panic(err)
}
return string(b)
return strings.ReplaceAll(string(b), " ", "")
}

type logData struct {
Expand Down
38 changes: 38 additions & 0 deletions pkg/logql/series_extractor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package logql

import (
"reflect"
"testing"

"github.com/prometheus/prometheus/pkg/labels"
)

func Test_labelSampleExtractor_Extract(t *testing.T) {
tests := []struct {
name string
ex *labelSampleExtractor
in labels.Labels
want float64
wantLbs labels.Labels
}{
{
"convert float",
newLabelSampleExtractor("foo"),
labels.Labels{labels.Label{Name: "foo", Value: "15.0"}},
15,
labels.Labels{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

outval, outlbs := tt.ex.Extract([]byte(""), tt.in)
if outval != tt.want {
t.Errorf("labelSampleExtractor.Extract() val = %v, want %v", outval, tt.want)
}
if !reflect.DeepEqual(outlbs, tt.wantLbs) {
t.Errorf("labelSampleExtractor.Extract() lbs = %v, want %v", outlbs, tt.wantLbs)
}
})
}
}
4 changes: 2 additions & 2 deletions pkg/logql/shardmapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ func (m ShardMapper) Map(expr Expr, r *shardRecorder) (Expr, error) {
case *literalExpr:
return e, nil
//todo(cyriltovena) enable sharding on logqlv2
// case *matchersExpr, *filterExpr:
// return m.mapLogSelectorExpr(e.(LogSelectorExpr), r), nil
case *matchersExpr, *pipelineExpr:
return m.mapLogSelectorExpr(e.(LogSelectorExpr), r), nil
case *vectorAggregationExpr:
return m.mapVectorAggregationExpr(e, r)
case *rangeAggregationExpr:
Expand Down
78 changes: 38 additions & 40 deletions pkg/logql/shardmapper_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package logql

import (
"strings"
"testing"
"time"

Expand Down Expand Up @@ -130,6 +131,10 @@ func TestMappingStrings(t *testing.T) {
in: `{foo="bar"}`,
out: `downstream<{foo="bar"}, shard=0_of_2> ++ downstream<{foo="bar"}, shard=1_of_2>`,
},
{
in: `{foo="bar"} |= "foo" |~ "bar" | json | latency >= 10s or foo<5 and bar="t" | line_format "b{{.blip}}"`,
out: `downstream<{foo="bar"} |="foo" |~"bar" | json | latency>=10s or foo<5,bar="t"| line_format "b{{.blip}}",shard=0_of_2>++downstream<{foo="bar"} |="foo" |~"bar" | json | latency>=10s or foo<5, bar="t" | line_format "b{{.blip}}",shard=1_of_2>`,
},
{
in: `sum(rate({foo="bar"}[1m]))`,
out: `sum(downstream<sum(rate({foo="bar"}[1m])), shard=0_of_2> ++ downstream<sum(rate({foo="bar"}[1m])), shard=1_of_2>)`,
Expand Down Expand Up @@ -162,7 +167,7 @@ func TestMappingStrings(t *testing.T) {
mapped, err := m.Map(ast, nilMetrics.shardRecorder())
require.Nil(t, err)

require.Equal(t, tc.out, mapped.String())
require.Equal(t, strings.ReplaceAll(tc.out, " ", ""), strings.ReplaceAll(mapped.String(), " ", ""))

})
}
Expand Down Expand Up @@ -207,45 +212,38 @@ func TestMapping(t *testing.T) {
},
},
},
// todo(cyriltovena) fix
// {
// in: `{foo="bar"} |= "error"`,
// expr: &ConcatLogSelectorExpr{
// DownstreamLogSelectorExpr: DownstreamLogSelectorExpr{
// shard: &astmapper.ShardAnnotation{
// Shard: 0,
// Of: 2,
// },
// LogSelectorExpr: &filterExpr{
// match: "error",
// ty: labels.MatchEqual,
// left: &matchersExpr{
// matchers: []*labels.Matcher{
// mustNewMatcher(labels.MatchEqual, "foo", "bar"),
// },
// },
// },
// },
// next: &ConcatLogSelectorExpr{
// DownstreamLogSelectorExpr: DownstreamLogSelectorExpr{
// shard: &astmapper.ShardAnnotation{
// Shard: 1,
// Of: 2,
// },
// LogSelectorExpr: &filterExpr{
// match: "error",
// ty: labels.MatchEqual,
// left: &matchersExpr{
// matchers: []*labels.Matcher{
// mustNewMatcher(labels.MatchEqual, "foo", "bar"),
// },
// },
// },
// },
// next: nil,
// },
// },
// },
{
in: `{foo="bar"} |= "error"`,
expr: &ConcatLogSelectorExpr{
DownstreamLogSelectorExpr: DownstreamLogSelectorExpr{
shard: &astmapper.ShardAnnotation{
Shard: 0,
Of: 2,
},
LogSelectorExpr: newPipelineExpr(
newMatcherExpr([]*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}),
MultiPipelineExpr{
newLineFilterExpr(nil, labels.MatchEqual, "error"),
},
),
},
next: &ConcatLogSelectorExpr{
DownstreamLogSelectorExpr: DownstreamLogSelectorExpr{
shard: &astmapper.ShardAnnotation{
Shard: 1,
Of: 2,
},
LogSelectorExpr: newPipelineExpr(
newMatcherExpr([]*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}),
MultiPipelineExpr{
newLineFilterExpr(nil, labels.MatchEqual, "error"),
},
),
},
next: nil,
},
},
},
{
in: `rate({foo="bar"}[5m])`,
expr: &ConcatSampleExpr{
Expand Down
53 changes: 49 additions & 4 deletions pkg/logql/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,58 @@ outer:
}

func processStream(in []logproto.Stream, pipeline Pipeline) []logproto.Stream {
// todo(cyriltovena)
return in
resByStream := map[string]*logproto.Stream{}

for _, stream := range in {
for _, e := range stream.Entries {
if l, out, ok := pipeline.Process([]byte(e.Line), mustParseLabels(stream.Labels)); ok {
var s *logproto.Stream
var found bool
s, found = resByStream[out.String()]
if !found {
s = &logproto.Stream{Labels: out.String()}
resByStream[out.String()] = s
}
s.Entries = append(s.Entries, logproto.Entry{
Timestamp: e.Timestamp,
Line: string(l),
})
}
}
}
streams := []logproto.Stream{}
for _, stream := range resByStream {
streams = append(streams, *stream)
}
return streams
}

func processSeries(in []logproto.Stream, pipeline Pipeline, ex SampleExtractor) []logproto.Series {
// todo(cyriltovena)
return nil
resBySeries := map[string]*logproto.Series{}

for _, stream := range in {
for _, e := range stream.Entries {
if l, out, ok := pipeline.Process([]byte(e.Line), mustParseLabels(stream.Labels)); ok {
f, lbs := ex.Extract(l, out)
var s *logproto.Series
var found bool
s, found = resBySeries[lbs.String()]
if !found {
s = &logproto.Series{Labels: lbs.String()}
resBySeries[lbs.String()] = s
}
s.Samples = append(s.Samples, logproto.Sample{
Timestamp: e.Timestamp.UnixNano(),
Value: f,
})
}
}
}
series := []logproto.Series{}
for _, s := range resBySeries {
series = append(series, *s)
}
return series
}

func (q MockQuerier) SelectSamples(ctx context.Context, req SelectSampleParams) (iter.SampleIterator, error) {
Expand Down