Skip to content

Commit

Permalink
Feature/querysharding ii (grafana#1927)
Browse files Browse the repository at this point in the history
* [wip] sharding evaluator/ast

* [wip] continues experimenting with ast mapping

* refactoring in preparation for binops

* evaluators can pass state to other evaluators

* compiler alignment

* Evaluator method renamed to StepEvaluator

* chained evaluator impl

* tidying up sharding code

* handling for ConcatSampleExpr

* downstream iterator

* structure for downstreaming asts

* outlines sharding optimizations

* work on sharding mapper

* ast sharding optimizations

* test for different logrange positions

* shard mapper tests

* stronger ast sharding & tests

* shardmapper tests for string->string

* removes sharding evaluator code

* removes unused ctx arg

* Revert "removes sharding evaluator code"

This reverts commit 55d41b9.

* interfaces for downstreaming, type conversions

* sharding plumbing on frontend

* type alignment in queryrange to downstream sharded queriers

* downstreaming support for sharding incl storage code

* removes chainedevaluator

* comment alignment

* storage shard injection

* speccing out testware for sharding equivalence

* [wip] shared engine refactor

* sorting streams, sharding eval fixes

* downstream evaluator embeds defaultevaluator

* other pkgs adopt logql changes

* metrics & logs use same middleware instantiation process

* wires up shardingware

* middleware per metrics/logfilter

* empty step populating StepEvaluator promql.Matrix adapter

* sharding metrics

* log/span injection into sharded engine

* sharding metrics avoids multiple instantiation

* downstreamhandler tracing

* sharding parameterized libsonnet

* removes querier replicas

* default 32 concurrency for workers

* jsonnet correct level override

* unquote true in yaml

* lowercase error + downstreamEvaluator defaults to embedded defaultEvaluator

* makes shardRecorder private

* logs query on failed parse

* refactors engine to be multi-use, minimizes logger injection, generalizes Query methods, removes Engine interface

* basic tests for querysharding mware

* [wip] concurrent evaluator

* integrates stat propagation into sharding evaluator

* splitby histogram

* extends le bounds for bytes processed

* byte throughput histogram buckets to 40gb

* chunk duration mixin

* fixes merge w/ field rename

* derives logger in sharded engine via ctx & logs some downstream evaluators

* moves sharded engine to top, adds comments

* logs failed merge results in stats ctx

* snapshotting stats merge logic is done more effectively

* per query concurrency controlled via downstreamer

* unexports decodereq

* queryrange testware

* downstreamer tests

* pr requests
  • Loading branch information
owen-d authored May 26, 2020
1 parent 156023a commit 89d80a6
Show file tree
Hide file tree
Showing 43 changed files with 2,808 additions and 377 deletions.
23 changes: 21 additions & 2 deletions pkg/logcli/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,29 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string

eng := logql.NewEngine(conf.Querier.Engine, querier)
var query logql.Query

if q.isInstant() {
query = eng.NewInstantQuery(q.QueryString, q.Start, q.resultsDirection(), uint32(q.Limit))
query = eng.Query(logql.NewLiteralParams(
q.QueryString,
q.Start,
q.Start,
0,
0,
q.resultsDirection(),
uint32(q.Limit),
nil,
))
} else {
query = eng.NewRangeQuery(q.QueryString, q.Start, q.End, q.Step, q.Interval, q.resultsDirection(), uint32(q.Limit))
query = eng.Query(logql.NewLiteralParams(
q.QueryString,
q.Start,
q.End,
q.Step,
q.Interval,
q.resultsDirection(),
uint32(q.Limit),
nil,
))
}

// execute the query
Expand Down
4 changes: 4 additions & 0 deletions pkg/loghttp/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ func direction(r *http.Request) (logproto.Direction, error) {
return parseDirection(r.Form.Get("direction"), logproto.BACKWARD)
}

func shards(r *http.Request) []string {
return r.Form["shards"]
}

func bounds(r *http.Request) (time.Time, time.Time, error) {
now := time.Now()
start, err := parseTimestamp(r.Form.Get("start"), now.Add(-defaultSince))
Expand Down
3 changes: 3 additions & 0 deletions pkg/loghttp/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ type RangeQuery struct {
Query string
Direction logproto.Direction
Limit uint32
Shards []string
}

// ParseRangeQuery parses a RangeQuery request from an http request.
Expand Down Expand Up @@ -280,6 +281,8 @@ func ParseRangeQuery(r *http.Request) (*RangeQuery, error) {
return nil, errNegativeStep
}

result.Shards = shards(r)

// For safety, limit the number of returned points per timeseries.
// This is sufficient for 60s resolution for a week or 1h resolution for a year.
if (result.End.Sub(result.Start) / result.Step) > 11000 {
Expand Down
7 changes: 7 additions & 0 deletions pkg/logproto/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package logproto

import "github.com/prometheus/prometheus/pkg/labels"

// Note, this is not very efficient and use should be minimized as it requires label construction on each comparison
type SeriesIdentifiers []SeriesIdentifier

func (ids SeriesIdentifiers) Len() int { return len(ids) }
Expand All @@ -10,3 +11,9 @@ func (ids SeriesIdentifiers) Less(i, j int) bool {
a, b := labels.FromMap(ids[i].Labels), labels.FromMap(ids[j].Labels)
return labels.Compare(a, b) <= 0
}

type Streams []Stream

func (xs Streams) Len() int { return len(xs) }
func (xs Streams) Swap(i, j int) { xs[i], xs[j] = xs[j], xs[i] }
func (xs Streams) Less(i, j int) bool { return xs[i].Labels <= xs[j].Labels }
221 changes: 147 additions & 74 deletions pkg/logproto/logproto.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions pkg/logproto/logproto.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ message QueryRequest {
google.protobuf.Timestamp end = 4 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
Direction direction = 5;
reserved 6;
repeated string shards = 7 [(gogoproto.jsontag) = "shards,omitempty"];

}

enum Direction {
Expand Down
26 changes: 0 additions & 26 deletions pkg/logql/astmapper.go

This file was deleted.

140 changes: 54 additions & 86 deletions pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package logql

import (
"context"
"errors"
"sort"
"time"

Expand Down Expand Up @@ -35,6 +36,12 @@ const ValueTypeStreams = "streams"
// Streams is promql.Value
type Streams []logproto.Stream

func (streams Streams) Len() int { return len(streams) }
func (streams Streams) Swap(i, j int) { streams[i], streams[j] = streams[j], streams[i] }
func (streams Streams) Less(i, j int) bool {
return streams[i].Labels <= streams[j].Labels
}

// Type implements `promql.Value`
func (Streams) Type() promql.ValueType { return ValueTypeStreams }

Expand Down Expand Up @@ -67,31 +74,29 @@ func (opts *EngineOpts) applyDefault() {
}
}

// Engine interface used to construct queries
type Engine interface {
NewRangeQuery(qs string, start, end time.Time, step, interval time.Duration, direction logproto.Direction, limit uint32) Query
NewInstantQuery(qs string, ts time.Time, direction logproto.Direction, limit uint32) Query
}

// engine is the LogQL engine.
type engine struct {
// Engine is the LogQL engine.
type Engine struct {
timeout time.Duration
evaluator Evaluator
}

// NewEngine creates a new LogQL engine.
func NewEngine(opts EngineOpts, q Querier) Engine {
if q == nil {
panic("nil Querier")
}

// NewEngine creates a new LogQL Engine.
func NewEngine(opts EngineOpts, q Querier) *Engine {
opts.applyDefault()
return &Engine{
timeout: opts.Timeout,
evaluator: NewDefaultEvaluator(q, opts.MaxLookBackPeriod),
}
}

return &engine{
timeout: opts.Timeout,
evaluator: &defaultEvaluator{
querier: q,
maxLookBackPeriod: opts.MaxLookBackPeriod,
// Query creates a new LogQL query. Instant/Range type is derived from the parameters.
func (ng *Engine) Query(params Params) Query {
return &query{
timeout: ng.timeout,
params: params,
evaluator: ng.evaluator,
parse: func(_ context.Context, query string) (Expr, error) {
return ParseExpr(query)
},
}
}
Expand All @@ -103,17 +108,18 @@ type Query interface {
}

type query struct {
LiteralParams

ng *engine
timeout time.Duration
params Params
parse func(context.Context, string) (Expr, error)
evaluator Evaluator
}

// Exec Implements `Query`
// Exec Implements `Query`. It handles instrumentation & defers to Eval.
func (q *query) Exec(ctx context.Context) (Result, error) {
log, ctx := spanlogger.New(ctx, "Engine.Exec")
log, ctx := spanlogger.New(ctx, "query.Exec")
defer log.Finish()

rangeType := GetRangeType(q)
rangeType := GetRangeType(q.params)
timer := prometheus.NewTimer(queryTime.WithLabelValues(string(rangeType)))
defer timer.ObserveDuration()

Expand All @@ -122,7 +128,7 @@ func (q *query) Exec(ctx context.Context) (Result, error) {
start := time.Now()
ctx = stats.NewContext(ctx)

data, err := q.ng.exec(ctx, q)
data, err := q.Eval(ctx)

statResult = stats.Snapshot(ctx, time.Since(start))
statResult.Log(level.Debug(log))
Expand All @@ -134,88 +140,49 @@ func (q *query) Exec(ctx context.Context) (Result, error) {
status = "400"
}
}
RecordMetrics(ctx, q, status, statResult)
RecordMetrics(ctx, q.params, status, statResult)

return Result{
Data: data,
Statistics: statResult,
}, err
}

// NewRangeQuery creates a new LogQL range query.
func (ng *engine) NewRangeQuery(
qs string,
start, end time.Time, step time.Duration, interval time.Duration,
direction logproto.Direction, limit uint32) Query {
return &query{
LiteralParams: LiteralParams{
qs: qs,
start: start,
end: end,
step: step,
interval: interval,
direction: direction,
limit: limit,
},
ng: ng,
}
}

// NewInstantQuery creates a new LogQL instant query.
func (ng *engine) NewInstantQuery(
qs string,
ts time.Time,
direction logproto.Direction, limit uint32) Query {
return &query{
LiteralParams: LiteralParams{
qs: qs,
start: ts,
end: ts,
step: 0,
interval: 0,
direction: direction,
limit: limit,
},
ng: ng,
}
}

func (ng *engine) exec(ctx context.Context, q *query) (promql.Value, error) {
ctx, cancel := context.WithTimeout(ctx, ng.timeout)
func (q *query) Eval(ctx context.Context) (promql.Value, error) {
ctx, cancel := context.WithTimeout(ctx, q.timeout)
defer cancel()

qs := q.Query()

expr, err := ParseExpr(qs)
expr, err := q.parse(ctx, q.params.Query())
if err != nil {
return nil, err
}

switch e := expr.(type) {
case SampleExpr:
value, err := ng.evalSample(ctx, e, q)
value, err := q.evalSample(ctx, e)
return value, err

case LogSelectorExpr:
iter, err := ng.evaluator.Iterator(ctx, e, q)
iter, err := q.evaluator.Iterator(ctx, e, q.params)
if err != nil {
return nil, err
}

defer helpers.LogErrorWithContext(ctx, "closing iterator", iter.Close)
streams, err := readStreams(iter, q.limit, q.direction, q.interval)
streams, err := readStreams(iter, q.params.Limit(), q.params.Direction(), q.params.Interval())
return streams, err
default:
return nil, errors.New("Unexpected type (%T): cannot evaluate")
}

return nil, nil
}

// evalSample evaluate a sampleExpr
func (ng *engine) evalSample(ctx context.Context, expr SampleExpr, q *query) (promql.Value, error) {
func (q *query) evalSample(ctx context.Context, expr SampleExpr) (promql.Value, error) {
if lit, ok := expr.(*literalExpr); ok {
return ng.evalLiteral(ctx, lit, q)
return q.evalLiteral(ctx, lit)
}

stepEvaluator, err := ng.evaluator.StepEvaluator(ctx, ng.evaluator, expr, q)
stepEvaluator, err := q.evaluator.StepEvaluator(ctx, q.evaluator, expr, q.params)
if err != nil {
return nil, err
}
Expand All @@ -224,7 +191,7 @@ func (ng *engine) evalSample(ctx context.Context, expr SampleExpr, q *query) (pr
seriesIndex := map[uint64]*promql.Series{}

next, ts, vec := stepEvaluator.Next()
if GetRangeType(q) == InstantType {
if GetRangeType(q.params) == InstantType {
sort.Slice(vec, func(i, j int) bool { return labels.Compare(vec[i].Metric, vec[j].Metric) < 0 })
return vec, nil
}
Expand Down Expand Up @@ -262,21 +229,21 @@ func (ng *engine) evalSample(ctx context.Context, expr SampleExpr, q *query) (pr
return result, nil
}

func (ng *engine) evalLiteral(_ context.Context, expr *literalExpr, q *query) (promql.Value, error) {
func (q *query) evalLiteral(_ context.Context, expr *literalExpr) (promql.Value, error) {
s := promql.Scalar{
T: q.Start().UnixNano() / int64(time.Millisecond),
T: q.params.Start().UnixNano() / int64(time.Millisecond),
V: expr.value,
}

if GetRangeType(q) == InstantType {
if GetRangeType(q.params) == InstantType {
return s, nil
}

return PopulateMatrixFromScalar(s, q.LiteralParams), nil
return PopulateMatrixFromScalar(s, q.params), nil

}

func PopulateMatrixFromScalar(data promql.Scalar, params LiteralParams) promql.Matrix {
func PopulateMatrixFromScalar(data promql.Scalar, params Params) promql.Matrix {
var (
start = params.Start()
end = params.End()
Expand All @@ -286,7 +253,7 @@ func PopulateMatrixFromScalar(data promql.Scalar, params LiteralParams) promql.M
[]promql.Point,
0,
// allocate enough space for all needed entries
int(params.End().Sub(params.Start())/params.Step())+1,
int(end.Sub(start)/step)+1,
),
}
)
Expand Down Expand Up @@ -329,10 +296,11 @@ func readStreams(i iter.EntryIterator, size uint32, dir logproto.Direction, inte
}
}

result := make([]logproto.Stream, 0, len(streams))
result := make(Streams, 0, len(streams))
for _, stream := range streams {
result = append(result, *stream)
}
sort.Sort(result)
return result, i.Error()
}

Expand Down
Loading

0 comments on commit 89d80a6

Please sign in to comment.