Skip to content

Commit b05ba9b

Browse files
committed
Add a source label to query stat metrics
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
1 parent c2c4827 commit b05ba9b

File tree

5 files changed

+47
-36
lines changed

5 files changed

+47
-36
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
* [FEATURE] Ruler: Add support for per-user external labels #6340
2222
* [FEATURE] Query Frontend: Support an exemplar federated query when `-tenant-federation.enabled=true`. #6455
2323
* [FEATURE] Ingester: Add support for cache query matchers via `-ingester.matchers-cache-max-items. #6477
24+
* [ENHANCEMENT] Query Frontend: Add a `source` label to query stat metrics. #6470
2425
* [ENHANCEMENT] Querier: Add a `-tenant-federation.max-concurrent` flags to configure the number of worker processing federated query and add a `cortex_querier_federated_tenants_per_query` histogram to track the number of tenants per query. #6449
2526
* [ENHANCEMENT] Query Frontend: Add a number of series in the query response to the query stat log. #6423
2627
* [ENHANCEMENT] Store Gateway: Add a hedged request to reduce the tail latency. #6388

integration/ruler_test.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1673,7 +1673,8 @@ func TestRulerEvalWithQueryFrontend(t *testing.T) {
16731673
for _, format := range []string{"protobuf", "json"} {
16741674
t.Run(fmt.Sprintf("format:%s", format), func(t *testing.T) {
16751675
queryFrontendFlag := mergeFlags(flags, map[string]string{
1676-
"-ruler.query-response-format": format,
1676+
"-ruler.query-response-format": format,
1677+
"-frontend.query-stats-enabled": "true",
16771678
})
16781679
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", queryFrontendFlag, "")
16791680
require.NoError(t, s.Start(queryFrontend))
@@ -1726,6 +1727,14 @@ func TestRulerEvalWithQueryFrontend(t *testing.T) {
17261727
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
17271728
// Check that cortex_query_frontend_queries_total went up
17281729
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_query_frontend_queries_total"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics))
1730+
// check query stat metrics
1731+
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(0), []string{"cortex_query_seconds_total"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics))
1732+
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(0), []string{"cortex_query_fetched_series_total"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics))
1733+
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(0), []string{"cortex_query_samples_total"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics))
1734+
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(0), []string{"cortex_query_samples_scanned_total"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics))
1735+
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(0), []string{"cortex_query_peak_samples"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics))
1736+
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(0), []string{"cortex_query_fetched_chunks_bytes_total"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics))
1737+
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(0), []string{"cortex_query_fetched_data_bytes_total"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics))
17291738
})
17301739
}
17311740
}

pkg/frontend/transport/handler.go

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -112,48 +112,48 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
112112
h.querySeconds = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
113113
Name: "cortex_query_seconds_total",
114114
Help: "Total amount of wall clock time spend processing queries.",
115-
}, []string{"user"})
115+
}, []string{"source", "user"})
116116

117117
h.queryFetchedSeries = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
118118
Name: "cortex_query_fetched_series_total",
119119
Help: "Number of series fetched to execute a query.",
120-
}, []string{"user"})
120+
}, []string{"source", "user"})
121121

122122
h.queryFetchedSamples = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
123123
Name: "cortex_query_samples_total",
124124
Help: "Number of samples fetched to execute a query.",
125-
}, []string{"user"})
125+
}, []string{"source", "user"})
126126

127127
// It tracks TotalSamples in https://github.com/prometheus/prometheus/blob/main/util/stats/query_stats.go#L237 for each user.
128128
h.queryScannedSamples = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
129129
Name: "cortex_query_samples_scanned_total",
130130
Help: "Number of samples scanned to execute a query.",
131-
}, []string{"user"})
131+
}, []string{"source", "user"})
132132

133133
h.queryPeakSamples = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
134134
Name: "cortex_query_peak_samples",
135135
Help: "Highest count of samples considered to execute a query.",
136136
NativeHistogramBucketFactor: 1.1,
137137
NativeHistogramMaxBucketNumber: 100,
138138
NativeHistogramMinResetDuration: 1 * time.Hour,
139-
}, []string{"user"})
139+
}, []string{"source", "user"})
140140

141141
h.queryChunkBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
142142
Name: "cortex_query_fetched_chunks_bytes_total",
143143
Help: "Size of all chunks fetched to execute a query in bytes.",
144-
}, []string{"user"})
144+
}, []string{"source", "user"})
145145

146146
h.queryDataBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
147147
Name: "cortex_query_fetched_data_bytes_total",
148148
Help: "Size of all data fetched to execute a query in bytes.",
149-
}, []string{"user"})
149+
}, []string{"source", "user"})
150150

151151
h.rejectedQueries = promauto.With(reg).NewCounterVec(
152152
prometheus.CounterOpts{
153153
Name: "cortex_rejected_queries_total",
154154
Help: "The total number of queries that were rejected.",
155155
},
156-
[]string{"reason", "user"},
156+
[]string{"reason", "source", "user"},
157157
)
158158

159159
h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) {
@@ -218,7 +218,8 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
218218
}
219219
http.Error(w, err.Error(), statusCode)
220220
if f.cfg.QueryStatsEnabled && util.IsRequestBodyTooLarge(err) {
221-
f.rejectedQueries.WithLabelValues(reasonRequestBodySizeExceeded, userID).Inc()
221+
source := tripperware.GetSource(r.Header.Get("User-Agent"))
222+
f.rejectedQueries.WithLabelValues(reasonRequestBodySizeExceeded, source, userID).Inc()
222223
}
223224
return
224225
}
@@ -261,7 +262,8 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
261262
}
262263
}
263264

264-
f.reportQueryStats(r, userID, queryString, queryResponseTime, stats, err, statusCode, resp)
265+
source := tripperware.GetSource(r.Header.Get("User-Agent"))
266+
f.reportQueryStats(r, source, userID, queryString, queryResponseTime, stats, err, statusCode, resp)
265267
}
266268

267269
hs := w.Header()
@@ -335,7 +337,7 @@ func (f *Handler) reportSlowQuery(r *http.Request, queryString url.Values, query
335337
level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
336338
}
337339

338-
func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString url.Values, queryResponseTime time.Duration, stats *querier_stats.QueryStats, error error, statusCode int, resp *http.Response) {
340+
func (f *Handler) reportQueryStats(r *http.Request, source, userID string, queryString url.Values, queryResponseTime time.Duration, stats *querier_stats.QueryStats, error error, statusCode int, resp *http.Response) {
339341
wallTime := stats.LoadWallTime()
340342
queryStorageWallTime := stats.LoadQueryStorageWallTime()
341343
numResponseSeries := stats.LoadResponseSeries()
@@ -353,13 +355,13 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
353355
dataSelectMinTime := stats.LoadDataSelectMinTime()
354356

355357
// Track stats.
356-
f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds())
357-
f.queryFetchedSeries.WithLabelValues(userID).Add(float64(numFetchedSeries))
358-
f.queryFetchedSamples.WithLabelValues(userID).Add(float64(numFetchedSamples))
359-
f.queryScannedSamples.WithLabelValues(userID).Add(float64(numScannedSamples))
360-
f.queryPeakSamples.WithLabelValues(userID).Observe(float64(numPeakSamples))
361-
f.queryChunkBytes.WithLabelValues(userID).Add(float64(numChunkBytes))
362-
f.queryDataBytes.WithLabelValues(userID).Add(float64(numDataBytes))
358+
f.querySeconds.WithLabelValues(source, userID).Add(wallTime.Seconds())
359+
f.queryFetchedSeries.WithLabelValues(source, userID).Add(float64(numFetchedSeries))
360+
f.queryFetchedSamples.WithLabelValues(source, userID).Add(float64(numFetchedSamples))
361+
f.queryScannedSamples.WithLabelValues(source, userID).Add(float64(numScannedSamples))
362+
f.queryPeakSamples.WithLabelValues(source, userID).Observe(float64(numPeakSamples))
363+
f.queryChunkBytes.WithLabelValues(source, userID).Add(float64(numChunkBytes))
364+
f.queryDataBytes.WithLabelValues(source, userID).Add(float64(numDataBytes))
363365
f.activeUsers.UpdateUserTimestamp(userID, time.Now())
364366

365367
var (
@@ -468,7 +470,7 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
468470
}
469471
}
470472
if len(reason) > 0 {
471-
f.rejectedQueries.WithLabelValues(reason, userID).Inc()
473+
f.rejectedQueries.WithLabelValues(reason, source, userID).Inc()
472474
stats.LimitHit = reason
473475
}
474476
}

pkg/frontend/transport/handler_test.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"google.golang.org/grpc/codes"
2525

2626
querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
27+
"github.com/cortexproject/cortex/pkg/querier/tripperware"
2728
util_api "github.com/cortexproject/cortex/pkg/util/api"
2829
util_log "github.com/cortexproject/cortex/pkg/util/log"
2930
)
@@ -210,7 +211,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
210211
}, nil
211212
}),
212213
additionalMetricsCheckFunc: func(h *Handler) {
213-
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonResponseBodySizeExceeded, userID))
214+
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonResponseBodySizeExceeded, tripperware.SourceAPI, userID))
214215
assert.Equal(t, float64(1), v)
215216
},
216217
expectedStatusCode: http.StatusRequestEntityTooLarge,
@@ -226,7 +227,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
226227
}, nil
227228
}),
228229
additionalMetricsCheckFunc: func(h *Handler) {
229-
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTooManyRequests, userID))
230+
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTooManyRequests, tripperware.SourceAPI, userID))
230231
assert.Equal(t, float64(1), v)
231232
},
232233
expectedStatusCode: http.StatusTooManyRequests,
@@ -242,7 +243,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
242243
}, nil
243244
}),
244245
additionalMetricsCheckFunc: func(h *Handler) {
245-
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTooManySamples, userID))
246+
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTooManySamples, tripperware.SourceAPI, userID))
246247
assert.Equal(t, float64(1), v)
247248
},
248249
expectedStatusCode: http.StatusUnprocessableEntity,
@@ -258,7 +259,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
258259
}, nil
259260
}),
260261
additionalMetricsCheckFunc: func(h *Handler) {
261-
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTimeRangeExceeded, userID))
262+
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTimeRangeExceeded, tripperware.SourceAPI, userID))
262263
assert.Equal(t, float64(1), v)
263264
},
264265
expectedStatusCode: http.StatusUnprocessableEntity,
@@ -274,7 +275,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
274275
}, nil
275276
}),
276277
additionalMetricsCheckFunc: func(h *Handler) {
277-
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonSeriesFetched, userID))
278+
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonSeriesFetched, tripperware.SourceAPI, userID))
278279
assert.Equal(t, float64(1), v)
279280
},
280281
expectedStatusCode: http.StatusUnprocessableEntity,
@@ -290,7 +291,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
290291
}, nil
291292
}),
292293
additionalMetricsCheckFunc: func(h *Handler) {
293-
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunksFetched, userID))
294+
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunksFetched, tripperware.SourceAPI, userID))
294295
assert.Equal(t, float64(1), v)
295296
},
296297
expectedStatusCode: http.StatusUnprocessableEntity,
@@ -306,7 +307,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
306307
}, nil
307308
}),
308309
additionalMetricsCheckFunc: func(h *Handler) {
309-
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunkBytesFetched, userID))
310+
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunkBytesFetched, tripperware.SourceAPI, userID))
310311
assert.Equal(t, float64(1), v)
311312
},
312313
expectedStatusCode: http.StatusUnprocessableEntity,
@@ -322,7 +323,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
322323
}, nil
323324
}),
324325
additionalMetricsCheckFunc: func(h *Handler) {
325-
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonDataBytesFetched, userID))
326+
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonDataBytesFetched, tripperware.SourceAPI, userID))
326327
assert.Equal(t, float64(1), v)
327328
},
328329
expectedStatusCode: http.StatusUnprocessableEntity,
@@ -338,7 +339,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
338339
}, nil
339340
}),
340341
additionalMetricsCheckFunc: func(h *Handler) {
341-
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonSeriesLimitStoreGateway, userID))
342+
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonSeriesLimitStoreGateway, tripperware.SourceAPI, userID))
342343
assert.Equal(t, float64(1), v)
343344
},
344345
expectedStatusCode: http.StatusUnprocessableEntity,
@@ -354,7 +355,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
354355
}, nil
355356
}),
356357
additionalMetricsCheckFunc: func(h *Handler) {
357-
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunksLimitStoreGateway, userID))
358+
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunksLimitStoreGateway, tripperware.SourceAPI, userID))
358359
assert.Equal(t, float64(1), v)
359360
},
360361
expectedStatusCode: http.StatusUnprocessableEntity,
@@ -370,7 +371,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
370371
}, nil
371372
}),
372373
additionalMetricsCheckFunc: func(h *Handler) {
373-
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonBytesLimitStoreGateway, userID))
374+
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonBytesLimitStoreGateway, tripperware.SourceAPI, userID))
374375
assert.Equal(t, float64(1), v)
375376
},
376377
expectedStatusCode: http.StatusUnprocessableEntity,
@@ -498,7 +499,7 @@ func TestReportQueryStatsFormat(t *testing.T) {
498499
for testName, testData := range tests {
499500
t.Run(testName, func(t *testing.T) {
500501
req.Header = testData.header
501-
handler.reportQueryStats(req, userID, testData.queryString, responseTime, testData.queryStats, testData.responseErr, statusCode, resp)
502+
handler.reportQueryStats(req, tripperware.SourceAPI, userID, testData.queryString, responseTime, testData.queryStats, testData.responseErr, statusCode, resp)
502503
data, err := io.ReadAll(outputBuf)
503504
require.NoError(t, err)
504505
require.Equal(t, testData.expectedLog+"\n", string(data))

pkg/querier/tripperware/roundtrip.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ package tripperware
1717

1818
import (
1919
"context"
20-
"fmt"
2120
"io"
2221
"net/http"
2322
"strings"
@@ -157,7 +156,7 @@ func NewQueryTripperware(
157156
now := time.Now()
158157
userStr := tenant.JoinTenantIDs(tenantIDs)
159158
activeUsers.UpdateUserTimestamp(userStr, now)
160-
source := getSource(r.Header.Get("User-Agent"))
159+
source := GetSource(r.Header.Get("User-Agent"))
161160
queriesPerTenant.WithLabelValues(op, source, userStr).Inc()
162161

163162
if maxSubQuerySteps > 0 && (isQuery || isQueryRange) {
@@ -243,8 +242,7 @@ func (q roundTripper) Do(ctx context.Context, r Request) (Response, error) {
243242
return q.codec.DecodeResponse(ctx, response, r)
244243
}
245244

246-
func getSource(userAgent string) string {
247-
fmt.Println("userAgent", userAgent)
245+
func GetSource(userAgent string) string {
248246
if strings.Contains(userAgent, RulerUserAgent) {
249247
// caller is ruler
250248
return SourceRuler

0 commit comments

Comments
 (0)