Add ResourceMonitor module in Cortex, and add ResourceBasedLimiter in Ingesters and StoreGateways #6674

Merged — 30 commits, Apr 18, 2025

Commits
80b2d5c
Add resource based throttling to ingesters and store gateways
justinjung04 Mar 25, 2025
2121845
doc
justinjung04 Mar 25, 2025
2b168fc
Add automaxprocs
justinjung04 Mar 25, 2025
56f8e57
nit
justinjung04 Mar 25, 2025
9efbbd9
Add test for monitor
justinjung04 Mar 26, 2025
30bbd3d
fix tests
justinjung04 Mar 26, 2025
fa56e65
changelog
justinjung04 Mar 26, 2025
a2ffcdd
Merge branch 'master' into resource-based-throttling
justinjung04 Mar 26, 2025
5cccd60
fix test
justinjung04 Mar 26, 2025
6e37330
remove interface
justinjung04 Mar 26, 2025
08a6adf
address comments
justinjung04 Mar 31, 2025
067478b
rename doc
justinjung04 Mar 31, 2025
18fdf37
Make monitor more generic + separate scanners
justinjung04 Apr 10, 2025
aa81155
fix tests
justinjung04 Apr 10, 2025
a528a7a
fix more tests
justinjung04 Apr 10, 2025
42e52b3
remove monitor_test.go
justinjung04 Apr 10, 2025
50993e1
move noop scanner to darwin scanner
justinjung04 Apr 10, 2025
e56431e
doc update
justinjung04 Apr 10, 2025
eae4df7
doc
justinjung04 Apr 10, 2025
fd19f5c
lint
justinjung04 Apr 10, 2025
f588d94
add debugging log on unsupported resource type
justinjung04 Apr 10, 2025
6138a9d
test
justinjung04 Apr 10, 2025
7bd7ab9
add more error handling + resource_based_limiter_limit metric
justinjung04 Apr 10, 2025
6da53e9
fix test
justinjung04 Apr 10, 2025
a8d4218
fix test
justinjung04 Apr 10, 2025
d6d3839
update changelog
justinjung04 Apr 10, 2025
c68bbd2
Move noopScanner to scanner.go and fix RegisterFlagsWithPrefix
justinjung04 Apr 15, 2025
025a93a
Add limit breached metric + wrap error with 429
justinjung04 Apr 16, 2025
6ffef63
Add more validation and test on instance_limits
justinjung04 Apr 16, 2025
7808940
Added _total to counter metric
justinjung04 Apr 17, 2025
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@
* [FEATURE] Query Frontend: Add dynamic interval size for query splitting. This is enabled by configuring experimental flags `querier.max-shards-per-query` and/or `querier.max-fetched-data-duration-per-query`. The split interval size is dynamically increased to maintain a number of shards and total duration fetched below the configured values. #6458
* [FEATURE] Querier/Ruler: Add `query_partial_data` and `rules_partial_data` limits to allow queries/rules to be evaluated with data from a single zone, if other zones are not available. #6526
* [FEATURE] Update prometheus alertmanager version to v0.28.0 and add new integration msteamsv2, jira, and rocketchat. #6590
* [FEATURE] Ingester/StoreGateway: Add `ResourceMonitor` module in Cortex, and add `ResourceBasedLimiter` in Ingesters and StoreGateways. #6674
* [FEATURE] Ingester: Support out-of-order native histogram ingestion. It is automatically enabled when `-ingester.out-of-order-time-window > 0` and `-blocks-storage.tsdb.enable-native-histograms=true`. #6626 #6663
* [ENHANCEMENT] Alertmanager: Add nflog and silences maintenance metrics. #6659
* [ENHANCEMENT] Querier: limit label APIs to query only ingesters if the `start` param has not been specified. #6618
1 change: 1 addition & 0 deletions cmd/cortex/main.go
@@ -18,6 +18,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
collectorversion "github.com/prometheus/client_golang/prometheus/collectors/version"
"github.com/prometheus/common/version"
_ "go.uber.org/automaxprocs"
"gopkg.in/yaml.v2"

"github.com/cortexproject/cortex/pkg/cortex"
15 changes: 15 additions & 0 deletions docs/blocks-storage/store-gateway.md
@@ -349,6 +349,21 @@ store_gateway:
# CLI flag: -store-gateway.disabled-tenants
[disabled_tenants: <string> | default = ""]

instance_limits:
# EXPERIMENTAL: Max CPU utilization that this store gateway can reach before
# rejecting new query requests (across all tenants), as a fraction between 0
# and 1. monitored_resources config must include the resource type. 0 to
# disable.
# CLI flag: -store-gateway.instance-limits.cpu-utilization
[cpu_utilization: <float> | default = 0]

# EXPERIMENTAL: Max heap utilization that this store gateway can reach before
# rejecting new query requests (across all tenants), as a fraction between 0
# and 1. monitored_resources config must include the resource type. 0 to
# disable.
# CLI flag: -store-gateway.instance-limits.heap-utilization
[heap_utilization: <float> | default = 0]

hedged_request:
# If true, hedged requests are applied to object store calls. It can help
# with reducing tail latency.
35 changes: 35 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -68,6 +68,12 @@ Where default_value is the value to use if the environment variable is undefined
# CLI flag: -http.prefix
[http_prefix: <string> | default = "/api/prom"]

# Comma-separated list of resources to monitor. Supported values are cpu and
# heap, which track close utilization estimates derived from
# github.com/prometheus/procfs and runtime/metrics. Empty string to disable.
# CLI flag: -monitored.resources
[monitored_resources: <string> | default = ""]

api:
# Use GZIP compression for API responses. Some endpoints serve large YAML or
# JSON blobs which can benefit from compression.
@@ -3197,6 +3203,20 @@ lifecycler:
[upload_compacted_blocks_enabled: <boolean> | default = true]

instance_limits:
# EXPERIMENTAL: Max CPU utilization that this ingester can reach before
# rejecting new query requests (across all tenants), as a fraction between 0
# and 1. monitored_resources config must include the resource type. 0 to
# disable.
# CLI flag: -ingester.instance-limits.cpu-utilization
[cpu_utilization: <float> | default = 0]

# EXPERIMENTAL: Max heap utilization that this ingester can reach before
# rejecting new query requests (across all tenants), as a fraction between 0
# and 1. monitored_resources config must include the resource type. 0 to
# disable.
# CLI flag: -ingester.instance-limits.heap-utilization
[heap_utilization: <float> | default = 0]

# Max ingestion rate (samples/sec) that ingester will accept. This limit is
# per-ingester, not per-tenant. Additional push requests will be rejected.
# Current ingestion rate is computed as exponentially weighted moving average,
@@ -5850,6 +5870,21 @@ sharding_ring:
# CLI flag: -store-gateway.disabled-tenants
[disabled_tenants: <string> | default = ""]

instance_limits:
# EXPERIMENTAL: Max CPU utilization that this store gateway can reach before
# rejecting new query requests (across all tenants), as a fraction between 0
# and 1. monitored_resources config must include the resource type. 0 to
# disable.
# CLI flag: -store-gateway.instance-limits.cpu-utilization
[cpu_utilization: <float> | default = 0]

# EXPERIMENTAL: Max heap utilization that this store gateway can reach before
# rejecting new query requests (across all tenants), as a fraction between 0
# and 1. monitored_resources config must include the resource type. 0 to
# disable.
# CLI flag: -store-gateway.instance-limits.heap-utilization
[heap_utilization: <float> | default = 0]

hedged_request:
# If true, hedged requests are applied to object store calls. It can help with
# reducing tail latency.
5 changes: 5 additions & 0 deletions docs/configuration/v1-guarantees.md
@@ -123,3 +123,8 @@ Currently experimental features are:
- Query-frontend: dynamic query splits
- `querier.max-shards-per-query` (int) CLI flag
- `querier.max-fetched-data-duration-per-query` (duration) CLI flag
- Ingester/Store-Gateway: Resource-based throttling
- `-ingester.instance-limits.cpu-utilization`
- `-ingester.instance-limits.heap-utilization`
- `-store-gateway.instance-limits.cpu-utilization`
- `-store-gateway.instance-limits.heap-utilization`
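
For instance, resource-based throttling could be enabled on an ingester with flags like the following. This is an illustrative CLI fragment: the binary name and threshold values are assumptions, while the flag names are the experimental ones listed above.

```
cortex -target=ingester \
  -monitored.resources=cpu,heap \
  -ingester.instance-limits.cpu-utilization=0.8 \
  -ingester.instance-limits.heap-utilization=0.8
```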
56 changes: 56 additions & 0 deletions docs/guides/protecting-cortex-from-heavy-queries.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
---
title: "Protecting Cortex from Heavy Queries"
linkTitle: "Protecting Cortex from Heavy Queries"
weight: 11
slug: protecting-cortex-from-heavy-queries
---

PromQL is powerful, and a single query can fetch a very wide range of data and process a huge number of samples. Heavy queries can cause:

1. CPU on any query component to be partially exhausted, increasing latency and causing incoming queries to queue up with a high chance of timing out.
2. CPU on any query component to be fully exhausted, slowing down garbage collection until the pod runs out of memory and is killed.
3. Heap memory on any query component to be exhausted, causing the pod to run out of memory and be killed.

It's important to protect Cortex components by setting appropriate limits and throttling configurations based on your infrastructure and the data ingested by your tenants.

## Static limits

There are a number of static limits that you can configure to block heavy queries from running.

### Max outstanding requests per tenant

See https://cortexmetrics.io/docs/configuration/configuration-file/#query_frontend_config:~:text=max_outstanding_requests_per_tenant for details.

### Max data bytes fetched per (sharded) query

See https://cortexmetrics.io/docs/configuration/configuration-file/#query_frontend_config:~:text=max_fetched_data_bytes_per_query for details.

### Max series fetched per (sharded) query

See https://cortexmetrics.io/docs/configuration/configuration-file/#query_frontend_config:~:text=max_fetched_series_per_query for details.

### Max chunk bytes fetched per (sharded) query

See https://cortexmetrics.io/docs/configuration/configuration-file/#query_frontend_config:~:text=max_fetched_chunk_bytes_per_query for details.

### Max samples fetched per (sharded) query

See https://cortexmetrics.io/docs/configuration/configuration-file/#querier_config:~:text=max_samples for details.

## Resource-based throttling (Experimental)

Although the static limits are able to protect Cortex components from specific query patterns, they are not generic enough to cover every combination of bad query patterns. For example, a query might fetch postings, series, and chunks that are each slightly below their individual limits, yet together are still too heavy. For a more generic solution, you can enable resource-based throttling by setting CPU and heap utilization thresholds.

Currently, it rejects incoming query requests with HTTP status code 429 (Too Many Requests) while resource usage is above the configured thresholds.

For example, the following configuration starts throttling query requests if either CPU or heap utilization rises above 80%, leaving 20% headroom for inflight requests.

```yaml
target: ingester
monitored_resources: cpu,heap
instance_limits:
cpu_utilization: 0.8
heap_utilization: 0.8
```

See https://cortexmetrics.io/docs/configuration/configuration-file/#:~:text=instance_limits for details.
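
The admission decision described above can be sketched in Go as follows. This is a minimal illustration, not Cortex's actual API: the `utilization`, `limits`, and `admit` names are invented here, and real readings would come from a resource monitor rather than literals.

```go
package main

import (
	"errors"
	"fmt"
)

// utilization holds a point-in-time resource reading between 0 and 1.
type utilization struct {
	cpu  float64
	heap float64
}

// limits mirrors the instance_limits config: a value of 0 disables that check.
type limits struct {
	cpuUtilization  float64
	heapUtilization float64
}

var errTooManyRequests = errors.New("429 too many requests: resource utilization limit breached")

// admit rejects an incoming query when any configured threshold is breached.
func admit(u utilization, l limits) error {
	if l.cpuUtilization > 0 && u.cpu >= l.cpuUtilization {
		return fmt.Errorf("cpu %.2f >= limit %.2f: %w", u.cpu, l.cpuUtilization, errTooManyRequests)
	}
	if l.heapUtilization > 0 && u.heap >= l.heapUtilization {
		return fmt.Errorf("heap %.2f >= limit %.2f: %w", u.heap, l.heapUtilization, errTooManyRequests)
	}
	return nil
}

func main() {
	l := limits{cpuUtilization: 0.8, heapUtilization: 0.8}
	fmt.Println(admit(utilization{cpu: 0.5, heap: 0.5}, l)) // below both limits: nil
	fmt.Println(admit(utilization{cpu: 0.9, heap: 0.5}, l)) // cpu limit breached: error
}
```

Wrapping a sentinel error (as with `errTooManyRequests` above) lets callers map the rejection to an HTTP 429 with `errors.Is` instead of string matching.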
3 changes: 2 additions & 1 deletion go.mod
@@ -81,9 +81,11 @@ require (
github.com/google/go-cmp v0.7.0
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
github.com/prometheus/procfs v0.15.1
github.com/sercand/kuberesolver/v5 v5.1.1
github.com/tjhop/slog-gokit v0.1.2
go.opentelemetry.io/collector/pdata v1.24.0
go.uber.org/automaxprocs v1.6.0
google.golang.org/protobuf v1.36.4
)

@@ -199,7 +201,6 @@ require (
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0 // indirect
github.com/prometheus/exporter-toolkit v0.13.2 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/prometheus/sigv4 v0.1.1 // indirect
github.com/redis/rueidis v1.0.45-alpha.1 // indirect
github.com/rs/cors v1.11.1 // indirect
4 changes: 4 additions & 0 deletions go.sum
@@ -1560,6 +1560,8 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0 h1:owfYHh79h8Y5HvNMGyww+DaVwo10CKiRW1RQrrZzIwg=
github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0/go.mod h1:rT989D4UtOcfd9tVqIZRVIM8rkg+9XbreBjFNEKXvVI=
github.com/prometheus/alertmanager v0.28.1 h1:BK5pCoAtaKg01BYRUJhEDV1tqJMEtYBGzPw8QdvnnvA=
@@ -1811,6 +1813,8 @@ go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
40 changes: 40 additions & 0 deletions pkg/configs/instance_limits.go
@@ -0,0 +1,40 @@
package configs

import (
"errors"
"flag"
"strings"

"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/resource"
)

type InstanceLimits struct {
CPUUtilization float64 `yaml:"cpu_utilization"`
HeapUtilization float64 `yaml:"heap_utilization"`
}

func (cfg *InstanceLimits) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
f.Float64Var(&cfg.CPUUtilization, prefix+"instance-limits.cpu-utilization", 0, "EXPERIMENTAL: Max CPU utilization that this component can reach before rejecting new query requests (across all tenants), as a fraction between 0 and 1. monitored_resources config must include the resource type. 0 to disable.")
f.Float64Var(&cfg.HeapUtilization, prefix+"instance-limits.heap-utilization", 0, "EXPERIMENTAL: Max heap utilization that this component can reach before rejecting new query requests (across all tenants), as a fraction between 0 and 1. monitored_resources config must include the resource type. 0 to disable.")
}

func (cfg *InstanceLimits) Validate(monitoredResources flagext.StringSliceCSV) error {
if cfg.CPUUtilization > 1 || cfg.CPUUtilization < 0 {
return errors.New("cpu_utilization must be between 0 and 1")
}

if cfg.CPUUtilization > 0 && !strings.Contains(monitoredResources.String(), string(resource.CPU)) {
return errors.New("monitored_resources config must include \"cpu\" as well")
}

if cfg.HeapUtilization > 1 || cfg.HeapUtilization < 0 {
return errors.New("heap_utilization must be between 0 and 1")
}

if cfg.HeapUtilization > 0 && !strings.Contains(monitoredResources.String(), string(resource.Heap)) {
return errors.New("monitored_resources config must include \"heap\" as well")
}

return nil
}
64 changes: 64 additions & 0 deletions pkg/configs/instance_limits_test.go
@@ -0,0 +1,64 @@
package configs

import (
"errors"
"testing"

"github.com/stretchr/testify/require"
)

func Test_Validate(t *testing.T) {
for name, tc := range map[string]struct {
instanceLimits InstanceLimits
monitoredResources []string
err error
}{
"correct config should pass validation": {
instanceLimits: InstanceLimits{
CPUUtilization: 0.5,
HeapUtilization: 0.5,
},
monitoredResources: []string{"cpu", "heap"},
err: nil,
},
"utilization config less than 0 should fail validation": {
instanceLimits: InstanceLimits{
CPUUtilization: -0.5,
HeapUtilization: 0.5,
},
monitoredResources: []string{"cpu", "heap"},
err: errors.New("cpu_utilization must be between 0 and 1"),
},
"utilization config greater than 1 should fail validation": {
instanceLimits: InstanceLimits{
CPUUtilization: 0.5,
HeapUtilization: 1.5,
},
monitoredResources: []string{"cpu", "heap"},
err: errors.New("heap_utilization must be between 0 and 1"),
},
"missing cpu in monitored_resources config should fail validation": {
instanceLimits: InstanceLimits{
CPUUtilization: 0.5,
},
monitoredResources: []string{"heap"},
err: errors.New("monitored_resources config must include \"cpu\" as well"),
},
"missing heap in monitored_resources config should fail validation": {
instanceLimits: InstanceLimits{
HeapUtilization: 0.5,
},
monitoredResources: []string{"cpu"},
err: errors.New("monitored_resources config must include \"heap\" as well"),
},
} {
t.Run(name, func(t *testing.T) {
err := tc.instanceLimits.Validate(tc.monitoredResources)
if tc.err != nil {
// require.Errorf only asserts that an error occurred; EqualError also checks the message.
require.EqualError(t, err, tc.err.Error())
} else {
require.NoError(t, err)
}
})
}
}
30 changes: 24 additions & 6 deletions pkg/cortex/cortex.go
@@ -22,6 +22,7 @@ import (
"gopkg.in/yaml.v2"

"github.com/cortexproject/cortex/pkg/util/grpcclient"
"github.com/cortexproject/cortex/pkg/util/resource"

"github.com/cortexproject/cortex/pkg/alertmanager"
"github.com/cortexproject/cortex/pkg/alertmanager/alertstore"
@@ -88,10 +89,11 @@

// Config is the root config for Cortex.
type Config struct {
Target flagext.StringSliceCSV `yaml:"target"`
AuthEnabled bool `yaml:"auth_enabled"`
PrintConfig bool `yaml:"-"`
HTTPPrefix string `yaml:"http_prefix"`
Target flagext.StringSliceCSV `yaml:"target"`
AuthEnabled bool `yaml:"auth_enabled"`
PrintConfig bool `yaml:"-"`
HTTPPrefix string `yaml:"http_prefix"`
MonitoredResources flagext.StringSliceCSV `yaml:"monitored_resources"`

ExternalQueryable prom_storage.Queryable `yaml:"-"`
ExternalPusher ruler.Pusher `yaml:"-"`
@@ -143,6 +145,11 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&c.PrintConfig, "print.config", false, "Print the config and exit.")
f.StringVar(&c.HTTPPrefix, "http.prefix", "/api/prom", "HTTP path prefix for Cortex API.")

c.MonitoredResources = []string{}
f.Var(&c.MonitoredResources, "monitored.resources", "Comma-separated list of resources to monitor. "+
"Supported values are cpu and heap, which track close utilization estimates derived from "+
"github.com/prometheus/procfs and runtime/metrics. Empty string to disable.")

c.API.RegisterFlags(f)
c.registerServerFlagsWithChangedDefaultValues(f)
c.Distributor.RegisterFlags(f)
@@ -216,7 +223,7 @@ func (c *Config) Validate(log log.Logger) error {
if err := c.QueryRange.Validate(c.Querier); err != nil {
return errors.Wrap(err, "invalid query_range config")
}
if err := c.StoreGateway.Validate(c.LimitsConfig); err != nil {
if err := c.StoreGateway.Validate(c.LimitsConfig, c.MonitoredResources); err != nil {
return errors.Wrap(err, "invalid store-gateway config")
}
if err := c.Compactor.Validate(c.LimitsConfig); err != nil {
@@ -229,14 +236,24 @@
return errors.Wrap(err, "invalid alertmanager config")
}

if err := c.Ingester.Validate(); err != nil {
if err := c.Ingester.Validate(c.MonitoredResources); err != nil {
return errors.Wrap(err, "invalid ingester config")
}

if err := c.Tracing.Validate(); err != nil {
return errors.Wrap(err, "invalid tracing config")
}

for _, r := range c.MonitoredResources {
switch resource.Type(r) {
case resource.CPU, resource.Heap:
default:
if len(r) > 0 {
return fmt.Errorf("unsupported resource type to monitor: %s", r)
}
}
}

return nil
}

@@ -315,6 +332,7 @@ type Cortex struct {
MetadataQuerier querier.MetadataQuerier
QuerierEngine promql.QueryEngine
QueryFrontendTripperware tripperware.Tripperware
ResourceMonitor *resource.Monitor

Ruler *ruler.Ruler
RulerStorage rulestore.RuleStore