Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tsdb/index sampling endpoint #6347

Merged
merged 5 commits into from
Jun 9, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions docs/sources/api/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ These endpoints are exposed by the querier and the query frontend:
- [`GET /loki/api/v1/query_range`](#get-lokiapiv1query_range)
- [`GET /loki/api/v1/labels`](#get-lokiapiv1labels)
- [`GET /loki/api/v1/label/<name>/values`](#get-lokiapiv1labelnamevalues)
- [`GET /loki/api/v1/series`](#series)
- [`GET /loki/api/v1/index/stats`](#index-stats)
- [`GET /loki/api/v1/tail`](#get-lokiapiv1tail)
- [`POST /loki/api/v1/push`](#post-lokiapiv1push)
- [`GET /ready`](#get-ready)
Expand Down Expand Up @@ -920,6 +922,38 @@ $ curl -s "http://localhost:3100/loki/api/v1/series" --data-urlencode 'match[]={
}
```


## Index Stats

The `/loki/api/v1/index/stats` endpoint can be used to query the index for the number of `streams`, `chunks`, `entries`, and `bytes` that a query resolves to.

URL query parameters:

- `query`: The [LogQL](../logql/) matchers to check (i.e. `{job="foo", env!="dev"}`)
- `start=<nanosecond Unix epoch>`: Start timestamp.
- `end=<nanosecond Unix epoch>`: End timestamp.

You can URL-encode these parameters directly in the request body by using the POST method and `Content-Type: application/x-www-form-urlencoded` header. This is useful when specifying a large or dynamic number of stream selectors that may breach server-side URL character limits.

Response:
```json
{
"streams": 100,
"chunks": 1000,
"entries": 5000,
"bytes": 100000,
}
```

It is an approximation with the following caveats:
* It does not include data from the ingesters
* It is a probabilistic technique
* streams/chunks which span multiple period configurations may be counted twice.

These make it generally more helpful for larger queries.
It can be used for better understanding the throughput requirements and data topology for a list of matchers over a period of time.


## Statistics

Query endpoints such as `/api/prom/query`, `/loki/api/v1/query` and `/loki/api/v1/query_range` return a set of statistics about the query execution. Those statistics allow users to understand the amount of data processed and at which speed.
Expand Down
2 changes: 2 additions & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ func (t *Loki) initQuerier() (services.Service, error) {
"/loki/api/v1/labels": http.HandlerFunc(t.querierAPI.LabelHandler),
"/loki/api/v1/label/{name}/values": http.HandlerFunc(t.querierAPI.LabelHandler),
"/loki/api/v1/series": http.HandlerFunc(t.querierAPI.SeriesHandler),
"/loki/api/v1/index/stats": http.HandlerFunc(t.querierAPI.IndexStatsHandler),

"/api/prom/query": httpMiddleware.Wrap(http.HandlerFunc(t.querierAPI.LogQueryHandler)),
"/api/prom/label": http.HandlerFunc(t.querierAPI.LabelHandler),
Expand Down Expand Up @@ -675,6 +676,7 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
t.Server.HTTP.Path("/loki/api/v1/labels").Methods("GET", "POST").Handler(frontendHandler)
t.Server.HTTP.Path("/loki/api/v1/label/{name}/values").Methods("GET", "POST").Handler(frontendHandler)
t.Server.HTTP.Path("/loki/api/v1/series").Methods("GET", "POST").Handler(frontendHandler)
t.Server.HTTP.Path("/loki/api/v1/index/stats").Methods("GET", "POST").Handler(frontendHandler)
t.Server.HTTP.Path("/api/prom/query").Methods("GET", "POST").Handler(frontendHandler)
t.Server.HTTP.Path("/api/prom/label").Methods("GET", "POST").Handler(frontendHandler)
t.Server.HTTP.Path("/api/prom/label/{name}/values").Methods("GET", "POST").Handler(frontendHandler)
Expand Down
33 changes: 33 additions & 0 deletions pkg/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/logqlmodel/stats"
index_stats "github.com/grafana/loki/pkg/storage/stores/index/stats"
"github.com/grafana/loki/pkg/util/httpreq"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/marshal"
Expand Down Expand Up @@ -412,6 +413,38 @@ func (q *QuerierAPI) SeriesHandler(w http.ResponseWriter, r *http.Request) {
}
}

// SeriesHandler returns the list of time series that match a certain label set.
// See https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers
func (q *QuerierAPI) IndexStatsHandler(w http.ResponseWriter, r *http.Request) {
// TODO(owen-d): usea specific type/validation instead
// of using range query parameters (superset)
req, err := loghttp.ParseRangeQuery(r)
if err != nil {
serverutil.WriteError(httpgrpc.Errorf(http.StatusBadRequest, err.Error()), w)
return
}

_, ctx := spanlogger.New(r.Context(), "query.IndexStats")

// TODO(owen-d): log metadata, record stats?
resp, err := q.querier.IndexStats(ctx, req)
if resp == nil {
// Some stores don't implement this
resp = &index_stats.Stats{}
}

if err != nil {
serverutil.WriteError(err, w)
return
}

err = marshal.WriteIndexStatsResponseJSON(resp, w)
if err != nil {
serverutil.WriteError(err, w)
return
}
}

// parseRegexQuery parses regex and query querystring from httpRequest and returns the combined LogQL query.
// This is used only to keep regexp query string support until it gets fully deprecated.
func parseRegexQuery(httpRequest *http.Request) (string, error) {
Expand Down
28 changes: 28 additions & 0 deletions pkg/querier/multi_tenant_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import (
"github.com/grafana/dskit/tenant"

"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/storage/stores/index/stats"
)

const (
Expand Down Expand Up @@ -158,6 +160,32 @@ func (q *MultiTenantQuerier) Series(ctx context.Context, req *logproto.SeriesReq
return logproto.MergeSeriesResponses(responses)
}

func (q *MultiTenantQuerier) IndexStats(ctx context.Context, req *loghttp.RangeQuery) (*stats.Stats, error) {
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, err
}

if len(tenantIDs) == 1 {
return q.Querier.IndexStats(ctx, req)
}

responses := make([]*stats.Stats, len(tenantIDs))
for i, id := range tenantIDs {
singleContext := user.InjectOrgID(ctx, id)
resp, err := q.Querier.IndexStats(singleContext, req)
if err != nil {
return nil, err
}

responses[i] = resp
}

merged := stats.MergeStats(responses...)

return &merged, nil
}

// removeTenantSelector filters the given tenant IDs based on any tenant ID filter the in passed selector.
func removeTenantSelector(params logql.SelectSampleParams, tenantIDs []string) (map[string]struct{}, syntax.Expr, error) {
expr, err := params.Expr()
Expand Down
36 changes: 34 additions & 2 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (

"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion"

"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
Expand All @@ -22,7 +20,10 @@ import (
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/stores/index/stats"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion"
listutil "github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/spanlogger"
util_validation "github.com/grafana/loki/pkg/util/validation"
Expand Down Expand Up @@ -83,6 +84,7 @@ type Querier interface {
Label(ctx context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error)
Series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error)
Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer, error)
IndexStats(ctx context.Context, req *loghttp.RangeQuery) (*stats.Stats, error)
}

// SingleTenantQuerier handles single tenant queries.
Expand Down Expand Up @@ -677,3 +679,33 @@ func (q *SingleTenantQuerier) checkTailRequestLimit(ctx context.Context) error {

return nil
}

func (q *SingleTenantQuerier) IndexStats(ctx context.Context, req *loghttp.RangeQuery) (*stats.Stats, error) {
userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}

start, end, err := validateQueryTimeRangeLimits(ctx, userID, q.limits, req.Start, req.End)
if err != nil {
return nil, err
}

matchers, err := syntax.ParseMatchers(req.Query)
if err != nil {
return nil, err
}

// Enforce the query timeout while querying backends
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(q.cfg.QueryTimeout))
defer cancel()

return q.store.Stats(
ctx,
userID,
model.TimeFromUnixNano(start.UnixNano()),
model.TimeFromUnixNano(end.UnixNano()),
matchers...,
)

}
5 changes: 5 additions & 0 deletions pkg/querier/querier_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/grafana/loki/pkg/distributor/clientpool"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/storage/chunk"
Expand Down Expand Up @@ -486,3 +487,7 @@ func (q *querierMock) Series(ctx context.Context, req *logproto.SeriesRequest) (
func (q *querierMock) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer, error) {
return nil, errors.New("querierMock.Tail() has not been mocked")
}

func (q *querierMock) IndexStats(ctx context.Context, req *loghttp.RangeQuery) (*stats.Stats, error) {
return nil, nil
}
Loading