Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LogQL: Labels and Metrics Extraction #2769

Merged
merged 49 commits into from
Oct 20, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
03279b1
Adds logfmt, regexp and json logql parser
cyriltovena Sep 15, 2020
31c2f3f
hook the ast with parsers.
cyriltovena Sep 16, 2020
eaf72bd
hook parser with memchunk.
cyriltovena Sep 17, 2020
7d1dc7b
hook parser with the storage.
cyriltovena Sep 17, 2020
484afc1
hook parser with ingesters
cyriltovena Sep 17, 2020
0121a3c
fixes all tests
cyriltovena Sep 17, 2020
62f2829
Refactor to pipeline and implement ast parsing.
cyriltovena Sep 21, 2020
89c489c
Fixes the lexer for duration and range
cyriltovena Sep 22, 2020
4238173
Fixes all tests and add some for label filters
cyriltovena Sep 23, 2020
88ad104
Add label and line format.
cyriltovena Sep 23, 2020
c7791a4
Add tests for fmt label and line with validations.
cyriltovena Sep 24, 2020
0651e25
Polishing parsers and add some more test cases
cyriltovena Sep 25, 2020
4c0570d
Finish the unwrap parser, still need to add more tests
cyriltovena Sep 29, 2020
01e93c0
Indent this hell.
cyriltovena Sep 29, 2020
e455c88
Moar tests and it works.
cyriltovena Sep 29, 2020
8bc18e5
Add more tests which lead me to find a bug in the lexer
cyriltovena Sep 30, 2020
08d2cf7
Add more tests and fix all engine tests
cyriltovena Sep 30, 2020
b801417
Fixes match stage in promtail pipelines.
cyriltovena Sep 30, 2020
850b003
Hook Pipeline into ingester, tailer and storage.
cyriltovena Oct 1, 2020
31c26c0
Correctly setup sharding for logqlv2
cyriltovena Oct 1, 2020
b5e11d0
Fixes precedences issue with label filters and add moar tests :v:
cyriltovena Oct 2, 2020
0fd6018
Adds quantile_over_time, grouping for non associate range aggregation…
cyriltovena Oct 2, 2020
2ca6677
Extract with grouping
cyriltovena Oct 3, 2020
4effb67
Adds parsing duration on unwrap
cyriltovena Oct 5, 2020
832a977
Improve the lexer to support more common identifier as functions.
cyriltovena Oct 6, 2020
6563d6e
Fixes the frontend logs to include org_id.
cyriltovena Oct 6, 2020
92f7c39
Merge branch 'fix-orgid-frontend' into logql-parser
cyriltovena Oct 6, 2020
5578dbb
Support byte sizes in label filters.
jeschkies Oct 9, 2020
13132ad
Wip on error handling.
cyriltovena Oct 12, 2020
db07446
Fixes json parser with prometheus label name rules.
cyriltovena Oct 12, 2020
78973cf
fixup! Support byte sizes in label filters.
jeschkies Oct 12, 2020
bbacba7
Merge remote-tracking branch 'cyril/logql-parser' into karsten/bytes-…
jeschkies Oct 12, 2020
25dd730
Merge pull request #5 from jeschkies/karsten/bytes-filter
cyriltovena Oct 12, 2020
c054a5d
Wip error handling, commit before big refactoring.
cyriltovena Oct 13, 2020
e7d8234
Merge branch 'logql-parser' of github.com:cyriltovena/loki into logql…
cyriltovena Oct 13, 2020
5ab8b5c
Refactoring in progress.
cyriltovena Oct 13, 2020
5272d91
Work in progress.
cyriltovena Oct 13, 2020
1aa1609
Got something that builds and throw __error__ labels properly now.
cyriltovena Oct 14, 2020
1af9c14
Add error handling + fixes groupins and post filtering.
cyriltovena Oct 14, 2020
960ef5a
400 on pipeline errors.
cyriltovena Oct 14, 2020
de83465
Fixes a races in the log pipeline.
cyriltovena Oct 15, 2020
87c4f00
Unsure the key is parsable and valid.
cyriltovena Oct 15, 2020
50315b1
Cleanup and code documentation.
cyriltovena Oct 15, 2020
54dd6c2
Lint.
cyriltovena Oct 15, 2020
a50490f
Lint.
cyriltovena Oct 15, 2020
cbbc37c
Merge remote-tracking branch 'upstream/master' into logql-parser
cyriltovena Oct 15, 2020
801b721
Fixes frontend handler.
cyriltovena Oct 15, 2020
1aee415
Fixes old test.
cyriltovena Oct 15, 2020
1ea917f
Fix go1.15 local failing test.
cyriltovena Oct 15, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refactoring in progress.
Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena committed Oct 13, 2020
commit 5ab8b5c80139925a3e1f40c50672bb09914fdeb2
69 changes: 22 additions & 47 deletions pkg/logql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/labelfilter"
"github.com/grafana/loki/pkg/logql/log"
)

// Expr is the root expression which can be a SampleExpr or LogSelectorExpr
Expand Down Expand Up @@ -81,62 +82,36 @@ type LogSelectorExpr interface {
}

type PipelineExpr interface {
Pipeline() (Pipeline, error)
Pipeline() (log.Pipeline, error)
Expr
}

type Pipeline interface {
Process(line []byte, lbs labels.Labels) ([]byte, labels.Labels, bool)
}

var NoopPipeline = &noopPipeline{}

type noopPipeline struct{}

func (noopPipeline) Process(line []byte, lbs labels.Labels) ([]byte, labels.Labels, bool) {
return line, lbs, true
}

type PipelineFunc func(line []byte, lbs labels.Labels) ([]byte, labels.Labels, bool)

func (fn PipelineFunc) Process(line []byte, lbs labels.Labels) ([]byte, labels.Labels, bool) {
return fn(line, lbs)
}

type MultiPipeline []Pipeline

func (m MultiPipeline) Process(line []byte, lbs labels.Labels) ([]byte, labels.Labels, bool) {
var ok bool
for _, p := range m {
line, lbs, ok = p.Process(line, lbs)
if !ok {
return line, lbs, ok
}
}
return line, lbs, ok
type StageExpr interface {
Stage() (log.Stage, error)
Expr
}

type MultiPipelineExpr []PipelineExpr
type MultiStageExpr []StageExpr

func (m MultiPipelineExpr) Pipeline() (Pipeline, error) {
c := make(MultiPipeline, 0, len(m))
func (m MultiStageExpr) Pipeline() (log.Pipeline, error) {
c := make(log.MultiStage, 0, len(m))
for _, e := range m {
p, err := e.Pipeline()
p, err := e.Stage()
if err != nil {
return nil, err
}
if p == NoopPipeline {
if p == log.NoopStage {
continue
}
c = append(c, p)
}
if len(c) == 0 {
return NoopPipeline, nil
return log.NoopPipeline, nil
}
return c, nil
}

func (m MultiPipelineExpr) String() string {
func (m MultiStageExpr) String() string {
var sb strings.Builder
for i, e := range m {
sb.WriteString(e.String())
Expand All @@ -147,7 +122,7 @@ func (m MultiPipelineExpr) String() string {
return sb.String()
}

func (MultiPipelineExpr) logQLExpr() {}
func (MultiStageExpr) logQLExpr() {}

func FilterToPipeline(f LineFilter) Pipeline {
if f == nil || f == TrueFilter {
Expand Down Expand Up @@ -191,21 +166,21 @@ func (e *matchersExpr) String() string {
return sb.String()
}

func (e *matchersExpr) Pipeline() (Pipeline, error) {
return NoopPipeline, nil
func (e *matchersExpr) Pipeline() (log.Pipeline, error) {
return log.NoopPipeline, nil
}

func (e *matchersExpr) HasFilter() bool {
return false
}

type pipelineExpr struct {
pipeline MultiPipelineExpr
pipeline MultiStageExpr
left *matchersExpr
implicit
}

func newPipelineExpr(left *matchersExpr, pipeline MultiPipelineExpr) LogSelectorExpr {
func newPipelineExpr(left *matchersExpr, pipeline MultiStageExpr) LogSelectorExpr {
return &pipelineExpr{
left: left,
pipeline: pipeline,
Expand All @@ -224,7 +199,7 @@ func (e *pipelineExpr) String() string {
return sb.String()
}

func (e *pipelineExpr) Pipeline() (Pipeline, error) {
func (e *pipelineExpr) Pipeline() (log.Pipeline, error) {
return e.pipeline.Pipeline()
}

Expand Down Expand Up @@ -291,8 +266,8 @@ func (e *lineFilterExpr) String() string {
return sb.String()
}

func (e *lineFilterExpr) Filter() (LineFilter, error) {
f, err := newFilter(e.match, e.ty)
func (e *lineFilterExpr) Filter() (log.Filterer, error) {
f, err := log.NewFilter(e.match, e.ty)
if err != nil {
return nil, err
}
Expand All @@ -302,11 +277,11 @@ func (e *lineFilterExpr) Filter() (LineFilter, error) {
return nil, err
}
if nextFilter != nil {
f = newAndFilter(nextFilter, f)
f = log.NewAndFilter(nextFilter, f)
}
}

if f == TrueFilter {
if f == log.TrueFilter {
return nil, nil
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/logql/expr.y
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import (
BinOpModifier BinOpOptions
LabelParser *labelParserExpr
LineFilters *lineFilterExpr
PipelineExpr MultiPipelineExpr
PipelineStage PipelineExpr
PipelineExpr MultiStageExpr
PipelineStage StageExpr
BytesFilter labelfilter.Filterer
NumberFilter labelfilter.Filterer
DurationFilter labelfilter.Filterer
Expand Down Expand Up @@ -192,7 +192,7 @@ matcher:
;

pipelineExpr:
pipelineStage { $$ = MultiPipelineExpr{ $1 } }
pipelineStage { $$ = MultiStageExpr{ $1 } }
| pipelineExpr pipelineStage { $$ = append($1, $2)}
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
;

Expand Down
4 changes: 2 additions & 2 deletions pkg/logql/labelfilter/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (

type Bytes struct {
Name string
Value uint64
Value uint64
Type FilterType
}

func NewBytes(t FilterType, name string, b uint64) *Bytes{
func NewBytes(t FilterType, name string, b uint64) *Bytes {
return &Bytes{
Name: name,
Type: t,
Expand Down
2 changes: 1 addition & 1 deletion pkg/logql/error.go → pkg/logql/log/error.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package logql
package log

var (
errJSON = "JSONParser"
Expand Down
56 changes: 28 additions & 28 deletions pkg/logql/filter.go → pkg/logql/log/filter.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package logql
package log

import (
"bytes"
Expand All @@ -9,15 +9,15 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
)

// LineFilter is a interface to filter log lines.
type LineFilter interface {
// Filterer is a interface to filter log lines.
type Filterer interface {
Filter(line []byte) bool
}

// LineFilterFunc is a syntax sugar for creating line filter from a function
type LineFilterFunc func(line []byte) bool
type FiltererFunc func(line []byte) bool

func (f LineFilterFunc) Filter(line []byte) bool {
func (f FiltererFunc) Filter(line []byte) bool {
return f(line)
}

Expand All @@ -29,30 +29,30 @@ func (trueFilter) Filter(_ []byte) bool { return true }
var TrueFilter = trueFilter{}

type notFilter struct {
LineFilter
Filterer
}

func (n notFilter) Filter(line []byte) bool {
return !n.LineFilter.Filter(line)
return !n.Filterer.Filter(line)
}

// newNotFilter creates a new filter which matches only if the base filter doesn't match.
// If the base filter is a `or` it will recursively simplify with `and` operations.
func newNotFilter(base LineFilter) LineFilter {
func newNotFilter(base Filterer) Filterer {
// not(a|b) = not(a) and not(b) , and operation can't benefit from this optimization because both legs always needs to be executed.
if or, ok := base.(orFilter); ok {
return newAndFilter(newNotFilter(or.left), newNotFilter(or.right))
return NewAndFilter(newNotFilter(or.left), newNotFilter(or.right))
}
return notFilter{LineFilter: base}
return notFilter{Filterer: base}
}

type andFilter struct {
left LineFilter
right LineFilter
left Filterer
right Filterer
}

// newAndFilter creates a new filter which matches only if left and right matches.
func newAndFilter(left LineFilter, right LineFilter) LineFilter {
// NewAndFilter creates a new filter which matches only if left and right matches.
func NewAndFilter(left Filterer, right Filterer) Filterer {
// Make sure we take care of panics in case a nil or noop filter is passed.
if right == nil || right == TrueFilter {
return left
Expand All @@ -73,12 +73,12 @@ func (a andFilter) Filter(line []byte) bool {
}

type orFilter struct {
left LineFilter
right LineFilter
left Filterer
right Filterer
}

// newOrFilter creates a new filter which matches only if left or right matches.
func newOrFilter(left LineFilter, right LineFilter) LineFilter {
func newOrFilter(left Filterer, right Filterer) Filterer {
if left == nil || left == TrueFilter {
return right
}
Expand All @@ -94,7 +94,7 @@ func newOrFilter(left LineFilter, right LineFilter) LineFilter {
}

// chainOrFilter is a syntax sugar to chain multiple `or` filters. (1 or many)
func chainOrFilter(curr, new LineFilter) LineFilter {
func chainOrFilter(curr, new Filterer) Filterer {
if curr == nil {
return new
}
Expand All @@ -111,7 +111,7 @@ type regexpFilter struct {

// newRegexpFilter creates a new line filter for a given regexp.
// If match is false the filter is the negation of the regexp.
func newRegexpFilter(re string, match bool) (LineFilter, error) {
func newRegexpFilter(re string, match bool) (Filterer, error) {
reg, err := regexp.Compile(re)
if err != nil {
return nil, err
Expand Down Expand Up @@ -143,7 +143,7 @@ func (l containsFilter) String() string {
return string(l.match)
}

func newContainsFilter(match []byte, caseInsensitive bool) LineFilter {
func newContainsFilter(match []byte, caseInsensitive bool) Filterer {
if len(match) == 0 {
return TrueFilter
}
Expand All @@ -156,8 +156,8 @@ func newContainsFilter(match []byte, caseInsensitive bool) LineFilter {
}
}

// newFilter creates a new line filter from a match string and type.
func newFilter(match string, mt labels.MatchType) (LineFilter, error) {
// NewFilter creates a new line filter from a match string and type.
func NewFilter(match string, mt labels.MatchType) (Filterer, error) {
switch mt {
case labels.MatchRegexp:
return parseRegexpFilter(match, true)
Expand All @@ -174,7 +174,7 @@ func newFilter(match string, mt labels.MatchType) (LineFilter, error) {

// parseRegexpFilter parses a regexp and attempt to simplify it with only literal filters.
// If not possible it will returns the original regexp filter.
func parseRegexpFilter(re string, match bool) (LineFilter, error) {
func parseRegexpFilter(re string, match bool) (Filterer, error) {
reg, err := syntax.Parse(re, syntax.Perl)
if err != nil {
return nil, err
Expand All @@ -194,7 +194,7 @@ func parseRegexpFilter(re string, match bool) (LineFilter, error) {

// simplify a regexp expression by replacing it, when possible, with a succession of literal filters.
// For example `(foo|bar)` will be replaced by `containsFilter(foo) or containsFilter(bar)`
func simplify(reg *syntax.Regexp) (LineFilter, bool) {
func simplify(reg *syntax.Regexp) (Filterer, bool) {
switch reg.Op {
case syntax.OpAlternate:
return simplifyAlternate(reg)
Expand Down Expand Up @@ -230,7 +230,7 @@ func clearCapture(regs ...*syntax.Regexp) {

// simplifyAlternate simplifies, when possible, alternate regexp expressions such as:
// (foo|bar) or (foo|(bar|buzz)).
func simplifyAlternate(reg *syntax.Regexp) (LineFilter, bool) {
func simplifyAlternate(reg *syntax.Regexp) (Filterer, bool) {
clearCapture(reg.Sub...)
// attempt to simplify the first leg
f, ok := simplify(reg.Sub[0])
Expand All @@ -253,15 +253,15 @@ func simplifyAlternate(reg *syntax.Regexp) (LineFilter, bool) {
// which is a literalFilter.
// Or a literal and alternates operation (see simplifyConcatAlternate), which represent a multiplication of alternates.
// Anything else is rejected.
func simplifyConcat(reg *syntax.Regexp, baseLiteral []byte) (LineFilter, bool) {
func simplifyConcat(reg *syntax.Regexp, baseLiteral []byte) (Filterer, bool) {
clearCapture(reg.Sub...)
// we support only simplication of concat operation with 3 sub expressions.
// for instance .*foo.*bar contains 4 subs (.*+foo+.*+bar) and can't be simplified.
if len(reg.Sub) > 3 {
return nil, false
}

var curr LineFilter
var curr Filterer
var ok bool
literals := 0
for _, sub := range reg.Sub {
Expand Down Expand Up @@ -304,7 +304,7 @@ func simplifyConcat(reg *syntax.Regexp, baseLiteral []byte) (LineFilter, bool) {
// A concat alternate is found when a concat operation has a sub alternate and is preceded by a literal.
// For instance bar|b|buzz is expressed as b(ar|(?:)|uzz) => b concat alternate(ar,(?:),uzz).
// (?:) being an OpEmptyMatch and b being the literal to concat all alternates (ar,(?:),uzz) with.
func simplifyConcatAlternate(reg *syntax.Regexp, literal []byte, curr LineFilter) (LineFilter, bool) {
func simplifyConcatAlternate(reg *syntax.Regexp, literal []byte, curr Filterer) (Filterer, bool) {
for _, alt := range reg.Sub {
switch alt.Op {
case syntax.OpEmptyMatch:
Expand Down
Loading