Skip to content

Add distributor inflight client request limit #6376

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
* [ENHANCEMENT] Ingester: Introduce a new experimental feature for caching expanded postings on the ingester. #6296
* [ENHANCEMENT] Querier/Ruler: Expose `store_gateway_consistency_check_max_attempts` for max retries when querying store gateway in consistency check. #6276
* [ENHANCEMENT] StoreGateway: Add new `cortex_bucket_store_chunk_pool_inuse_bytes` metric to track the usage in chunk pool. #6310
* [ENHANCEMENT] Distributor: Add new `cortex_distributor_inflight_client_requests` metric to track number of ingester client inflight requests. #6358
* [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224
* [BUGFIX] Ruler: Allow rule evaluation to complete during shutdown. #6326
* [BUGFIX] Ring: update ring with new ip address when instance is lost, rejoins, but heartbeat is disabled #6271
Expand Down
6 changes: 6 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -2679,6 +2679,12 @@ instance_limits:
# CLI flag: -distributor.instance-limits.max-inflight-push-requests
[max_inflight_push_requests: <int> | default = 0]

# Max inflight ingester client requests that this distributor can handle. This
# limit is per-distributor, not per-tenant. Additional requests will be
# rejected. 0 = unlimited.
# CLI flag: -distributor.instance-limits.max-inflight-client-requests
[max_inflight_client_requests: <int> | default = 0]

otlp:
# If true, all resource attributes are converted to labels.
# CLI flag: -distributor.otlp.convert-all-attributes
Expand Down
33 changes: 29 additions & 4 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ var (
// Distributor instance limits errors.
errTooManyInflightPushRequests = errors.New("too many inflight push requests in distributor")
errMaxSamplesPushRateLimitReached = errors.New("distributor's samples push rate limit reached")
errTooManyInflightClientRequests = errors.New("too many inflight ingester client requests in distributor")
)

const (
Expand Down Expand Up @@ -104,8 +105,9 @@ type Distributor struct {

activeUsers *util.ActiveUsersCleanupService

ingestionRate *util_math.EwmaRate
inflightPushRequests atomic.Int64
ingestionRate *util_math.EwmaRate
inflightPushRequests atomic.Int64
inflightClientRequests atomic.Int64

// Metrics
queryDuration *instrument.HistogramCollector
Expand Down Expand Up @@ -171,8 +173,9 @@ type Config struct {
}

type InstanceLimits struct {
MaxIngestionRate float64 `yaml:"max_ingestion_rate"`
MaxInflightPushRequests int `yaml:"max_inflight_push_requests"`
MaxIngestionRate float64 `yaml:"max_ingestion_rate"`
MaxInflightPushRequests int `yaml:"max_inflight_push_requests"`
MaxInflightClientRequests int `yaml:"max_inflight_client_requests"`
}

type OTLPConfig struct {
Expand All @@ -198,6 +201,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

f.Float64Var(&cfg.InstanceLimits.MaxIngestionRate, "distributor.instance-limits.max-ingestion-rate", 0, "Max ingestion rate (samples/sec) that this distributor will accept. This limit is per-distributor, not per-tenant. Additional push requests will be rejected. Current ingestion rate is computed as exponentially weighted moving average, updated every second. 0 = unlimited.")
f.IntVar(&cfg.InstanceLimits.MaxInflightPushRequests, "distributor.instance-limits.max-inflight-push-requests", 0, "Max inflight push requests that this distributor can handle. This limit is per-distributor, not per-tenant. Additional requests will be rejected. 0 = unlimited.")
f.IntVar(&cfg.InstanceLimits.MaxInflightClientRequests, "distributor.instance-limits.max-inflight-client-requests", 0, "Max inflight ingester client requests that this distributor can handle. This limit is per-distributor, not per-tenant. Additional requests will be rejected. 0 = unlimited.")

f.BoolVar(&cfg.OTLPConfig.ConvertAllAttributes, "distributor.otlp.convert-all-attributes", false, "If true, all resource attributes are converted to labels.")
f.BoolVar(&cfg.OTLPConfig.DisableTargetInfo, "distributor.otlp.disable-target-info", false, "If true, a target_info metric is not ingested. (refer to: https://github.com/prometheus/OpenMetrics/blob/main/specification/OpenMetrics.md#supporting-target-metadata-in-both-push-based-and-pull-based-systems)")
Expand Down Expand Up @@ -374,6 +378,11 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
Help: instanceLimitsMetricHelp,
ConstLabels: map[string]string{limitLabel: "max_inflight_push_requests"},
}).Set(float64(cfg.InstanceLimits.MaxInflightPushRequests))
promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: instanceLimitsMetric,
Help: instanceLimitsMetricHelp,
ConstLabels: map[string]string{limitLabel: "max_inflight_client_requests"},
}).Set(float64(cfg.InstanceLimits.MaxInflightClientRequests))
promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: instanceLimitsMetric,
Help: instanceLimitsMetricHelp,
Expand All @@ -386,6 +395,13 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
}, func() float64 {
return float64(d.inflightPushRequests.Load())
})

promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
Name: "cortex_distributor_inflight_client_requests",
Help: "Current number of inflight client requests in distributor.",
}, func() float64 {
return float64(d.inflightClientRequests.Load())
})
promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
Name: "cortex_distributor_ingestion_rate_samples_per_second",
Help: "Current ingestion rate in samples/sec that distributor is using to limit access.",
Expand Down Expand Up @@ -661,6 +677,12 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
}
}

// only reject requests at this stage to allow distributor to finish sending the current batch request to all ingesters
// even if we've exceeded the MaxInflightClientRequests in the `doBatch`
if d.cfg.InstanceLimits.MaxInflightClientRequests > 0 && d.inflightClientRequests.Load() > int64(d.cfg.InstanceLimits.MaxInflightClientRequests) {
return nil, errTooManyInflightClientRequests
}

removeReplica := false
// Cache user limit with overrides so we spend less CPU doing locking. See issue #4904
limits := d.limits.GetOverridesForUser(userID)
Expand Down Expand Up @@ -1023,6 +1045,9 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time
req.Metadata = metadata
req.Source = source

d.inflightClientRequests.Inc()
defer d.inflightClientRequests.Dec()

_, err = c.PushPreAlloc(ctx, req)

// We should not reuse the req in case of errors:
Expand Down
56 changes: 44 additions & 12 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -786,13 +786,15 @@ func TestDistributor_PushInstanceLimits(t *testing.T) {

ctx := user.InjectOrgID(context.Background(), "user")
tests := map[string]struct {
preInflight int
preRateSamples int // initial rate before first push
pushes []testPush // rate is recomputed after each push
preInflight int
preInflightClient int
preRateSamples int // initial rate before first push
pushes []testPush // rate is recomputed after each push

// limits
inflightLimit int
ingestionRateLimit float64
inflightLimit int
inflightClientLimit int
ingestionRateLimit float64

metricNames []string
expectedMetrics string
Expand All @@ -809,6 +811,7 @@ func TestDistributor_PushInstanceLimits(t *testing.T) {
expectedMetrics: `
# HELP cortex_distributor_instance_limits Instance limits used by this distributor.
# TYPE cortex_distributor_instance_limits gauge
cortex_distributor_instance_limits{limit="max_inflight_client_requests"} 0
cortex_distributor_instance_limits{limit="max_inflight_push_requests"} 0
cortex_distributor_instance_limits{limit="max_ingestion_rate"} 0
`,
Expand All @@ -828,6 +831,7 @@ func TestDistributor_PushInstanceLimits(t *testing.T) {

# HELP cortex_distributor_instance_limits Instance limits used by this distributor.
# TYPE cortex_distributor_instance_limits gauge
cortex_distributor_instance_limits{limit="max_inflight_client_requests"} 0
cortex_distributor_instance_limits{limit="max_inflight_push_requests"} 101
cortex_distributor_instance_limits{limit="max_ingestion_rate"} 0
`,
Expand All @@ -839,6 +843,29 @@ func TestDistributor_PushInstanceLimits(t *testing.T) {
{samples: 100, expectedError: errTooManyInflightPushRequests},
},
},
"below inflight client limit": {
preInflightClient: 90,
inflightClientLimit: 101,
pushes: []testPush{
{samples: 100, expectedError: nil},
},

metricNames: []string{instanceLimitsMetric},
expectedMetrics: `
# HELP cortex_distributor_instance_limits Instance limits used by this distributor.
# TYPE cortex_distributor_instance_limits gauge
cortex_distributor_instance_limits{limit="max_inflight_client_requests"} 101
cortex_distributor_instance_limits{limit="max_inflight_push_requests"} 0
cortex_distributor_instance_limits{limit="max_ingestion_rate"} 0
`,
},
"hits inflight client limit": {
preInflightClient: 103,
inflightClientLimit: 101,
pushes: []testPush{
{samples: 100, expectedError: errTooManyInflightClientRequests},
},
},
"below ingestion rate limit": {
preRateSamples: 500,
ingestionRateLimit: 1000,
Expand All @@ -855,6 +882,7 @@ func TestDistributor_PushInstanceLimits(t *testing.T) {

# HELP cortex_distributor_instance_limits Instance limits used by this distributor.
# TYPE cortex_distributor_instance_limits gauge
cortex_distributor_instance_limits{limit="max_inflight_client_requests"} 0
cortex_distributor_instance_limits{limit="max_inflight_push_requests"} 0
cortex_distributor_instance_limits{limit="max_ingestion_rate"} 1000
`,
Expand Down Expand Up @@ -894,17 +922,19 @@ func TestDistributor_PushInstanceLimits(t *testing.T) {

// Start all expected distributors
distributors, _, regs, _ := prepare(t, prepConfig{
numIngesters: 3,
happyIngesters: 3,
numDistributors: 1,
shardByAllLabels: true,
limits: limits,
maxInflightRequests: testData.inflightLimit,
maxIngestionRate: testData.ingestionRateLimit,
numIngesters: 3,
happyIngesters: 3,
numDistributors: 1,
shardByAllLabels: true,
limits: limits,
maxInflightRequests: testData.inflightLimit,
maxInflightClientRequests: testData.inflightClientLimit,
maxIngestionRate: testData.ingestionRateLimit,
})

d := distributors[0]
d.inflightPushRequests.Add(int64(testData.preInflight))
d.inflightClientRequests.Add(int64(testData.preInflightClient))
d.ingestionRate.Add(int64(testData.preRateSamples))

d.ingestionRate.Tick()
Expand Down Expand Up @@ -2790,6 +2820,7 @@ type prepConfig struct {
numDistributors int
skipLabelNameValidation bool
maxInflightRequests int
maxInflightClientRequests int
maxIngestionRate float64
replicationFactor int
enableTracker bool
Expand Down Expand Up @@ -2907,6 +2938,7 @@ func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []
distributorCfg.DistributorRing.InstanceAddr = "127.0.0.1"
distributorCfg.SkipLabelNameValidation = cfg.skipLabelNameValidation
distributorCfg.InstanceLimits.MaxInflightPushRequests = cfg.maxInflightRequests
distributorCfg.InstanceLimits.MaxInflightClientRequests = cfg.maxInflightClientRequests
distributorCfg.InstanceLimits.MaxIngestionRate = cfg.maxIngestionRate

if cfg.shuffleShardEnabled {
Expand Down
Loading