Improve logql parser allocations. (#2927)
* Improve logql parser allocations.

Also moves label parsing into the logql package, and I'm not shy to say that promql is faster at this.
So we're using the promql parser for labels; it's faster because its lexer is more sophisticated than ours.

The results are impressive.

```
❯ benchcmp before.txt after.txt
benchmark                    old ns/op     new ns/op     delta
Benchmark_ParseLabels-16     24394         8745          -64.15%

benchmark                    old allocs     new allocs     delta
Benchmark_ParseLabels-16     156            20             -87.18%

benchmark                    old bytes     new bytes     delta
Benchmark_ParseLabels-16     24976         2099          -91.60%
```
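
For context, here is a minimal sketch of the kind of benchmark behind these numbers; the exact benchmark body and the label string are assumptions, not part of this diff:

```go
package logql

import (
	"testing"

	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/stretchr/testify/require"
)

// Keep the result in a package-level sink so the parse can't be optimized away.
var benchLbs labels.Labels

func Benchmark_ParseLabels(b *testing.B) {
	b.ReportAllocs()
	// Illustrative label string; any realistic stream label set works.
	s := `{cluster="us-central1", container="loki", namespace="loki-prod", pod="querier-7c8f6b9d-x2kqv"}`
	var err error
	for n := 0; n < b.N; n++ {
		benchLbs, err = ParseLabels(s)
		require.NoError(b, err)
	}
}
```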

Combined with #2926, I think we should be able to get rid of `util.ToClientLabels`. But that's for another PR.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes a test.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
cyriltovena authored Nov 13, 2020
1 parent 2303f1b commit 049c7ac
Showing 16 changed files with 133 additions and 105 deletions.
2 changes: 1 addition & 1 deletion pkg/distributor/distributor_test.go
@@ -70,7 +70,7 @@ func TestDistributor(t *testing.T) {
lines: 100,
mangleLabels: true,
expectedResponse: success,
expectedError: httpgrpc.Errorf(http.StatusBadRequest, "error parsing labels: parse error at line 1, col 4: literal not terminated"),
expectedError: httpgrpc.Errorf(http.StatusBadRequest, "error parsing labels: 1:4: parse error: unterminated quoted string"),
},
} {
t.Run(fmt.Sprintf("[%d](samples=%v)", i, tc.lines), func(t *testing.T) {
2 changes: 1 addition & 1 deletion pkg/ingester/tailer.go
@@ -153,7 +153,7 @@ func (t *tailer) processStream(stream logproto.Stream) ([]logproto.Stream, error
defer t.pipelineMtx.Unlock()

streams := map[uint64]*logproto.Stream{}
lbs, err := util.ParseLabels(stream.Labels)
lbs, err := logql.ParseLabels(stream.Labels)
if err != nil {
return nil, err
}
15 changes: 7 additions & 8 deletions pkg/logql/engine.go
@@ -8,14 +8,13 @@ import (
"sort"
"time"

"github.com/go-kit/kit/log/level"
"github.com/prometheus/prometheus/promql/parser"

"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
promql_parser "github.com/prometheus/prometheus/promql/parser"

"github.com/grafana/loki/pkg/helpers"
"github.com/grafana/loki/pkg/iter"
@@ -46,7 +45,7 @@ func (streams Streams) Less(i, j int) bool {
}

// Type implements `promql.Value`
func (Streams) Type() parser.ValueType { return ValueTypeStreams }
func (Streams) Type() promql_parser.ValueType { return ValueTypeStreams }

// String implements `promql.Value`
func (Streams) String() string {
@@ -55,7 +54,7 @@ func (Streams) String() string {

// Result is the result of a query execution.
type Result struct {
Data parser.Value
Data promql_parser.Value
Statistics stats.Result
}

@@ -161,7 +160,7 @@ func (q *query) Exec(ctx context.Context) (Result, error) {
}, err
}

func (q *query) Eval(ctx context.Context) (parser.Value, error) {
func (q *query) Eval(ctx context.Context) (promql_parser.Value, error) {
ctx, cancel := context.WithTimeout(ctx, q.timeout)
defer cancel()

@@ -190,7 +189,7 @@ func (q *query) Eval(ctx context.Context) (parser.Value, error) {
}

// evalSample evaluate a sampleExpr
func (q *query) evalSample(ctx context.Context, expr SampleExpr) (parser.Value, error) {
func (q *query) evalSample(ctx context.Context, expr SampleExpr) (promql_parser.Value, error) {
if lit, ok := expr.(*literalExpr); ok {
return q.evalLiteral(ctx, lit)
}
@@ -254,7 +253,7 @@ func (q *query) evalSample(ctx context.Context, expr SampleExpr) (parser.Value,
return result, stepEvaluator.Error()
}

func (q *query) evalLiteral(_ context.Context, expr *literalExpr) (parser.Value, error) {
func (q *query) evalLiteral(_ context.Context, expr *literalExpr) (promql_parser.Value, error) {
s := promql.Scalar{
T: q.params.Start().UnixNano() / int64(time.Millisecond),
V: expr.value,
8 changes: 4 additions & 4 deletions pkg/logql/engine_test.go
@@ -13,7 +13,7 @@ import (
json "github.com/json-iterator/go"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
promql_parser "github.com/prometheus/prometheus/promql/parser"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

@@ -41,7 +41,7 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
data interface{}
params interface{}

expected parser.Value
expected promql_parser.Value
}{
{
`{app="foo"}`, time.Unix(30, 0), logproto.FORWARD, 10,
@@ -493,7 +493,7 @@ func TestEngine_RangeQuery(t *testing.T) {
data interface{}
params interface{}

expected parser.Value
expected promql_parser.Value
}{
{
`{app="foo"}`, time.Unix(0, 0), time.Unix(30, 0), time.Second, 0, logproto.FORWARD, 10,
@@ -1649,7 +1649,7 @@ func BenchmarkRangeQuery1000000(b *testing.B) {
benchmarkRangeQuery(int64(1000000), b)
}

var result parser.Value
var result promql_parser.Value

func benchmarkRangeQuery(testsize int64, b *testing.B) {
b.ReportAllocs()
2 changes: 1 addition & 1 deletion pkg/logql/expr.y
@@ -101,7 +101,7 @@ import (

%%

root: expr { exprlex.(*lexer).expr = $1 };
root: expr { exprlex.(*parser).expr = $1 };

expr:
logExpr { $$ = $1 }
5 changes: 4 additions & 1 deletion pkg/logql/expr.y.go

Some generated files are not rendered by default.

22 changes: 11 additions & 11 deletions pkg/logql/lex.go
@@ -1,14 +1,14 @@
package logql

import (
"strconv"
"strings"
"text/scanner"
"time"
"unicode"

"github.com/dustin/go-humanize"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/util/strutil"
)

var tokens = map[string]int{
@@ -93,9 +93,7 @@ var functionTokens = map[string]int{

type lexer struct {
scanner.Scanner
errs []ParseError
expr Expr
parser *exprParserImpl
errs []ParseError
}

func (l *lexer) Lex(lval *exprSymType) int {
@@ -124,7 +122,7 @@ func (l *lexer) Lex(lval *exprSymType) int {

case scanner.String, scanner.RawString:
var err error
lval.str, err = strconv.Unquote(l.TokenText())
lval.str, err = strutil.Unquote(l.TokenText())
if err != nil {
l.Error(err.Error())
return 0
@@ -133,7 +131,7 @@ }
}

// scanning duration tokens
if l.TokenText() == "[" {
if r == '[' {
d := ""
for r := l.Next(); r != scanner.EOF; r = l.Next() {
if string(r) == "]" {
@@ -151,7 +149,9 @@ func (l *lexer) Lex(lval *exprSymType) int {
return 0
}

if tok, ok := functionTokens[l.TokenText()+string(l.Peek())]; ok {
tokenText := l.TokenText()
tokenNext := tokenText + string(l.Peek())
if tok, ok := functionTokens[tokenNext]; ok {
// create a copy to advance to the entire token for testing suffix
sc := l.Scanner
sc.Next()
@@ -161,20 +161,20 @@ func (l *lexer) Lex(lval *exprSymType) int {
}
}

if tok, ok := functionTokens[l.TokenText()]; ok && isFunction(l.Scanner) {
if tok, ok := functionTokens[tokenText]; ok && isFunction(l.Scanner) {
return tok
}

if tok, ok := tokens[l.TokenText()+string(l.Peek())]; ok {
if tok, ok := tokens[tokenNext]; ok {
l.Next()
return tok
}

if tok, ok := tokens[l.TokenText()]; ok {
if tok, ok := tokens[tokenText]; ok {
return tok
}

lval.str = l.TokenText()
lval.str = tokenText
return IDENTIFIER
}

64 changes: 50 additions & 14 deletions pkg/logql/parser.go
@@ -2,12 +2,26 @@ package logql

import (
"errors"
"sort"
"strings"
"sync"
"text/scanner"

"github.com/prometheus/prometheus/pkg/labels"
promql_parser "github.com/prometheus/prometheus/promql/parser"
)

var parserPool = sync.Pool{
New: func() interface{} {
p := &parser{
p: &exprParserImpl{},
Reader: strings.NewReader(""),
lexer: &lexer{},
}
return p
},
}

func init() {
// Improve the error messages coming out of yacc.
exprErrorVerbose = true
@@ -18,11 +32,29 @@ func init() {
}
}

type parser struct {
p *exprParserImpl
*lexer
expr Expr
*strings.Reader
}

func (p *parser) Parse() (Expr, error) {
p.lexer.errs = p.lexer.errs[:0]
p.lexer.Scanner.Error = func(_ *scanner.Scanner, msg string) {
p.lexer.Error(msg)
}
e := p.p.Parse(p)
if e != 0 || len(p.lexer.errs) > 0 {
return nil, p.lexer.errs[0]
}
return p.expr, nil
}

// ParseExpr parses a string and returns an Expr.
func ParseExpr(input string) (expr Expr, err error) {
defer func() {
r := recover()
if r != nil {
if r := recover(); r != nil {
var ok bool
if err, ok = r.(error); ok {
if IsParseError(err) {
@@ -32,18 +64,12 @@ func ParseExpr(input string) (expr Expr, err error) {
}
}
}()
l := lexer{
parser: exprNewParser().(*exprParserImpl),
}
l.Init(strings.NewReader(input))
l.Scanner.Error = func(_ *scanner.Scanner, msg string) {
l.Error(msg)
}
e := l.parser.Parse(&l)
if e != 0 || len(l.errs) > 0 {
return nil, l.errs[0]
}
return l.expr, nil
p := parserPool.Get().(*parser)
defer parserPool.Put(p)

p.Reader.Reset(input)
p.lexer.Init(p.Reader)
return p.Parse()
}

// ParseMatchers parses a string and returns labels matchers, if the expression contains
@@ -85,3 +111,13 @@ func ParseLogSelector(input string) (LogSelectorExpr, error) {
}
return logSelector, nil
}

// ParseLabels parses labels from a string using logql parser.
func ParseLabels(lbs string) (labels.Labels, error) {
ls, err := promql_parser.ParseMetric(lbs)
if err != nil {
return nil, err
}
sort.Sort(ls)
return ls, nil
}
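
As a usage illustration (not part of this change; the main package and the sample label string are assumptions), the new helper can be called like this:

```go
package main

import (
	"fmt"

	"github.com/grafana/loki/pkg/logql"
)

func main() {
	// ParseLabels hands the string to promql's ParseMetric and sorts the
	// result, so the returned labels.Labels is ordered by label name.
	lbs, err := logql.ParseLabels(`{env="prod", app="foo"}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(lbs) // {app="foo", env="prod"}
}
```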
30 changes: 30 additions & 0 deletions pkg/logql/parser_test.go
@@ -2000,3 +2000,33 @@ func Test_PipelineCombined(t *testing.T) {
)
require.Equal(t, string([]byte(`1.5s|POST|200`)), string(line))
}

var c []*labels.Matcher

func Benchmark_ParseMatchers(b *testing.B) {
s := `{cpu="10",endpoint="https",instance="10.253.57.87:9100",job="node-exporter",mode="idle",namespace="observability",pod="node-exporter-l454v",service="node-exporter"}`
var err error
for n := 0; n < b.N; n++ {
c, err = ParseMatchers(s)
require.NoError(b, err)
}
}

var lbs labels.Labels

func Benchmark_CompareParseLabels(b *testing.B) {
s := `{cpu="10",endpoint="https",instance="10.253.57.87:9100",job="node-exporter",mode="idle",namespace="observability",pod="node-exporter-l454v",service="node-exporter"}`
var err error
b.Run("logql", func(b *testing.B) {
for n := 0; n < b.N; n++ {
c, err = ParseMatchers(s)
require.NoError(b, err)
}
})
b.Run("promql", func(b *testing.B) {
for n := 0; n < b.N; n++ {
lbs, err = ParseLabels(s)
require.NoError(b, err)
}
})
}
4 changes: 2 additions & 2 deletions pkg/logql/range_vector.go
@@ -5,7 +5,7 @@ import (

"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
promql_parser "github.com/prometheus/prometheus/promql/parser"

"github.com/grafana/loki/pkg/iter"
)
@@ -117,7 +117,7 @@ func (r *rangeVectorIterator) load(start, end int64) {
var metric labels.Labels
if metric, ok = r.metrics[lbs]; !ok {
var err error
metric, err = parser.ParseMetric(lbs)
metric, err = promql_parser.ParseMetric(lbs)
if err != nil {
continue
}