Skip to content

Commit

Permalink
add on and ignoring clauses in binOpExpr (#4391)
Browse files Browse the repository at this point in the history
* add on and ignoring clauses in binOpExpr

* add on/ignoring new operator shardmapper unit test

* add doc for on and ignoring keywords

* resolve comments

* use a scoped error to replace mutate stepEvaluator via type switching
* change VectorMatching Include from empty string array to nil
* prohibit sharding when we're changing the label groupings such as on or ignoring
* add unit test to cover multiple matches for labels error
* fix expr.y format issue

* resolve comments

* reformat on and ignoring keywords doc
* add sharding unit test for on and ignoring keywords

* resolve comments: add a sentence for each example to help docs readers to understand the larger picture

Co-authored-by: garrettlish <garrettlish@163.com>
  • Loading branch information
garrettlish and garrettlish authored Oct 5, 2021
1 parent d5ec714 commit 4417340
Show file tree
Hide file tree
Showing 9 changed files with 778 additions and 248 deletions.
20 changes: 20 additions & 0 deletions docs/sources/logql/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,26 @@ More details can be found in the [Golang language documentation](https://golang.

`2 * 3 % 2` is evaluated as `(2 * 3) % 2`.

### Keywords on and ignoring
The `ignoring` keyword causes specified labels to be ignored during matching.
The syntax:
```logql
<vector expr> <bin-op> ignoring(<labels>) <vector expr>
```
This example will return the machines which total count within the last minutes exceed average value for app `foo`.
```logql
max by(machine) (count_over_time({app="foo"}[1m])) > bool ignoring(machine) avg(count_over_time({app="foo"}[1m]))
```
The on keyword reduces the set of considered labels to a specified list.
The syntax:
```logql
<vector expr> <bin-op> on(<labels>) <vector expr>
```
This example will return every machine total count within the last minutes ratio in app `foo`:
```logql
sum by(machine) (count_over_time({app="foo"}[1m])) / on() sum(count_over_time({app="foo"}[1m]))
```

## Comments

LogQL queries can be commented using the `#` character:
Expand Down
35 changes: 29 additions & 6 deletions pkg/logql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,9 @@ const (
OpUnwrap = "unwrap"
OpOffset = "offset"

OpOn = "on"
OpIgnoring = "ignoring"

// conversion Op
OpConvBytes = "bytes"
OpConvDuration = "duration"
Expand Down Expand Up @@ -959,34 +962,54 @@ func (e *VectorAggregationExpr) Walk(f WalkFn) {
e.left.Walk(f)
}

type VectorMatching struct {
On bool
Include []string
}

type BinOpOptions struct {
ReturnBool bool
ReturnBool bool
VectorMatching *VectorMatching
}

type BinOpExpr struct {
SampleExpr
RHS SampleExpr
op string
opts BinOpOptions
opts *BinOpOptions
}

func (e *BinOpExpr) String() string {
if e.opts.ReturnBool {
return fmt.Sprintf("(%s %s bool %s)", e.SampleExpr.String(), e.op, e.RHS.String())
op := e.op
if e.opts != nil {
if e.opts.ReturnBool {
op = fmt.Sprintf("%s bool", op)
}
if e.opts.VectorMatching != nil {
if e.opts.VectorMatching.On {
op = fmt.Sprintf("%s %s (%s)", op, OpOn, strings.Join(e.opts.VectorMatching.Include, ","))
} else {
op = fmt.Sprintf("%s %s (%s)", op, OpIgnoring, strings.Join(e.opts.VectorMatching.Include, ","))
}
}
}
return fmt.Sprintf("(%s %s %s)", e.SampleExpr.String(), e.op, e.RHS.String())
return fmt.Sprintf("(%s %s %s)", e.SampleExpr.String(), op, e.RHS.String())
}

// impl SampleExpr
func (e *BinOpExpr) Shardable() bool {
if e.opts != nil && e.opts.VectorMatching != nil {
// prohibit sharding when we're changing the label groupings, such as on or ignoring
return false
}
return shardableOps[e.op] && e.SampleExpr.Shardable() && e.RHS.Shardable()
}

func (e *BinOpExpr) Walk(f WalkFn) {
walkAll(f, e.SampleExpr, e.RHS)
}

func mustNewBinOpExpr(op string, opts BinOpOptions, lhs, rhs Expr) SampleExpr {
func mustNewBinOpExpr(op string, opts *BinOpOptions, lhs, rhs Expr) SampleExpr {
left, ok := lhs.(SampleExpr)
if !ok {
panic(logqlmodel.NewParseError(fmt.Sprintf(
Expand Down
93 changes: 89 additions & 4 deletions pkg/logql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
data interface{}
params interface{}

expected promql_parser.Value
expected interface{}
}{
{
`{app="foo"}`, time.Unix(30, 0), logproto.FORWARD, 10,
Expand Down Expand Up @@ -603,6 +603,87 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
},
promql.Vector{promql.Sample{Point: promql.Point{T: 60 * 1000, V: 50}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
},
{
`sum by (app) (count_over_time({app="foo"}[1m])) + sum by (app) (count_over_time({app="bar"}[1m]))`,
time.Unix(60, 0),
logproto.FORWARD,
0,
[][]logproto.Series{
{newSeries(testSize, identity, `{app="foo"}`)},
{newSeries(testSize, identity, `{app="bar"}`)},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `sum by (app) (count_over_time({app="foo"}[1m]))`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `sum by (app) (count_over_time({app="bar"}[1m]))`}},
},
promql.Vector{},
},
{
`sum by (app) (count_over_time({app="foo"}[1m])) + sum by (app) (count_over_time({app="foo"}[1m]))`,
time.Unix(60, 0),
logproto.FORWARD,
0,
[][]logproto.Series{
{newSeries(testSize, identity, `{app="foo"}`)},
{newSeries(testSize, identity, `{app="foo"}`)},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `sum by (app) (count_over_time({app="foo"}[1m]))`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `sum by (app) (count_over_time({app="foo"}[1m]))`}},
},
promql.Vector{
promql.Sample{Point: promql.Point{T: 60 * 1000, V: 120}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}},
},
},
{
`sum by (app,machine) (count_over_time({app="foo"}[1m])) + on () sum by (app) (count_over_time({app="foo"}[1m]))`,
time.Unix(60, 0),
logproto.FORWARD,
0,
[][]logproto.Series{
{newSeries(testSize, identity, `{app="foo",machine="fuzz"}`)},
{newSeries(testSize, identity, `{app="foo"}`)},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `sum by (app,machine) (count_over_time({app="foo"}[1m]))`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `sum by (app) (count_over_time({app="foo"}[1m]))`}},
},
promql.Vector{
promql.Sample{Point: promql.Point{T: 60 * 1000, V: 120}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}, labels.Label{Name: "machine", Value: "fuzz"}}},
},
},
{
`sum by (app,machine) (count_over_time({app="foo"}[1m])) > bool ignoring (machine) sum by (app) (count_over_time({app="foo"}[1m]))`,
time.Unix(60, 0),
logproto.FORWARD,
0,
[][]logproto.Series{
{newSeries(testSize, identity, `{app="foo",machine="fuzz"}`)},
{newSeries(testSize, identity, `{app="foo"}`)},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `sum by (app,machine) (count_over_time({app="foo"}[1m]))`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `sum by (app) (count_over_time({app="foo"}[1m]))`}},
},
promql.Vector{
promql.Sample{Point: promql.Point{T: 60 * 1000, V: 0}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}, labels.Label{Name: "machine", Value: "fuzz"}}},
},
},
{
`sum by (app,machine) (count_over_time({app="foo"}[1m])) > bool ignoring (machine) sum by (app) (count_over_time({app="foo"}[1m]))`,
time.Unix(60, 0),
logproto.FORWARD,
0,
[][]logproto.Series{
{newSeries(testSize, identity, `{app="foo",machine="fuzz"}`), newSeries(testSize, identity, `{app="foo",machine="buzz"}`)},
{newSeries(testSize, identity, `{app="foo"}`)},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `sum by (app,machine) (count_over_time({app="foo"}[1m]))`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `sum by (app) (count_over_time({app="foo"}[1m]))`}},
},
errors.New("multiple matches for labels"),
},
} {
test := test
t.Run(fmt.Sprintf("%s %s", test.qs, test.direction), func(t *testing.T) {
Expand All @@ -617,10 +698,14 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
limit: test.limit,
})
res, err := q.Exec(user.InjectOrgID(context.Background(), "fake"))
if err != nil {
t.Fatal(err)
if expectedError, ok := test.expected.(error); ok {
assert.Equal(t, expectedError.Error(), err.Error())
} else {
if err != nil {
t.Fatal(err)
}
assert.Equal(t, test.expected, res.Data)
}
assert.Equal(t, test.expected, res.Data)
})
}
}
Expand Down
23 changes: 21 additions & 2 deletions pkg/logql/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,8 +575,20 @@ func binOpStepEvaluator(
// TODO(owen-d): this seems wildly inefficient: we're calculating
// the hash on each sample & step per evaluator.
// We seem limited to this approach due to using the StepEvaluator ifc.
hash := sample.Metric.Hash()

var hash uint64
if expr.opts == nil || expr.opts.VectorMatching == nil {
hash = sample.Metric.Hash()
} else if expr.opts.VectorMatching.On {
hash = sample.Metric.WithLabels(expr.opts.VectorMatching.Include...).Hash()
} else {
hash = sample.Metric.WithoutLabels(expr.opts.VectorMatching.Include...).Hash()
}
pair := pairs[hash]
if pair[i] != nil {
err = errors.New("multiple matches for labels")
return false, ts, nil
}
pair[i] = &promql.Sample{
Metric: sample.Metric,
Point: sample.Point,
Expand All @@ -588,7 +600,11 @@ func binOpStepEvaluator(
results := make(promql.Vector, 0, len(pairs))
for _, pair := range pairs {
// merge
if merged := mergeBinOp(expr.op, pair[0], pair[1], !expr.opts.ReturnBool, IsComparisonOperator(expr.op)); merged != nil {
filter := true
if expr.opts != nil && expr.opts.ReturnBool {
filter = false
}
if merged := mergeBinOp(expr.op, pair[0], pair[1], filter, IsComparisonOperator(expr.op)); merged != nil {
results = append(results, *merged)
}
}
Expand All @@ -603,6 +619,9 @@ func binOpStepEvaluator(
return lastError
}, func() error {
var errs []error
if err != nil {
errs = append(errs, err)
}
for _, ev := range []StepEvaluator{lhs, rhs} {
if err := ev.Error(); err != nil {
errs = append(errs, err)
Expand Down
37 changes: 31 additions & 6 deletions pkg/logql/expr.y
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ import (
str string
duration time.Duration
LiteralExpr *LiteralExpr
BinOpModifier BinOpOptions
BinOpModifier *BinOpOptions
BoolModifier *BinOpOptions
LabelParser *LabelParserExpr
LineFilters *LineFilterExpr
LineFilter *LineFilterExpr
Expand Down Expand Up @@ -78,6 +79,7 @@ import (
%type <LiteralExpr> literalExpr
%type <LabelReplaceExpr> labelReplaceExpr
%type <BinOpModifier> binOpModifier
%type <BoolModifier> boolModifier
%type <LabelParser> labelParser
%type <PipelineExpr> pipelineExpr
%type <PipelineStage> pipelineStage
Expand Down Expand Up @@ -106,7 +108,7 @@ import (
OPEN_PARENTHESIS CLOSE_PARENTHESIS BY WITHOUT COUNT_OVER_TIME RATE SUM AVG MAX MIN COUNT STDDEV STDVAR BOTTOMK TOPK
BYTES_OVER_TIME BYTES_RATE BOOL JSON REGEXP LOGFMT PIPE LINE_FMT LABEL_FMT UNWRAP AVG_OVER_TIME SUM_OVER_TIME MIN_OVER_TIME
MAX_OVER_TIME STDVAR_OVER_TIME STDDEV_OVER_TIME QUANTILE_OVER_TIME BYTES_CONV DURATION_CONV DURATION_SECONDS_CONV
FIRST_OVER_TIME LAST_OVER_TIME ABSENT_OVER_TIME LABEL_REPLACE UNPACK OFFSET PATTERN IP
FIRST_OVER_TIME LAST_OVER_TIME ABSENT_OVER_TIME LABEL_REPLACE UNPACK OFFSET PATTERN IP ON IGNORING

// Operators are listed with increasing precedence.
%left <binOp> OR
Expand Down Expand Up @@ -342,7 +344,6 @@ numberFilter:
| IDENTIFIER CMP_EQ NUMBER { $$ = log.NewNumericLabelFilter(log.LabelFilterEqual, $1, mustNewFloat($3))}
;

// TODO(owen-d): add (on,ignoring) clauses to binOpExpr
// Operator precedence only works if each of these is listed separately.
binOpExpr:
expr OR binOpModifier expr { $$ = mustNewBinOpExpr("or", $3, $1, $4) }
Expand All @@ -363,9 +364,33 @@ binOpExpr:
;

binOpModifier:
{ $$ = BinOpOptions{} }
| BOOL { $$ = BinOpOptions{ ReturnBool: true } }
;
boolModifier { $$ = $1 }
| boolModifier ON OPEN_PARENTHESIS labels CLOSE_PARENTHESIS
{
$$ = $1
$$.VectorMatching = &VectorMatching{On: true, Include: $4}
}
| boolModifier ON OPEN_PARENTHESIS CLOSE_PARENTHESIS
{
$$ = $1
$$.VectorMatching = &VectorMatching{On: true, Include: nil}
}
| boolModifier IGNORING OPEN_PARENTHESIS labels CLOSE_PARENTHESIS
{
$$ = $1
$$.VectorMatching = &VectorMatching{On: false, Include: $4}
}
| boolModifier IGNORING OPEN_PARENTHESIS CLOSE_PARENTHESIS
{
$$ = $1
$$.VectorMatching = &VectorMatching{On: false, Include: nil}
}
;

boolModifier:
{ $$ = &BinOpOptions{} }
| BOOL { $$ = &BinOpOptions{ ReturnBool: true } }
;

literalExpr:
NUMBER { $$ = mustNewLiteralExpr( $1, false ) }
Expand Down
Loading

0 comments on commit 4417340

Please sign in to comment.