@@ -30,7 +30,7 @@ import (
30
30
)
31
31
32
32
const (
33
- // StatusClientClosedRequest is the status code for when a client request cancellation of an http request
33
+ // StatusClientClosedRequest is the status code for when a client request cancellation of a http request
34
34
StatusClientClosedRequest = 499
35
35
ServiceTimingHeaderName = "Server-Timing"
36
36
)
41
41
errRequestEntityTooLarge = httpgrpc .Errorf (http .StatusRequestEntityTooLarge , "http: request body too large" )
42
42
)
43
43
44
+ const (
45
+ reasonRequestBodySizeExceeded = "request_body_size_exceeded"
46
+ reasonResponseBodySizeExceeded = "response_body_size_exceeded"
47
+ reasonTooManyRequests = "too_many_requests"
48
+ reasonTimeRangeExceeded = "time_range_exceeded"
49
+ reasonTooManySamples = "too_many_samples"
50
+ reasonSeriesFetched = "series_fetched"
51
+ reasonChunksFetched = "chunks_fetched"
52
+ reasonChunkBytesFetched = "chunk_bytes_fetched"
53
+ reasonDataBytesFetched = "data_bytes_fetched"
54
+ reasonSeriesLimitStoreGateway = "store_gateway_series_limit"
55
+ reasonChunksLimitStoreGateway = "store_gateway_chunks_limit"
56
+ reasonBytesLimitStoreGateway = "store_gateway_bytes_limit"
57
+
58
+ limitTooManySamples = `query processing would load too many samples into memory`
59
+ limitTimeRangeExceeded = `the query time range exceeds the limit`
60
+ limitSeriesFetched = `the query hit the max number of series limit`
61
+ limitChunksFetched = `the query hit the max number of chunks limit`
62
+ limitChunkBytesFetched = `the query hit the aggregated chunks size limit`
63
+ limitDataBytesFetched = `the query hit the aggregated data size limit`
64
+
65
+ // Store gateway limits.
66
+ limitSeriesStoreGateway = `exceeded series limit`
67
+ limitChunksStoreGateway = `exceeded chunks limit`
68
+ limitBytesStoreGateway = `exceeded bytes limit`
69
+ )
70
+
44
71
// Config for a Handler.
45
72
type HandlerConfig struct {
46
73
LogQueriesLongerThan time.Duration `yaml:"log_queries_longer_than"`
@@ -67,6 +94,7 @@ type Handler struct {
67
94
querySeries * prometheus.CounterVec
68
95
queryChunkBytes * prometheus.CounterVec
69
96
queryDataBytes * prometheus.CounterVec
97
+ rejectedQueries * prometheus.CounterVec
70
98
activeUsers * util.ActiveUsersCleanupService
71
99
}
72
100
@@ -104,12 +132,23 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
104
132
Help : "Size of all data fetched to execute a query in bytes." ,
105
133
}, []string {"user" })
106
134
135
+ h .rejectedQueries = prometheus .NewCounterVec (
136
+ prometheus.CounterOpts {
137
+ Name : "cortex_rejected_queries_total" ,
138
+ Help : "The total number of queries that were rejected." ,
139
+ },
140
+ []string {"reason" , "user" },
141
+ )
142
+
107
143
h .activeUsers = util .NewActiveUsersCleanupWithDefaultValues (func (user string ) {
108
144
h .queriesCount .DeleteLabelValues (user )
109
145
h .querySeconds .DeleteLabelValues (user )
110
146
h .querySeries .DeleteLabelValues (user )
111
147
h .queryChunkBytes .DeleteLabelValues (user )
112
148
h .queryDataBytes .DeleteLabelValues (user )
149
+ if err := util .DeleteMatchingLabels (h .rejectedQueries , map [string ]string {"user" : user }); err != nil {
150
+ level .Warn (log ).Log ("msg" , "failed to remove cortex_rejected_queries_total metric for user" , "user" , user , "err" , err )
151
+ }
113
152
})
114
153
// If cleaner stops or fail, we will simply not clean the metrics for inactive users.
115
154
_ = h .activeUsers .StartAsync (context .Background ())
@@ -124,6 +163,12 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
124
163
queryString url.Values
125
164
)
126
165
166
+ tenantIDs , err := tenant .TenantIDs (r .Context ())
167
+ if err != nil {
168
+ return
169
+ }
170
+ userID := tenant .JoinTenantIDs (tenantIDs )
171
+
127
172
// Initialise the stats in the context and make sure it's propagated
128
173
// down the request chain.
129
174
if f .cfg .QueryStatsEnabled {
@@ -150,6 +195,9 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
150
195
if ! strings .Contains (r .URL .Path , "api/v1/read" ) {
151
196
if err := r .ParseForm (); err != nil {
152
197
writeError (w , err )
198
+ if f .cfg .QueryStatsEnabled && util .IsRequestBodyTooLarge (err ) {
199
+ f .rejectedQueries .WithLabelValues (reasonRequestBodySizeExceeded , userID ).Inc ()
200
+ }
153
201
return
154
202
}
155
203
r .Body = io .NopCloser (& buf )
@@ -168,7 +216,9 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
168
216
if shouldReportSlowQuery {
169
217
f .reportSlowQuery (r , queryString , queryResponseTime )
170
218
}
219
+
171
220
if f .cfg .QueryStatsEnabled {
221
+ // Try to parse error and get status code.
172
222
var statusCode int
173
223
if err != nil {
174
224
statusCode = getStatusCodeFromError (err )
@@ -184,7 +234,7 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
184
234
}
185
235
}
186
236
187
- f .reportQueryStats (r , queryString , queryResponseTime , stats , err , statusCode , resp )
237
+ f .reportQueryStats (r , userID , queryString , queryResponseTime , stats , err , statusCode , resp )
188
238
}
189
239
190
240
if err != nil {
@@ -239,12 +289,7 @@ func (f *Handler) reportSlowQuery(r *http.Request, queryString url.Values, query
239
289
level .Info (util_log .WithContext (r .Context (), f .log )).Log (logMessage ... )
240
290
}
241
291
242
- func (f * Handler ) reportQueryStats (r * http.Request , queryString url.Values , queryResponseTime time.Duration , stats * querier_stats.QueryStats , error error , statusCode int , resp * http.Response ) {
243
- tenantIDs , err := tenant .TenantIDs (r .Context ())
244
- if err != nil {
245
- return
246
- }
247
- userID := tenant .JoinTenantIDs (tenantIDs )
292
+ func (f * Handler ) reportQueryStats (r * http.Request , userID string , queryString url.Values , queryResponseTime time.Duration , stats * querier_stats.QueryStats , error error , statusCode int , resp * http.Response ) {
248
293
wallTime := stats .LoadWallTime ()
249
294
numSeries := stats .LoadFetchedSeries ()
250
295
numChunks := stats .LoadFetchedChunks ()
@@ -311,6 +356,38 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
311
356
} else {
312
357
level .Info (util_log .WithContext (r .Context (), f .log )).Log (logMessage ... )
313
358
}
359
+
360
+ var reason string
361
+ if statusCode == http .StatusTooManyRequests {
362
+ reason = reasonTooManyRequests
363
+ } else if statusCode == http .StatusRequestEntityTooLarge {
364
+ reason = reasonResponseBodySizeExceeded
365
+ } else if statusCode == http .StatusUnprocessableEntity {
366
+ errMsg := error .Error ()
367
+ if strings .Contains (errMsg , limitTooManySamples ) {
368
+ reason = reasonTooManySamples
369
+ } else if strings .Contains (errMsg , limitTimeRangeExceeded ) {
370
+ reason = reasonTimeRangeExceeded
371
+ } else if strings .Contains (errMsg , limitSeriesFetched ) {
372
+ reason = reasonSeriesFetched
373
+ } else if strings .Contains (errMsg , limitChunksFetched ) {
374
+ reason = reasonChunksFetched
375
+ } else if strings .Contains (errMsg , limitChunkBytesFetched ) {
376
+ reason = reasonChunkBytesFetched
377
+ } else if strings .Contains (errMsg , limitDataBytesFetched ) {
378
+ reason = reasonDataBytesFetched
379
+ } else if strings .Contains (errMsg , limitSeriesStoreGateway ) {
380
+ reason = reasonSeriesLimitStoreGateway
381
+ } else if strings .Contains (errMsg , limitChunksStoreGateway ) {
382
+ reason = reasonChunksLimitStoreGateway
383
+ } else if strings .Contains (errMsg , limitBytesStoreGateway ) {
384
+ reason = reasonBytesLimitStoreGateway
385
+ }
386
+ }
387
+ if len (reason ) > 0 {
388
+ f .rejectedQueries .WithLabelValues (reason , userID ).Inc ()
389
+ stats .LimitHit = reason
390
+ }
314
391
}
315
392
316
393
func (f * Handler ) parseRequestQueryString (r * http.Request , bodyBuf bytes.Buffer ) url.Values {
0 commit comments