Skip to content

feat: instrument the query layer to track rate-limited queries #3894

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
* `-alertmanager.alertmanager-client.tls-server-name`
* `-alertmanager.alertmanager-client.tls-insecure-skip-verify`
* [FEATURE] Compactor: added blocks storage per-tenant retention support. This is configured via `-compactor.retention-period`, and can be overridden on a per-tenant basis. #3879
* [ENHANCEMENT] Queries: Instrument queries that were discarded due to the configured `max_outstanding_requests_per_tenant`. #3894
* `cortex_query_frontend_discarded_requests_total`
* `cortex_query_scheduler_discarded_requests_total`
* [ENHANCEMENT] Ruler: Add TLS and explicit basic authentication configuration options for the HTTP client the ruler uses to communicate with the alertmanager. #3752
* `-ruler.alertmanager-client.basic-auth-username`: Configure the basic authentication username used by the client. Takes precedence over a URL configured username.
* `-ruler.alertmanager-client.basic-auth-password`: Configure the basic authentication password used by the client. Takes precedence over a URL configured password.
Expand Down
14 changes: 10 additions & 4 deletions pkg/frontend/v1/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,10 @@ type Frontend struct {
activeUsers *util.ActiveUsersCleanupService

// Metrics.
queueLength *prometheus.GaugeVec
numClients prometheus.GaugeFunc
queueDuration prometheus.Histogram
queueLength *prometheus.GaugeVec
discardedRequests *prometheus.CounterVec
numClients prometheus.GaugeFunc
queueDuration prometheus.Histogram
}

type request struct {
Expand All @@ -83,14 +84,18 @@ func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Regist
Name: "cortex_query_frontend_queue_length",
Help: "Number of queries in the queue.",
}, []string{"user"}),
discardedRequests: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_frontend_discarded_requests_total",
Help: "Total number of query requests discarded.",
}, []string{"user"}),
queueDuration: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_query_frontend_queue_duration_seconds",
Help: "Time spend by requests queued.",
Buckets: prometheus.DefBuckets,
}),
}

f.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, f.queueLength)
f.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, f.queueLength, f.discardedRequests)
f.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(f.cleanupInactiveUserMetrics)

f.numClients = promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{
Expand All @@ -114,6 +119,7 @@ func (f *Frontend) stopping(_ error) error {

// cleanupInactiveUserMetrics deletes the series labelled with the given user
// from the frontend's per-user metrics: the queue-length gauge and the
// discarded-requests counter. It is the callback handed to the active-users
// cleanup service created in New.
func (f *Frontend) cleanupInactiveUserMetrics(user string) {
f.queueLength.DeleteLabelValues(user)
f.discardedRequests.DeleteLabelValues(user)
}

// RoundTripGRPC round trips a proto (instead of a HTTP request).
Expand Down
7 changes: 5 additions & 2 deletions pkg/frontend/v1/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,11 @@ func TestFrontendCheckReady(t *testing.T) {
} {
t.Run(tt.name, func(t *testing.T) {
f := &Frontend{
log: log.NewNopLogger(),
requestQueue: queue.NewRequestQueue(5, prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"})),
log: log.NewNopLogger(),
requestQueue: queue.NewRequestQueue(5,
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
),
}
for i := 0; i < tt.connectedClients; i++ {
f.requestQueue.RegisterQuerierConnection("test")
Expand Down
7 changes: 5 additions & 2 deletions pkg/scheduler/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,16 @@ type RequestQueue struct {
queues *queues
stopped bool

queueLength *prometheus.GaugeVec // Per user.
queueLength *prometheus.GaugeVec // Per user.
discardedRequests *prometheus.CounterVec // Per user.
}

func NewRequestQueue(maxOutstandingPerTenant int, queueLength *prometheus.GaugeVec) *RequestQueue {
func NewRequestQueue(maxOutstandingPerTenant int, queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec) *RequestQueue {
q := &RequestQueue{
queues: newUserQueues(maxOutstandingPerTenant),
connectedQuerierWorkers: atomic.NewInt32(0),
queueLength: queueLength,
discardedRequests: discardedRequests,
}

q.cond = sync.NewCond(&q.mtx)
Expand Down Expand Up @@ -91,6 +93,7 @@ func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers in
}
return nil
default:
q.discardedRequests.WithLabelValues(userID).Inc()
return ErrTooManyRequests
}
}
Expand Down
10 changes: 8 additions & 2 deletions pkg/scheduler/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ func BenchmarkGetNextRequest(b *testing.B) {
queues := make([]*RequestQueue, 0, b.N)

for n := 0; n < b.N; n++ {
queue := NewRequestQueue(maxOutstandingPerTenant, prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}))
queue := NewRequestQueue(maxOutstandingPerTenant,
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
)
queues = append(queues, queue)

for ix := 0; ix < queriers; ix++ {
Expand Down Expand Up @@ -71,7 +74,10 @@ func BenchmarkQueueRequest(b *testing.B) {
requests := make([]string, 0, numTenants)

for n := 0; n < b.N; n++ {
q := NewRequestQueue(maxOutstandingPerTenant, prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}))
q := NewRequestQueue(maxOutstandingPerTenant,
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
)

for ix := 0; ix < queriers; ix++ {
q.RegisterQuerierConnection(fmt.Sprintf("querier-%d", ix))
Expand Down
9 changes: 8 additions & 1 deletion pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Scheduler struct {

// Metrics.
queueLength *prometheus.GaugeVec
discardedRequests *prometheus.CounterVec
connectedQuerierClients prometheus.GaugeFunc
connectedFrontendClients prometheus.GaugeFunc
queueDuration prometheus.Histogram
Expand Down Expand Up @@ -100,7 +101,12 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe
Name: "cortex_query_scheduler_queue_length",
Help: "Number of queries in the queue.",
}, []string{"user"})
s.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, s.queueLength)

s.discardedRequests = promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_scheduler_discarded_requests_total",
Help: "Total number of query requests discarded.",
}, []string{"user"})
s.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, s.queueLength, s.discardedRequests)

s.queueDuration = promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_query_scheduler_queue_duration_seconds",
Expand Down Expand Up @@ -471,6 +477,7 @@ func (s *Scheduler) stopping(_ error) error {

// cleanupMetricsForInactiveUser deletes the series labelled with the given
// user from the scheduler's per-user metrics: the queue-length gauge and the
// discarded-requests counter.
// NOTE(review): presumably invoked by an active-users cleanup service, as in
// the v1 frontend — the registration is not visible in this hunk; confirm.
func (s *Scheduler) cleanupMetricsForInactiveUser(user string) {
s.queueLength.DeleteLabelValues(user)
s.discardedRequests.DeleteLabelValues(user)
}

func (s *Scheduler) getConnectedFrontendClientsMetric() float64 {
Expand Down