Skip to content

Refactor query api #6779

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/weaveworks/common/instrument"
"github.com/weaveworks/common/middleware"

"github.com/cortexproject/cortex/pkg/api/queryapi"
"github.com/cortexproject/cortex/pkg/api/query"
"github.com/cortexproject/cortex/pkg/querier"
"github.com/cortexproject/cortex/pkg/querier/codec"
"github.com/cortexproject/cortex/pkg/querier/stats"
Expand Down Expand Up @@ -280,7 +280,7 @@ func NewQuerierHandler(
legacyPromRouter := route.New().WithPrefix(path.Join(legacyPrefix, "/api/v1"))
api.Register(legacyPromRouter)

queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin)
queryAPI := query.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin)

// TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
// https://github.com/prometheus/prometheus/pull/7125/files
Expand Down
93 changes: 67 additions & 26 deletions pkg/api/queryapi/query_api.go → pkg/api/query/handler.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
package queryapi
package query

import (
"context"
"errors"
"fmt"
"net/http"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/regexp"
jsoniter "github.com/json-iterator/go"
"github.com/munnerz/goautoneg"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/annotations"
"github.com/prometheus/prometheus/util/httputil"
v1 "github.com/prometheus/prometheus/web/api/v1"
"github.com/weaveworks/common/httpgrpc"

"github.com/cortexproject/cortex/pkg/engine"
"github.com/cortexproject/cortex/pkg/querier"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/api"
)

Expand Down Expand Up @@ -53,56 +53,57 @@ func NewQueryAPI(
}

func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
// TODO(Sungjin1212): Change to emit basic error (not gRPC)
start, err := util.ParseTime(r.FormValue("start"))
start, err := api.ParseTime(r.FormValue("start"))
if err != nil {
return invalidParamError(err, "start")
}
end, err := util.ParseTime(r.FormValue("end"))
end, err := api.ParseTime(r.FormValue("end"))
if err != nil {
return invalidParamError(err, "end")
}
if end < start {
return invalidParamError(ErrEndBeforeStart, "end")

if end.Before(start) {
return invalidParamError(errors.New("end timestamp must not be before start time"), "end")
}

step, err := util.ParseDurationMs(r.FormValue("step"))
step, err := api.ParseDuration(r.FormValue("step"))
if err != nil {
return invalidParamError(err, "step")
}

if step <= 0 {
return invalidParamError(ErrNegativeStep, "step")
return invalidParamError(errors.New("zero or negative query resolution step widths are not accepted. Try a positive integer"), "step")
}

// For safety, limit the number of returned points per timeseries.
// This is sufficient for 60s resolution for a week or 1h resolution for a year.
if (end-start)/step > 11000 {
return apiFuncResult{nil, &apiError{errorBadData, ErrStepTooSmall}, nil, nil}
if end.Sub(start)/step > 11000 {
err := errors.New("exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)")
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
}

ctx := r.Context()
if to := r.FormValue("timeout"); to != "" {
var cancel context.CancelFunc
timeout, err := util.ParseDurationMs(to)
timeout, err := api.ParseDuration(to)
if err != nil {
return invalidParamError(err, "timeout")
}

ctx, cancel = context.WithTimeout(ctx, convertMsToDuration(timeout))
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}

opts, err := extractQueryOpts(r)
opts, err := ExtractQueryOpts(r)
if err != nil {
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
}

ctx = engine.AddEngineTypeToContext(ctx, r)
ctx = querier.AddBlockStoreTypeToContext(ctx, r.Header.Get(querier.BlockStoreTypeHeader))
qry, err := q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(start), convertMsToTime(end), convertMsToDuration(step))
qry, err := q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, r.FormValue("query"), start, end, step)
if err != nil {
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
return invalidParamError(err, "query")
}
// From now on, we must only return with a finalizer in the result (to
// be called by the caller) or call qry.Close ourselves (which is
Expand Down Expand Up @@ -131,34 +132,33 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
}

func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {
// TODO(Sungjin1212): Change to emit basic error (not gRPC)
ts, err := util.ParseTimeParam(r, "time", q.now().Unix())
ts, err := api.ParseTimeParam(r, "time", q.now())
if err != nil {
return invalidParamError(err, "time")
}

ctx := r.Context()
if to := r.FormValue("timeout"); to != "" {
var cancel context.CancelFunc
timeout, err := util.ParseDurationMs(to)
timeout, err := api.ParseDuration(to)
if err != nil {
return invalidParamError(err, "timeout")
}

ctx, cancel = context.WithDeadline(ctx, q.now().Add(convertMsToDuration(timeout)))
ctx, cancel = context.WithDeadline(ctx, q.now().Add(timeout))
defer cancel()
}

opts, err := extractQueryOpts(r)
opts, err := ExtractQueryOpts(r)
if err != nil {
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
}

ctx = engine.AddEngineTypeToContext(ctx, r)
ctx = querier.AddBlockStoreTypeToContext(ctx, r.Header.Get(querier.BlockStoreTypeHeader))
qry, err := q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(ts))
qry, err := q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, r.FormValue("query"), ts)
if err != nil {
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
return invalidParamError(err, "query")
}

// From now on, we must only return with a finalizer in the result (to
Expand Down Expand Up @@ -197,7 +197,7 @@ func (q *QueryAPI) Wrap(f apiFunc) http.HandlerFunc {
}

if result.err != nil {
api.RespondFromGRPCError(q.logger, w, result.err.err)
q.respondError(w, result.err, result.data)
return
}

Expand All @@ -209,6 +209,47 @@ func (q *QueryAPI) Wrap(f apiFunc) http.HandlerFunc {
}
}

func (q *QueryAPI) respondError(w http.ResponseWriter, apiErr *apiError, data interface{}) {
json := jsoniter.ConfigCompatibleWithStandardLibrary
b, err := json.Marshal(&Response{
Status: statusError,
ErrorType: apiErr.typ,
Error: apiErr.err.Error(),
Data: data,
})
if err != nil {
level.Error(q.logger).Log("error marshaling json response", "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

var code int
switch apiErr.typ {
case errorBadData:
code = http.StatusBadRequest
case errorExec:
code = http.StatusUnprocessableEntity
case errorCanceled:
code = statusClientClosedConnection
case errorTimeout:
code = http.StatusServiceUnavailable
case errorInternal:
code = http.StatusInternalServerError
case errorNotFound:
code = http.StatusNotFound
case errorNotAcceptable:
code = http.StatusNotAcceptable
default:
code = http.StatusInternalServerError
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(code)
if n, err := w.Write(b); err != nil {
level.Error(q.logger).Log("error writing response", "bytesWritten", n, "err", err)
}
}

func (q *QueryAPI) respond(w http.ResponseWriter, req *http.Request, data interface{}, warnings annotations.Annotations, query string) {
warn, info := warnings.AsStrings(query, 10, 10)

Expand All @@ -221,7 +262,7 @@ func (q *QueryAPI) respond(w http.ResponseWriter, req *http.Request, data interf

codec, err := q.negotiateCodec(req, resp)
if err != nil {
api.RespondFromGRPCError(q.logger, w, httpgrpc.Errorf(http.StatusNotAcceptable, "%s", &apiError{errorNotAcceptable, err}))
q.respondError(w, &apiError{errorNotAcceptable, err}, nil)
return
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package queryapi
package query

import (
"context"
Expand Down Expand Up @@ -63,7 +63,7 @@ func (mockQuerier) Close() error {
return nil
}

func Test_CustomAPI(t *testing.T) {
func Test_QueryAPI(t *testing.T) {
engine := promql.NewEngine(promql.EngineOpts{
MaxSamples: 100,
Timeout: time.Second * 2,
Expand Down Expand Up @@ -94,25 +94,25 @@ func Test_CustomAPI(t *testing.T) {
name: "[Range Query] empty start",
path: "/api/v1/query_range?end=1536673680&query=test&step=5",
expectedCode: http.StatusBadRequest,
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"start\\\"; cannot parse \\\"\\\" to a valid timestamp\"}",
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"start\\\": cannot parse \\\"\\\" to a valid timestamp\"}",
},
{
name: "[Range Query] empty end",
path: "/api/v1/query_range?query=test&start=1536673665&step=5",
expectedCode: http.StatusBadRequest,
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"end\\\"; cannot parse \\\"\\\" to a valid timestamp\"}",
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"end\\\": cannot parse \\\"\\\" to a valid timestamp\"}",
},
{
name: "[Range Query] start is greater than end",
path: "/api/v1/query_range?end=1536673680&query=test&start=1536673681&step=5",
expectedCode: http.StatusBadRequest,
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"end\\\"; end timestamp must not be before start time\"}",
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"end\\\": end timestamp must not be before start time\"}",
},
{
name: "[Range Query] negative step",
path: "/api/v1/query_range?end=1536673680&query=test&start=1536673665&step=-1",
expectedCode: http.StatusBadRequest,
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"step\\\"; zero or negative query resolution step widths are not accepted. Try a positive integer\"}",
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"step\\\": zero or negative query resolution step widths are not accepted. Try a positive integer\"}",
},
{
name: "[Range Query] returned points are over 11000",
Expand All @@ -124,19 +124,19 @@ func Test_CustomAPI(t *testing.T) {
name: "[Range Query] empty query",
path: "/api/v1/query_range?end=1536673680&start=1536673665&step=5",
expectedCode: http.StatusBadRequest,
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"query\\\"; unknown position: parse error: no expression found in input\"}",
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"query\\\": unknown position: parse error: no expression found in input\"}",
},
{
name: "[Range Query] invalid lookback delta",
path: "/api/v1/query_range?end=1536673680&query=test&start=1536673665&step=5&lookback_delta=dummy",
expectedCode: http.StatusBadRequest,
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"error parsing lookback delta duration: rpc error: code = Code(400) desc = cannot parse \\\"dummy\\\" to a valid duration\"}",
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"error parsing lookback delta duration: cannot parse \\\"dummy\\\" to a valid duration\"}",
},
{
name: "[Range Query] invalid timeout delta",
path: "/api/v1/query_range?end=1536673680&query=test&start=1536673665&step=5&timeout=dummy",
expectedCode: http.StatusBadRequest,
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"timeout\\\"; cannot parse \\\"dummy\\\" to a valid duration\"}",
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"timeout\\\": cannot parse \\\"dummy\\\" to a valid duration\"}",
},
{
name: "[Range Query] normal case",
Expand All @@ -148,19 +148,19 @@ func Test_CustomAPI(t *testing.T) {
name: "[Instant Query] empty query",
path: "/api/v1/query",
expectedCode: http.StatusBadRequest,
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"query\\\"; unknown position: parse error: no expression found in input\"}",
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"query\\\": unknown position: parse error: no expression found in input\"}",
},
{
name: "[Instant Query] invalid lookback delta",
path: "/api/v1/query?lookback_delta=dummy",
expectedCode: http.StatusBadRequest,
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"error parsing lookback delta duration: rpc error: code = Code(400) desc = cannot parse \\\"dummy\\\" to a valid duration\"}",
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"error parsing lookback delta duration: cannot parse \\\"dummy\\\" to a valid duration\"}",
},
{
name: "[Instant Query] invalid timeout",
path: "/api/v1/query?timeout=dummy",
expectedCode: http.StatusBadRequest,
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"timeout\\\"; cannot parse \\\"dummy\\\" to a valid duration\"}",
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"timeout\\\": cannot parse \\\"dummy\\\" to a valid duration\"}",
},
{
name: "[Instant Query] normal case",
Expand Down Expand Up @@ -243,7 +243,7 @@ func Test_InvalidCodec(t *testing.T) {
require.Equal(t, http.StatusNotAcceptable, rec.Code)
}

func Test_CustomAPI_StatsRenderer(t *testing.T) {
func Test_QueryAPI_StatsRenderer(t *testing.T) {
engine := promql.NewEngine(promql.EngineOpts{
MaxSamples: 100,
Timeout: time.Second * 2,
Expand Down
11 changes: 11 additions & 0 deletions pkg/api/query/response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package query

// Response defines the Prometheus response format.
type Response struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we cannot reuse the one from util/api/response.go?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to use the original code.. The v1.ErrorType doesn't have a value like ErrNotFound and ErrInternal.

Status string `json:"status"`
Data interface{} `json:"data,omitempty"`
ErrorType errorType `json:"errorType,omitempty"`
Error string `json:"error,omitempty"`
Warnings []string `json:"warnings,omitempty"`
Infos []string `json:"infos,omitempty"`
}
Loading
Loading