Skip to content

Add dedicated instant/range query handlers #6763

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 26 additions & 13 deletions pkg/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/weaveworks/common/instrument"
"github.com/weaveworks/common/middleware"

"github.com/cortexproject/cortex/pkg/api/queryapi"
"github.com/cortexproject/cortex/pkg/querier"
"github.com/cortexproject/cortex/pkg/querier/codec"
"github.com/cortexproject/cortex/pkg/querier/stats"
Expand Down Expand Up @@ -195,10 +196,13 @@ func NewQuerierHandler(
Help: "Current number of inflight requests to the querier.",
}, []string{"method", "route"})

statsRenderer := querier.StatsRenderer
corsOrigin := regexp.MustCompile(".*")
translateSampleAndChunkQueryable := querier.NewErrorTranslateSampleAndChunkQueryable(queryable)
api := v1.NewAPI(
engine,
querier.NewErrorTranslateSampleAndChunkQueryable(queryable), // Translate errors to errors expected by API.
nil, // No remote write support.
translateSampleAndChunkQueryable, // Translate errors to errors expected by API.
nil, // No remote write support.
exemplarQueryable,
func(ctx context.Context) v1.ScrapePoolsRetriever { return nil },
func(context.Context) v1.TargetRetriever { return &querier.DummyTargetRetriever{} },
Expand All @@ -214,7 +218,7 @@ func NewQuerierHandler(
func(context.Context) v1.RulesRetriever { return &querier.DummyRulesRetriever{} },
0, 0, 0, // Remote read samples and concurrency limit.
false,
regexp.MustCompile(".*"),
corsOrigin,
func() (v1.RuntimeInfo, error) { return v1.RuntimeInfo{}, errors.New("not implemented") },
&v1.PrometheusVersion{
Version: version.Version,
Expand All @@ -229,7 +233,7 @@ func NewQuerierHandler(
// This is used for the stats API which we should not support. Or find other ways to.
prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) { return nil, nil }),
reg,
querier.StatsRenderer,
statsRenderer,
false,
nil,
false,
Expand All @@ -240,11 +244,18 @@ func NewQuerierHandler(
api.ClearCodecs()
cm := codec.NewInstrumentedCodecMetrics(reg)

api.InstallCodec(codec.NewInstrumentedCodec(v1.JSONCodec{}, cm))
// Install Protobuf codec to give the option for using either.
api.InstallCodec(codec.NewInstrumentedCodec(codec.ProtobufCodec{CortexInternal: false}, cm))
// Protobuf codec for Cortex internal requests. This should be used by Cortex Ruler only for remote evaluation.
api.InstallCodec(codec.NewInstrumentedCodec(codec.ProtobufCodec{CortexInternal: true}, cm))
codecs := []v1.Codec{
codec.NewInstrumentedCodec(v1.JSONCodec{}, cm),
// Protobuf codec to give the option for using either.
codec.NewInstrumentedCodec(codec.ProtobufCodec{CortexInternal: false}, cm),
// Protobuf codec for Cortex internal requests. This should be used by Cortex Ruler only for remote evaluation.
codec.NewInstrumentedCodec(codec.ProtobufCodec{CortexInternal: true}, cm),
}

// Install codecs
for _, c := range codecs {
api.InstallCodec(c)
}

router := mux.NewRouter()

Expand All @@ -269,13 +280,15 @@ func NewQuerierHandler(
legacyPromRouter := route.New().WithPrefix(path.Join(legacyPrefix, "/api/v1"))
api.Register(legacyPromRouter)

queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin)

// TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
// https://github.com/prometheus/prometheus/pull/7125/files
router.Path(path.Join(prefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(metadataQuerier))
router.Path(path.Join(prefix, "/api/v1/read")).Handler(querier.RemoteReadHandler(queryable, logger))
router.Path(path.Join(prefix, "/api/v1/read")).Methods("POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/query")).Methods("GET", "POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/query")).Methods("GET", "POST").Handler(queryAPI.Wrap(queryAPI.InstantQueryHandler))
router.Path(path.Join(prefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(queryAPI.Wrap(queryAPI.RangeQueryHandler))
router.Path(path.Join(prefix, "/api/v1/query_exemplars")).Methods("GET", "POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/labels")).Methods("GET", "POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(promRouter)
Expand All @@ -287,8 +300,8 @@ func NewQuerierHandler(
router.Path(path.Join(legacyPrefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(metadataQuerier))
router.Path(path.Join(legacyPrefix, "/api/v1/read")).Handler(querier.RemoteReadHandler(queryable, logger))
router.Path(path.Join(legacyPrefix, "/api/v1/read")).Methods("POST").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/query")).Methods("GET", "POST").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/query")).Methods("GET", "POST").Handler(queryAPI.Wrap(queryAPI.InstantQueryHandler))
router.Path(path.Join(legacyPrefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(queryAPI.Wrap(queryAPI.RangeQueryHandler))
router.Path(path.Join(legacyPrefix, "/api/v1/query_exemplars")).Methods("GET", "POST").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/labels")).Methods("GET", "POST").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(legacyPromRouter)
Expand Down
249 changes: 249 additions & 0 deletions pkg/api/queryapi/query_api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
package queryapi

import (
"context"
"fmt"
"net/http"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/regexp"
"github.com/munnerz/goautoneg"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/annotations"
"github.com/prometheus/prometheus/util/httputil"
v1 "github.com/prometheus/prometheus/web/api/v1"
"github.com/weaveworks/common/httpgrpc"

"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/api"
)

type QueryAPI struct {
queryable storage.SampleAndChunkQueryable
queryEngine promql.QueryEngine
now func() time.Time
statsRenderer v1.StatsRenderer
logger log.Logger
codecs []v1.Codec
CORSOrigin *regexp.Regexp
}

func NewQueryAPI(
qe promql.QueryEngine,
q storage.SampleAndChunkQueryable,
statsRenderer v1.StatsRenderer,
logger log.Logger,
codecs []v1.Codec,
CORSOrigin *regexp.Regexp,
) *QueryAPI {
return &QueryAPI{
queryEngine: qe,
queryable: q,
statsRenderer: statsRenderer,
logger: logger,
codecs: codecs,
CORSOrigin: CORSOrigin,
now: time.Now,
}
}

func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
// TODO(Sungjin1212): Change to emit basic error (not gRPC)
start, err := util.ParseTime(r.FormValue("start"))
if err != nil {
return invalidParamError(err, "start")
}
end, err := util.ParseTime(r.FormValue("end"))
if err != nil {
return invalidParamError(err, "end")
}
if end < start {
return invalidParamError(ErrEndBeforeStart, "end")
}

step, err := util.ParseDurationMs(r.FormValue("step"))
if err != nil {
return invalidParamError(err, "step")
}

if step <= 0 {
return invalidParamError(ErrNegativeStep, "step")
}

// For safety, limit the number of returned points per timeseries.
// This is sufficient for 60s resolution for a week or 1h resolution for a year.
if (end-start)/step > 11000 {
return apiFuncResult{nil, &apiError{errorBadData, ErrStepTooSmall}, nil, nil}
}

ctx := r.Context()
if to := r.FormValue("timeout"); to != "" {
var cancel context.CancelFunc
timeout, err := util.ParseDurationMs(to)
if err != nil {
return invalidParamError(err, "timeout")
}

ctx, cancel = context.WithTimeout(ctx, convertMsToDuration(timeout))
defer cancel()
}

opts, err := extractQueryOpts(r)
if err != nil {
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
}
qry, err := q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(start), convertMsToTime(end), convertMsToDuration(step))
if err != nil {
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
}
// From now on, we must only return with a finalizer in the result (to
// be called by the caller) or call qry.Close ourselves (which is
// required in the case of a panic).
defer func() {
if result.finalizer == nil {
qry.Close()
}
}()

ctx = httputil.ContextFromRequest(ctx, r)

res := qry.Exec(ctx)
if res.Err != nil {
return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close}
}

warnings := res.Warnings
qs := q.statsRenderer(ctx, qry.Stats(), r.FormValue("stats"))

return apiFuncResult{&v1.QueryData{
ResultType: res.Value.Type(),
Result: res.Value,
Stats: qs,
}, nil, warnings, qry.Close}
}

func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {
// TODO(Sungjin1212): Change to emit basic error (not gRPC)
ts, err := util.ParseTimeParam(r, "time", q.now().Unix())
if err != nil {
return invalidParamError(err, "time")
}

ctx := r.Context()
if to := r.FormValue("timeout"); to != "" {
var cancel context.CancelFunc
timeout, err := util.ParseDurationMs(to)
if err != nil {
return invalidParamError(err, "timeout")
}

ctx, cancel = context.WithDeadline(ctx, q.now().Add(convertMsToDuration(timeout)))
defer cancel()
}

opts, err := extractQueryOpts(r)
if err != nil {
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
}
qry, err := q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(ts))
if err != nil {
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
}

// From now on, we must only return with a finalizer in the result (to
// be called by the caller) or call qry.Close ourselves (which is
// required in the case of a panic).
defer func() {
if result.finalizer == nil {
qry.Close()
}
}()

ctx = httputil.ContextFromRequest(ctx, r)

res := qry.Exec(ctx)
if res.Err != nil {
return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close}
}

warnings := res.Warnings
qs := q.statsRenderer(ctx, qry.Stats(), r.FormValue("stats"))

return apiFuncResult{&v1.QueryData{
ResultType: res.Value.Type(),
Result: res.Value,
Stats: qs,
}, nil, warnings, qry.Close}
}

func (q *QueryAPI) Wrap(f apiFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
httputil.SetCORS(w, q.CORSOrigin, r)

result := f(r)
if result.finalizer != nil {
defer result.finalizer()
}

if result.err != nil {
api.RespondFromGRPCError(q.logger, w, result.err.err)
return
}

if result.data != nil {
q.respond(w, r, result.data, result.warnings, r.FormValue("query"))
return
}
w.WriteHeader(http.StatusNoContent)
}
}

func (q *QueryAPI) respond(w http.ResponseWriter, req *http.Request, data interface{}, warnings annotations.Annotations, query string) {
warn, info := warnings.AsStrings(query, 10, 10)

resp := &v1.Response{
Status: statusSuccess,
Data: data,
Warnings: warn,
Infos: info,
}

codec, err := q.negotiateCodec(req, resp)
if err != nil {
api.RespondFromGRPCError(q.logger, w, httpgrpc.Errorf(http.StatusNotAcceptable, "%s", &apiError{errorNotAcceptable, err}))
return
}

b, err := codec.Encode(resp)
if err != nil {
level.Error(q.logger).Log("error marshaling response", "url", req.URL, "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", codec.ContentType().String())
w.WriteHeader(http.StatusOK)
if n, err := w.Write(b); err != nil {
level.Error(q.logger).Log("error writing response", "url", req.URL, "bytesWritten", n, "err", err)
}
}

func (q *QueryAPI) negotiateCodec(req *http.Request, resp *v1.Response) (v1.Codec, error) {
for _, clause := range goautoneg.ParseAccept(req.Header.Get("Accept")) {
for _, codec := range q.codecs {
if codec.ContentType().Satisfies(clause) && codec.CanEncode(resp) {
return codec, nil
}
}
}

defaultCodec := q.codecs[0]
if !defaultCodec.CanEncode(resp) {
return nil, fmt.Errorf("cannot encode response as %s", defaultCodec.ContentType())
}

return defaultCodec, nil
}
Loading
Loading