Skip to content

Re-enqueue 429 requests if there are multiple query-schedulers #5496

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
* [FEATURE] Store Gateway: Add `-store-gateway.sharding-ring.keep-instance-in-the-ring-on-shutdown` to skip unregistering instance from the ring in shutdown. #5421
* [FEATURE] Ruler: Support for filtering rules in the API. #5417
* [FEATURE] Compactor: Add `-compactor.ring.tokens-file-path` to store generated tokens locally. #5432
* [FEATURE] Query Frontend: Add `-frontend.retry-on-too-many-outstanding-requests` to re-enqueue 429 requests if there are multiple query-schedulers available. #5496
* [ENHANCEMENT] Distributor/Ingester: Add span on push path #5319
* [ENHANCEMENT] Support object storage backends for runtime configuration file. #5292
* [ENHANCEMENT] Query Frontend: Reject subquery with too small step size. #5323
Expand Down
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3528,6 +3528,11 @@ grpc_client_config:
# CLI flag: -frontend.grpc-client-config.tls-insecure-skip-verify
[tls_insecure_skip_verify: <boolean> | default = false]

# When multiple query-schedulers are available, re-enqueue queries that were
# rejected due to too many outstanding requests.
# CLI flag: -frontend.retry-on-too-many-outstanding-requests
[retry_on_too_many_outstanding_requests: <boolean> | default = false]

# Name of network interface to read address from. This address is sent to
# query-scheduler and querier, which uses it to send the query response back to
# query-frontend.
Expand Down
14 changes: 10 additions & 4 deletions pkg/frontend/v2/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ import (

// Config for a Frontend.
type Config struct {
SchedulerAddress string `yaml:"scheduler_address"`
DNSLookupPeriod time.Duration `yaml:"scheduler_dns_lookup_period"`
WorkerConcurrency int `yaml:"scheduler_worker_concurrency"`
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
SchedulerAddress string `yaml:"scheduler_address"`
DNSLookupPeriod time.Duration `yaml:"scheduler_dns_lookup_period"`
WorkerConcurrency int `yaml:"scheduler_worker_concurrency"`
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
RetryOnTooManyOutstandingRequests bool `yaml:"retry_on_too_many_outstanding_requests"`

// Used to find local IP address, that is sent to scheduler and querier-worker.
InfNames []string `yaml:"instance_interface_names"`
Expand All @@ -47,6 +48,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.SchedulerAddress, "frontend.scheduler-address", "", "DNS hostname used for finding query-schedulers.")
f.DurationVar(&cfg.DNSLookupPeriod, "frontend.scheduler-dns-lookup-period", 10*time.Second, "How often to resolve the scheduler-address, in order to look for new query-scheduler instances.")
f.IntVar(&cfg.WorkerConcurrency, "frontend.scheduler-worker-concurrency", 5, "Number of concurrent workers forwarding queries to single query-scheduler.")
f.BoolVar(&cfg.RetryOnTooManyOutstandingRequests, "frontend.retry-on-too-many-outstanding-requests", false, "When multiple query-schedulers are available, re-enqueue queries that were rejected due to too many outstanding requests.")

cfg.InfNames = []string{"eth0", "en0"}
f.Var((*flagext.StringSlice)(&cfg.InfNames), "frontend.instance-interface-names", "Name of network interface to read address from. This address is sent to query-scheduler and querier, which uses it to send the query response back to query-frontend.")
Expand Down Expand Up @@ -86,6 +88,8 @@ type frontendRequest struct {

enqueue chan enqueueResult
response chan *frontendv2pb.QueryResultRequest

retryOnTooManyOutstandingRequests bool
}

type enqueueStatus int
Expand Down Expand Up @@ -192,6 +196,8 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest)
// even if this goroutine goes away due to client context cancellation.
enqueue: make(chan enqueueResult, 1),
response: make(chan *frontendv2pb.QueryResultRequest, 1),

retryOnTooManyOutstandingRequests: f.cfg.RetryOnTooManyOutstandingRequests && f.schedulerWorkers.getWorkersCount() > 1,
}

f.requests.put(freq)
Expand Down
16 changes: 10 additions & 6 deletions pkg/frontend/v2/frontend_scheduler_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,12 +296,16 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro
}

case schedulerpb.TOO_MANY_REQUESTS_PER_TENANT:
req.enqueue <- enqueueResult{status: waitForResponse}
req.response <- &frontendv2pb.QueryResultRequest{
HttpResponse: &httpgrpc.HTTPResponse{
Code: http.StatusTooManyRequests,
Body: []byte("too many outstanding requests"),
},
if req.retryOnTooManyOutstandingRequests {
req.enqueue <- enqueueResult{status: failed}
} else {
req.enqueue <- enqueueResult{status: waitForResponse}
req.response <- &frontendv2pb.QueryResultRequest{
HttpResponse: &httpgrpc.HTTPResponse{
Code: http.StatusTooManyRequests,
Body: []byte("too many outstanding requests"),
},
}
}
}

Expand Down