Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LogQL: Labels and Metrics Extraction #2769

Merged
merged 49 commits into from
Oct 20, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
03279b1
Adds logfmt, regexp and json logql parser
cyriltovena Sep 15, 2020
31c2f3f
hook the ast with parsers.
cyriltovena Sep 16, 2020
eaf72bd
hook parser with memchunk.
cyriltovena Sep 17, 2020
7d1dc7b
hook parser with the storage.
cyriltovena Sep 17, 2020
484afc1
hook parser with ingesters
cyriltovena Sep 17, 2020
0121a3c
fixes all tests
cyriltovena Sep 17, 2020
62f2829
Refactor to pipeline and implement ast parsing.
cyriltovena Sep 21, 2020
89c489c
Fixes the lexer for duration and range
cyriltovena Sep 22, 2020
4238173
Fixes all tests and add some for label filters
cyriltovena Sep 23, 2020
88ad104
Add label and line format.
cyriltovena Sep 23, 2020
c7791a4
Add tests for fmt label and line with validations.
cyriltovena Sep 24, 2020
0651e25
Polishing parsers and add some more test cases
cyriltovena Sep 25, 2020
4c0570d
Finish the unwrap parser, still need to add more tests
cyriltovena Sep 29, 2020
01e93c0
Indent this hell.
cyriltovena Sep 29, 2020
e455c88
Moar tests and it works.
cyriltovena Sep 29, 2020
8bc18e5
Add more tests which lead me to find a bug in the lexer
cyriltovena Sep 30, 2020
08d2cf7
Add more tests and fix all engine tests
cyriltovena Sep 30, 2020
b801417
Fixes match stage in promtail pipelines.
cyriltovena Sep 30, 2020
850b003
Hook Pipeline into ingester, tailer and storage.
cyriltovena Oct 1, 2020
31c26c0
Correctly setup sharding for logqlv2
cyriltovena Oct 1, 2020
b5e11d0
Fixes precedences issue with label filters and add moar tests :v:
cyriltovena Oct 2, 2020
0fd6018
Adds quantile_over_time, grouping for non associate range aggregation…
cyriltovena Oct 2, 2020
2ca6677
Extract with grouping
cyriltovena Oct 3, 2020
4effb67
Adds parsing duration on unwrap
cyriltovena Oct 5, 2020
832a977
Improve the lexer to support more common identifier as functions.
cyriltovena Oct 6, 2020
6563d6e
Fixes the frontend logs to include org_id.
cyriltovena Oct 6, 2020
92f7c39
Merge branch 'fix-orgid-frontend' into logql-parser
cyriltovena Oct 6, 2020
5578dbb
Support byte sizes in label filters.
jeschkies Oct 9, 2020
13132ad
Wip on error handling.
cyriltovena Oct 12, 2020
db07446
Fixes json parser with prometheus label name rules.
cyriltovena Oct 12, 2020
78973cf
fixup! Support byte sizes in label filters.
jeschkies Oct 12, 2020
bbacba7
Merge remote-tracking branch 'cyril/logql-parser' into karsten/bytes-…
jeschkies Oct 12, 2020
25dd730
Merge pull request #5 from jeschkies/karsten/bytes-filter
cyriltovena Oct 12, 2020
c054a5d
Wip error handling, commit before big refactoring.
cyriltovena Oct 13, 2020
e7d8234
Merge branch 'logql-parser' of github.com:cyriltovena/loki into logql…
cyriltovena Oct 13, 2020
5ab8b5c
Refactoring in progress.
cyriltovena Oct 13, 2020
5272d91
Work in progress.
cyriltovena Oct 13, 2020
1aa1609
Got something that builds and throw __error__ labels properly now.
cyriltovena Oct 14, 2020
1af9c14
Add error handling + fixes groupins and post filtering.
cyriltovena Oct 14, 2020
960ef5a
400 on pipeline errors.
cyriltovena Oct 14, 2020
de83465
Fixes a races in the log pipeline.
cyriltovena Oct 15, 2020
87c4f00
Unsure the key is parsable and valid.
cyriltovena Oct 15, 2020
50315b1
Cleanup and code documentation.
cyriltovena Oct 15, 2020
54dd6c2
Lint.
cyriltovena Oct 15, 2020
a50490f
Lint.
cyriltovena Oct 15, 2020
cbbc37c
Merge remote-tracking branch 'upstream/master' into logql-parser
cyriltovena Oct 15, 2020
801b721
Fixes frontend handler.
cyriltovena Oct 15, 2020
1aee415
Fixes old test.
cyriltovena Oct 15, 2020
1ea917f
Fix go1.15 local failing test.
cyriltovena Oct 15, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refactor to pipeline and implement ast parsing.
Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena committed Oct 1, 2020
commit 62f282999b86aa5611c9ce40f00b101a6250d263
223 changes: 175 additions & 48 deletions pkg/logql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/labelfilter"
)

// Expr is the root expression which can be a SampleExpr or LogSelectorExpr
Expand Down Expand Up @@ -73,18 +74,93 @@ type Querier interface {

// LogSelectorExpr is a LogQL expression filtering and returning logs.
type LogSelectorExpr interface {
Filter() (LineFilter, error)
Matchers() []*labels.Matcher
Parser() (LabelParser, error)
PipelineExpr
Expr
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Expr seems redundant now that PipelineExpr is here.

}

type PipelineExpr interface {
Pipeline() (Pipeline, error)
Expr
}

type Pipeline interface {
Process(line []byte, lbs labels.Labels) ([]byte, labels.Labels, bool)
}

var NoopPipeline = &noopPipeline{}

type noopPipeline struct{}

func (noopPipeline) Process(line []byte, lbs labels.Labels) ([]byte, labels.Labels, bool) {
return line, lbs, true
}

type PipelineFunc func(line []byte, lbs labels.Labels) ([]byte, labels.Labels, bool)

func (fn PipelineFunc) Process(line []byte, lbs labels.Labels) ([]byte, labels.Labels, bool) {
return fn(line, lbs)
}

type MultiPipeline []Pipeline

func (m MultiPipeline) Process(line []byte, lbs labels.Labels) ([]byte, labels.Labels, bool) {
var ok bool
for _, p := range m {
line, lbs, ok = p.Process(line, lbs)
if !ok {
return line, lbs, ok
}
}
return line, lbs, ok
}

type MultiPipelineExpr []PipelineExpr

func (m MultiPipelineExpr) Pipeline() (Pipeline, error) {
c := make(MultiPipeline, 0, len(m))
for _, e := range m {
p, err := e.Pipeline()
if err != nil {
return nil, err
}
c = append(c, p)
}
return c, nil
}

func (m MultiPipelineExpr) String() string {
var sb strings.Builder
for _, e := range m {
sb.WriteString(e.String())
}
return sb.String()
}

func (MultiPipelineExpr) logQLExpr() {}

func FilterToPipeline(f LineFilter) Pipeline {
if f == nil || f == TrueFilter {
return NoopPipeline
}
return PipelineFunc(func(line []byte, lbs labels.Labels) ([]byte, labels.Labels, bool) {
return line, lbs, f.Filter(line)
})
}

func ParserToPipeline(p LabelParser) Pipeline {
return PipelineFunc(func(line []byte, lbs labels.Labels) ([]byte, labels.Labels, bool) {
lbs = p.Parse(line, lbs)
return line, lbs, true
})
}

type matchersExpr struct {
matchers []*labels.Matcher
implicit
}

func newMatcherExpr(matchers []*labels.Matcher) LogSelectorExpr {
func newMatcherExpr(matchers []*labels.Matcher) *matchersExpr {
return &matchersExpr{matchers: matchers}
}

Expand All @@ -105,35 +181,54 @@ func (e *matchersExpr) String() string {
return sb.String()
}

func (e *matchersExpr) Filter() (LineFilter, error) {
return nil, nil
func (e *matchersExpr) Pipeline() (Pipeline, error) {
return NoopPipeline, nil
}

type pipelineExpr struct {
pipeline MultiPipelineExpr
left *matchersExpr
implicit
}

func (e *matchersExpr) Parser() (LabelParser, error) {
return NoopLabelParser, nil
func newPipelineExpr(left *matchersExpr, pipeline MultiPipelineExpr) LogSelectorExpr {
return &pipelineExpr{
left: left,
pipeline: pipeline,
}
}

type filterExpr struct {
left LogSelectorExpr
func (e *pipelineExpr) Matchers() []*labels.Matcher {
return e.left.Matchers()
}

func (e *pipelineExpr) String() string {
var sb strings.Builder
sb.WriteString(e.left.String())
sb.WriteString(e.pipeline.String())
return sb.String()
}

func (e *pipelineExpr) Pipeline() (Pipeline, error) {
return e.pipeline.Pipeline()
}

type lineFilterExpr struct {
left *lineFilterExpr
ty labels.MatchType
match string
implicit
}

// NewFilterExpr wraps an existing Expr with a next filter expression.
func NewFilterExpr(left LogSelectorExpr, ty labels.MatchType, match string) LogSelectorExpr {
return &filterExpr{
func newLineFilterExpr(left *lineFilterExpr, ty labels.MatchType, match string) *lineFilterExpr {
return &lineFilterExpr{
left: left,
ty: ty,
match: match,
}
}

func (e *filterExpr) Matchers() []*labels.Matcher {
return e.left.Matchers()
}

func (e *filterExpr) String() string {
func (e *lineFilterExpr) String() string {
var sb strings.Builder
sb.WriteString(e.left.String())
switch e.ty {
Expand All @@ -150,13 +245,13 @@ func (e *filterExpr) String() string {
return sb.String()
}

func (e *filterExpr) Filter() (LineFilter, error) {
func (e *lineFilterExpr) Filter() (LineFilter, error) {
f, err := newFilter(e.match, e.ty)
if err != nil {
return nil, err
}
if nextExpr, ok := e.left.(*filterExpr); ok {
nextFilter, err := nextExpr.Filter()
if e.left != nil {
nextFilter, err := e.left.Filter()
if err != nil {
return nil, err
}
Expand All @@ -172,35 +267,29 @@ func (e *filterExpr) Filter() (LineFilter, error) {
return f, nil
}

func (e *filterExpr) Parser() (LabelParser, error) {
return NoopLabelParser, nil
func (e *lineFilterExpr) Pipeline() (Pipeline, error) {
f, err := e.Filter()
if err != nil {
return nil, err
}
return FilterToPipeline(f), nil
}

type parserExpr struct {
left LogSelectorExpr
type labelParserExpr struct {
op string
param string
implicit
}

func newParserExpr(left LogSelectorExpr, op, param string) LogSelectorExpr {
func newLabelParserExpr(op, param string) *labelParserExpr {
// todo(cyriltovena): we might want to pre-validate param here to fail fast.
return &parserExpr{
left: left,
return &labelParserExpr{
op: op,
param: param,
}
}

func (e *parserExpr) Matchers() []*labels.Matcher {
return e.left.Matchers()
}

func (e *parserExpr) Filter() (LineFilter, error) {
return e.left.Filter()
}

func (e *parserExpr) Parser() (LabelParser, error) {
func (e *labelParserExpr) parser() (LabelParser, error) {
switch e.op {
case OpParserTypeJSON:
return NewJSONParser(), nil
Expand All @@ -213,9 +302,16 @@ func (e *parserExpr) Parser() (LabelParser, error) {
}
}

func (e *parserExpr) String() string {
func (e *labelParserExpr) Pipeline() (Pipeline, error) {
p, err := e.parser()
if err != nil {
return nil, err
}
return ParserToPipeline(p), nil
}

func (e *labelParserExpr) String() string {
var sb strings.Builder
sb.WriteString(e.left.String())
sb.WriteString("|")
sb.WriteString(e.op)
if e.param != "" {
Expand All @@ -224,6 +320,30 @@ func (e *parserExpr) String() string {
return sb.String()
}

type labelFilterExpr struct {
labelfilter.Filterer
implicit
}

func (e *labelFilterExpr) Pipeline() (Pipeline, error) {
return PipelineFunc(func(line []byte, lbs labels.Labels) ([]byte, labels.Labels, bool) {
//todo (cyriltovena): handle error
ok, _ := e.Filterer.Filter(lbs)
return line, lbs, ok
}), nil
}

// func (e *parserExpr) String() string {
// var sb strings.Builder
// sb.WriteString(e.left.String())
// sb.WriteString("|")
// sb.WriteString(e.op)
// if e.param != "" {
// sb.WriteString(strconv.Quote(e.param))
// }
// return sb.String()
// }

func mustNewMatcher(t labels.MatchType, n, v string) *labels.Matcher {
m, err := labels.NewMatcher(t, n, v)
if err != nil {
Expand All @@ -232,6 +352,14 @@ func mustNewMatcher(t labels.MatchType, n, v string) *labels.Matcher {
return m
}

func mustNewFloat(s string) float64 {
n, err := strconv.ParseFloat(s, 64)
if err != nil {
panic(newParseError(fmt.Sprintf("unable to parse float: %s", err.Error()), 0, 0))
}
return n
}

type logRange struct {
left LogSelectorExpr
interval time.Duration
Expand All @@ -252,14 +380,14 @@ func newLogRange(left LogSelectorExpr, interval time.Duration) *logRange {
}
}

func addFilterToLogRangeExpr(left *logRange, ty labels.MatchType, match string) *logRange {
left.left = &filterExpr{
left: left.left,
ty: ty,
match: match,
}
return left
}
// func addFilterToLogRangeExpr(left *logRange, ty labels.MatchType, match string) *logRange {
// left.left = &filterExpr{
// left: left.left,
// ty: ty,
// match: match,
// }
// return left
// }

const (
// vector ops
Expand Down Expand Up @@ -565,9 +693,8 @@ func (e *literalExpr) String() string {
// and they will only be present in binary operation legs.
func (e *literalExpr) Selector() LogSelectorExpr { return e }
func (e *literalExpr) Operations() []string { return nil }
func (e *literalExpr) Filter() (LineFilter, error) { return nil, nil }
func (e *literalExpr) Pipeline() (Pipeline, error) { return NoopPipeline, nil }
func (e *literalExpr) Matchers() []*labels.Matcher { return nil }
func (e *literalExpr) Parser() (LabelParser, error) { return NoopLabelParser, nil }
func (e *literalExpr) Extractor() (SampleExtractor, error) { return nil, nil }

// helper used to impl Stringer for vector and range aggregations
Expand Down
Loading