Skip to content

Commit 721af3a

Browse files
committed
Add dedicated instant/range query handlers
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
1 parent 7046357 commit 721af3a

File tree

5 files changed

+713
-11
lines changed

5 files changed

+713
-11
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
* [FEATURE] Experimental Support Parquet format: Implement parquet converter service to convert a TSDB block into Parquet and Parquet Queryable. #6716 #6743
1515
* [FEATURE] Distributor/Ingester: Implemented experimental feature to use gRPC stream connection for push requests. This can be enabled by setting `-distributor.use-stream-push=true`. #6580
1616
* [FEATURE] Compactor: Add support for percentage based sharding for compactors. #6738
17+
* [FEATURE] Querier: Add dedicated instant/range query handlers to customize these APIs. #6763
1718
* [ENHANCEMENT] Query Frontend: Change to return 400 when the tenant resolving fail. #6715
1819
* [ENHANCEMENT] Querier: Support query parameters to metadata api (/api/v1/metadata) to allow user to limit metadata to return. Add a `-ingester.return-all-metadata` flag to make the metadata API run when the deployment. Please set this flag to `false` to use the metadata API with the limits later. #6681 #6744
1920
* [ENHANCEMENT] Ingester: Add a `cortex_ingester_active_native_histogram_series` metric to track # of active NH series. #6695

pkg/api/custom_api.go

Lines changed: 298 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,298 @@
1+
package api
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"net/http"
8+
"path"
9+
"time"
10+
11+
"github.com/go-kit/log"
12+
"github.com/go-kit/log/level"
13+
"github.com/gorilla/mux"
14+
"github.com/grafana/regexp"
15+
jsoniter "github.com/json-iterator/go"
16+
"github.com/munnerz/goautoneg"
17+
"github.com/prometheus/prometheus/promql"
18+
"github.com/prometheus/prometheus/storage"
19+
"github.com/prometheus/prometheus/util/annotations"
20+
"github.com/prometheus/prometheus/util/httputil"
21+
v1 "github.com/prometheus/prometheus/web/api/v1"
22+
)
23+
24+
type CustomAPI struct {
25+
queryable storage.SampleAndChunkQueryable
26+
queryEngine promql.QueryEngine
27+
now func() time.Time
28+
statsRenderer v1.StatsRenderer
29+
logger log.Logger
30+
codecs []v1.Codec
31+
CORSOrigin *regexp.Regexp
32+
}
33+
34+
func NewCustomAPI(
35+
qe promql.QueryEngine,
36+
q storage.SampleAndChunkQueryable,
37+
statsRenderer v1.StatsRenderer,
38+
logger log.Logger,
39+
codecs []v1.Codec,
40+
CORSOrigin *regexp.Regexp,
41+
) *CustomAPI {
42+
return &CustomAPI{
43+
queryable: q,
44+
queryEngine: qe,
45+
now: time.Now,
46+
statsRenderer: statsRenderer,
47+
logger: logger,
48+
codecs: codecs,
49+
CORSOrigin: CORSOrigin,
50+
}
51+
}
52+
53+
// Register custom api path with handler
54+
// Usage: router.Path(path.Join(prefix, ${path})).Methods(${method}).Handler(${handler})
55+
func (c *CustomAPI) Register(router *mux.Router, prefix string) {
56+
router.Path(path.Join(prefix, "/api/v1/query")).Methods("GET", "POST").Handler(c.wrap(c.instantHandler))
57+
router.Path(path.Join(prefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(c.wrap(c.rangeQueryHandler))
58+
}
59+
60+
// Custom handler for Query range API
61+
func (c *CustomAPI) rangeQueryHandler(r *http.Request) (result apiFuncResult) {
62+
start, err := parseTime(r.FormValue("start"))
63+
if err != nil {
64+
return invalidParamError(err, "start")
65+
}
66+
end, err := parseTime(r.FormValue("end"))
67+
if err != nil {
68+
return invalidParamError(err, "end")
69+
}
70+
if end.Before(start) {
71+
return invalidParamError(errors.New("end timestamp must not be before start time"), "end")
72+
}
73+
74+
step, err := parseDuration(r.FormValue("step"))
75+
if err != nil {
76+
return invalidParamError(err, "step")
77+
}
78+
79+
if step <= 0 {
80+
return invalidParamError(errors.New("zero or negative query resolution step widths are not accepted. Try a positive integer"), "step")
81+
}
82+
83+
// For safety, limit the number of returned points per timeseries.
84+
// This is sufficient for 60s resolution for a week or 1h resolution for a year.
85+
if end.Sub(start)/step > 11000 {
86+
err := errors.New("exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)")
87+
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
88+
}
89+
90+
ctx := r.Context()
91+
if to := r.FormValue("timeout"); to != "" {
92+
var cancel context.CancelFunc
93+
timeout, err := parseDuration(to)
94+
if err != nil {
95+
return invalidParamError(err, "timeout")
96+
}
97+
98+
ctx, cancel = context.WithTimeout(ctx, timeout)
99+
defer cancel()
100+
}
101+
102+
opts, err := extractQueryOpts(r)
103+
if err != nil {
104+
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
105+
}
106+
qry, err := c.queryEngine.NewRangeQuery(ctx, c.queryable, opts, r.FormValue("query"), start, end, step)
107+
if err != nil {
108+
return invalidParamError(err, "query")
109+
}
110+
// From now on, we must only return with a finalizer in the result (to
111+
// be called by the caller) or call qry.Close ourselves (which is
112+
// required in the case of a panic).
113+
defer func() {
114+
if result.finalizer == nil {
115+
qry.Close()
116+
}
117+
}()
118+
119+
ctx = httputil.ContextFromRequest(ctx, r)
120+
121+
res := qry.Exec(ctx)
122+
if res.Err != nil {
123+
return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close}
124+
}
125+
126+
warnings := res.Warnings
127+
qs := c.statsRenderer(ctx, qry.Stats(), r.FormValue("stats"))
128+
129+
return apiFuncResult{&v1.QueryData{
130+
ResultType: res.Value.Type(),
131+
Result: res.Value,
132+
Stats: qs,
133+
}, nil, warnings, qry.Close}
134+
}
135+
136+
// Custom handler for Query API
137+
func (c *CustomAPI) instantHandler(r *http.Request) (result apiFuncResult) {
138+
ts, err := parseTimeParam(r, "time", c.now())
139+
if err != nil {
140+
return invalidParamError(err, "time")
141+
}
142+
143+
ctx := r.Context()
144+
if to := r.FormValue("timeout"); to != "" {
145+
var cancel context.CancelFunc
146+
timeout, err := parseDuration(to)
147+
if err != nil {
148+
return invalidParamError(err, "timeout")
149+
}
150+
151+
ctx, cancel = context.WithDeadline(ctx, c.now().Add(timeout))
152+
defer cancel()
153+
}
154+
155+
opts, err := extractQueryOpts(r)
156+
if err != nil {
157+
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
158+
}
159+
qry, err := c.queryEngine.NewInstantQuery(ctx, c.queryable, opts, r.FormValue("query"), ts)
160+
if err != nil {
161+
return invalidParamError(err, "query")
162+
}
163+
164+
// From now on, we must only return with a finalizer in the result (to
165+
// be called by the caller) or call qry.Close ourselves (which is
166+
// required in the case of a panic).
167+
defer func() {
168+
if result.finalizer == nil {
169+
qry.Close()
170+
}
171+
}()
172+
173+
ctx = httputil.ContextFromRequest(ctx, r)
174+
175+
res := qry.Exec(ctx)
176+
if res.Err != nil {
177+
return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close}
178+
}
179+
180+
warnings := res.Warnings
181+
qs := c.statsRenderer(ctx, qry.Stats(), r.FormValue("stats"))
182+
183+
return apiFuncResult{&v1.QueryData{
184+
ResultType: res.Value.Type(),
185+
Result: res.Value,
186+
Stats: qs,
187+
}, nil, warnings, qry.Close}
188+
}
189+
190+
func (c *CustomAPI) wrap(f apiFunc) http.HandlerFunc {
191+
return func(w http.ResponseWriter, r *http.Request) {
192+
httputil.SetCORS(w, c.CORSOrigin, r)
193+
194+
result := f(r)
195+
if result.finalizer != nil {
196+
defer result.finalizer()
197+
}
198+
199+
if result.err != nil {
200+
c.respondError(w, result.err, result.data)
201+
return
202+
}
203+
204+
if result.data != nil {
205+
c.respond(w, r, result.data, result.warnings, r.FormValue("query"))
206+
return
207+
}
208+
w.WriteHeader(http.StatusNoContent)
209+
}
210+
}
211+
212+
func (c *CustomAPI) respondError(w http.ResponseWriter, apiErr *apiError, data interface{}) {
213+
json := jsoniter.ConfigCompatibleWithStandardLibrary
214+
b, err := json.Marshal(&response{
215+
Status: statusError,
216+
ErrorType: apiErr.typ,
217+
Error: apiErr.err.Error(),
218+
Data: data,
219+
})
220+
if err != nil {
221+
level.Error(c.logger).Log("error marshaling json response", "err", err)
222+
http.Error(w, err.Error(), http.StatusInternalServerError)
223+
return
224+
}
225+
226+
var code int
227+
switch apiErr.typ {
228+
case errorBadData:
229+
code = http.StatusBadRequest
230+
case errorExec:
231+
code = http.StatusUnprocessableEntity
232+
case errorCanceled:
233+
code = statusClientClosedConnection
234+
case errorTimeout:
235+
code = http.StatusServiceUnavailable
236+
case errorInternal:
237+
code = http.StatusInternalServerError
238+
case errorNotFound:
239+
code = http.StatusNotFound
240+
case errorNotAcceptable:
241+
code = http.StatusNotAcceptable
242+
default:
243+
code = http.StatusInternalServerError
244+
}
245+
246+
w.Header().Set("Content-Type", "application/json")
247+
w.WriteHeader(code)
248+
if n, err := w.Write(b); err != nil {
249+
level.Error(c.logger).Log("error writing response", "bytesWritten", n, "err", err)
250+
}
251+
}
252+
253+
func (c *CustomAPI) respond(w http.ResponseWriter, req *http.Request, data interface{}, warnings annotations.Annotations, query string) {
254+
warn, info := warnings.AsStrings(query, 10, 10)
255+
256+
resp := &v1.Response{
257+
Status: statusSuccess,
258+
Data: data,
259+
Warnings: warn,
260+
Infos: info,
261+
}
262+
263+
codec, err := c.negotiateCodec(req, resp)
264+
if err != nil {
265+
c.respondError(w, &apiError{errorNotAcceptable, err}, nil)
266+
return
267+
}
268+
269+
b, err := codec.Encode(resp)
270+
if err != nil {
271+
level.Error(c.logger).Log("error marshaling response", "url", req.URL, "err", err)
272+
http.Error(w, err.Error(), http.StatusInternalServerError)
273+
return
274+
}
275+
276+
w.Header().Set("Content-Type", codec.ContentType().String())
277+
w.WriteHeader(http.StatusOK)
278+
if n, err := w.Write(b); err != nil {
279+
level.Error(c.logger).Log("error writing response", "url", req.URL, "bytesWritten", n, "err", err)
280+
}
281+
}
282+
283+
func (c *CustomAPI) negotiateCodec(req *http.Request, resp *v1.Response) (v1.Codec, error) {
284+
for _, clause := range goautoneg.ParseAccept(req.Header.Get("Accept")) {
285+
for _, codec := range c.codecs {
286+
if codec.ContentType().Satisfies(clause) && codec.CanEncode(resp) {
287+
return codec, nil
288+
}
289+
}
290+
}
291+
292+
defaultCodec := c.codecs[0]
293+
if !defaultCodec.CanEncode(resp) {
294+
return nil, fmt.Errorf("cannot encode response as %s", defaultCodec.ContentType())
295+
}
296+
297+
return defaultCodec, nil
298+
}

0 commit comments

Comments
 (0)