Skip to content

Commit ba306c3

Browse files
authored
Monitor and protect distributor from OOMKilled due to too many in progress requests (#5917)
* Added metric to keep track of how many in-progress requests go from one distributor to one ingester. Created inflight request limit per ingester client. Signed-off-by: Alex Le <leqiyue@amazon.com> * moved throttling logic to closableHealthAndIngesterClient Signed-off-by: Alex Le <leqiyue@amazon.com> * update changelog and config doc and reverted metric cortex_distributor_inflight_push_requests Signed-off-by: Alex Le <leqiyue@amazon.com> * add unit test Signed-off-by: Alex Le <leqiyue@amazon.com> * rename Signed-off-by: Alex Le <leqiyue@amazon.com> --------- Signed-off-by: Alex Le <leqiyue@amazon.com>
1 parent 935c394 commit ba306c3

File tree

4 files changed

+142
-10
lines changed

4 files changed

+142
-10
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
* [ENHANCEMENT] Query Frontend/Querier: Added store gateway postings touched count and touched size in Querier stats and log in Query Frontend. #5892
55
* [ENHANCEMENT] Query Frontend/Querier: Returns `warnings` on prometheus query responses. #5916
66
* [ENHANCEMENT] Ingester: Allowing to configure `-blocks-storage.tsdb.head-compaction-interval` flag up to 30 min and add a jitter on the first head compaction. #5919
7+
* [ENHANCEMENT] Distributor: Added `max_inflight_push_requests` config to the ingester client to protect the distributor from being OOMKilled. #5917
78
* [CHANGE] Upgrade Dockerfile Node version from 14x to 18x. #5906
89
* [BUGFIX] Configsdb: Fix endline issue in db password. #5920
910

docs/configuration/config-file-reference.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3026,6 +3026,11 @@ grpc_client_config:
30263026
# Skip validating server certificate.
30273027
# CLI flag: -ingester.client.tls-insecure-skip-verify
30283028
[tls_insecure_skip_verify: <boolean> | default = false]
3029+
3030+
# Max inflight push requests that this ingester client can handle. This limit is
3031+
# per-ingester-client. Additional requests will be rejected. 0 = unlimited.
3032+
# CLI flag: -ingester.client.max-inflight-push-requests
3033+
[max_inflight_push_requests: <int> | default = 0]
30293034
```
30303035

30313036
### `limits_config`

pkg/ingester/client/client.go

Lines changed: 53 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ import (
88
"github.com/cortexproject/cortex/pkg/util/grpcclient"
99

1010
"github.com/go-kit/log"
11+
"github.com/pkg/errors"
1112
"github.com/prometheus/client_golang/prometheus"
1213
"github.com/prometheus/client_golang/prometheus/promauto"
14+
"go.uber.org/atomic"
1315
"google.golang.org/grpc"
1416
"google.golang.org/grpc/health/grpc_health_v1"
1517
)
@@ -20,6 +22,19 @@ var ingesterClientRequestDuration = promauto.NewHistogramVec(prometheus.Histogra
2022
Help: "Time spent doing Ingester requests.",
2123
Buckets: prometheus.ExponentialBuckets(0.001, 4, 6),
2224
}, []string{"operation", "status_code"})
25+
// ingesterClientInflightPushRequests is a gauge vector with a single
// "ingester" label. Each ingester client increments its child gauge when a
// push request starts and decrements it when the request returns, so the
// value is the number of push requests currently in flight.
var ingesterClientInflightPushRequests = promauto.NewGaugeVec(prometheus.GaugeOpts{
26+
Namespace: "cortex",
27+
Name: "ingester_client_inflight_push_requests",
28+
Help: "Number of inflight push requests per ingester client.",
29+
}, []string{"ingester"})
30+
31+
var errTooManyInflightPushRequests = errors.New("too many inflight push requests in ingester client")
32+
33+
// ClosableClientConn is a grpc.ClientConnInterface that additionally exposes
// a Close method, so a connection can be held behind an interface rather than
// a concrete connection type.
34+
type ClosableClientConn interface {
35+
grpc.ClientConnInterface
36+
Close() error
37+
}
2338

2439
// HealthAndIngesterClient is the union of IngesterClient and grpc_health_v1.HealthClient.
2540
type HealthAndIngesterClient interface {
@@ -32,16 +47,40 @@ type HealthAndIngesterClient interface {
3247
// closableHealthAndIngesterClient bundles the ingester and health gRPC clients
// with a connection, plus the state used to track and limit inflight push
// requests.
type closableHealthAndIngesterClient struct {
3348
IngesterClient
3449
grpc_health_v1.HealthClient
35-
conn *grpc.ClientConn
50+
// conn is the connection PushPreAlloc uses to invoke the Push RPC directly.
conn ClosableClientConn
51+
// maxInflightPushRequests is the limit on concurrent push requests for this
// client; a value of 0 (or less) disables the check.
maxInflightPushRequests int64
52+
// inflightRequests counts push requests currently passing through
// handlePushRequest on this client.
inflightRequests atomic.Int64
53+
// inflightPushRequests exposes the same inflight count as a Prometheus gauge.
inflightPushRequests prometheus.Gauge
3654
}
3755

3856
// PushPreAlloc sends a preallocated write request by invoking the
// "/cortex.Ingester/Push" method directly on the client's connection.
// The call is wrapped in handlePushRequest, so it is counted as an inflight
// push request and is rejected with errTooManyInflightPushRequests when the
// configured limit is exceeded. On an Invoke error it returns nil and that error.
func (c *closableHealthAndIngesterClient) PushPreAlloc(ctx context.Context, in *cortexpb.PreallocWriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) {
39-
out := new(cortexpb.WriteResponse)
40-
err := c.conn.Invoke(ctx, "/cortex.Ingester/Push", in, out, opts...)
41-
if err != nil {
42-
return nil, err
57+
return c.handlePushRequest(func() (*cortexpb.WriteResponse, error) {
58+
out := new(cortexpb.WriteResponse)
59+
err := c.conn.Invoke(ctx, "/cortex.Ingester/Push", in, out, opts...)
60+
if err != nil {
61+
return nil, err
62+
}
63+
return out, nil
64+
})
65+
}
66+
67+
// Push forwards the write request to the embedded IngesterClient. The call is
// wrapped in handlePushRequest, so it is counted as an inflight push request
// and is rejected with errTooManyInflightPushRequests when the configured
// limit is exceeded.
func (c *closableHealthAndIngesterClient) Push(ctx context.Context, in *cortexpb.WriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) {
68+
return c.handlePushRequest(func() (*cortexpb.WriteResponse, error) {
69+
return c.IngesterClient.Push(ctx, in, opts...)
70+
})
71+
}
72+
73+
// handlePushRequest runs mainFunc while tracking it as an inflight push
// request. The atomic counter and the gauge are both incremented before the
// limit check and decremented when this function returns. If
// maxInflightPushRequests is positive and the counter value, which already
// includes this request, is greater than it, mainFunc is not called and
// errTooManyInflightPushRequests is returned. Otherwise the result of
// mainFunc is returned unchanged.
func (c *closableHealthAndIngesterClient) handlePushRequest(mainFunc func() (*cortexpb.WriteResponse, error)) (*cortexpb.WriteResponse, error) {
74+
currentInflight := c.inflightRequests.Inc()
75+
c.inflightPushRequests.Inc()
76+
defer func() {
77+
c.inflightPushRequests.Dec()
78+
c.inflightRequests.Dec()
79+
}()
80+
// NOTE(review): a rejected request has already bumped the counter and the
// gauge above, so it is counted until the deferred decrement runs.
if c.maxInflightPushRequests > 0 && currentInflight > c.maxInflightPushRequests {
81+
return nil, errTooManyInflightPushRequests
4382
}
44-
return out, nil
83+
return mainFunc()
4584
}
4685

4786
// MakeIngesterClient makes a new IngesterClient
@@ -55,9 +94,11 @@ func MakeIngesterClient(addr string, cfg Config) (HealthAndIngesterClient, error
5594
return nil, err
5695
}
5796
return &closableHealthAndIngesterClient{
58-
IngesterClient: NewIngesterClient(conn),
59-
HealthClient: grpc_health_v1.NewHealthClient(conn),
60-
conn: conn,
97+
IngesterClient: NewIngesterClient(conn),
98+
HealthClient: grpc_health_v1.NewHealthClient(conn),
99+
conn: conn,
100+
maxInflightPushRequests: cfg.MaxInflightPushRequests,
101+
inflightPushRequests: ingesterClientInflightPushRequests.WithLabelValues(addr),
61102
}, nil
62103
}
63104

@@ -67,12 +108,14 @@ func (c *closableHealthAndIngesterClient) Close() error {
67108

68109
// Config is the configuration struct for the ingester client.
69110
type Config struct {
70-
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
111+
// GRPCClientConfig holds the gRPC client settings for the ingester client.
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
112+
// MaxInflightPushRequests is the per-ingester-client limit on inflight push
// requests; 0 means unlimited.
MaxInflightPushRequests int64 `yaml:"max_inflight_push_requests"`
71113
}
72114

73115
// RegisterFlags registers configuration settings used by the ingester client config.
// It registers the gRPC client flags under the "ingester.client" prefix and the
// -ingester.client.max-inflight-push-requests flag, whose default is 0.
74116
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
75117
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("ingester.client", f)
118+
f.Int64Var(&cfg.MaxInflightPushRequests, "ingester.client.max-inflight-push-requests", 0, "Max inflight push requests that this ingester client can handle. This limit is per-ingester-client. Additional requests will be rejected. 0 = unlimited.")
76119
}
77120

78121
func (cfg *Config) Validate(log log.Logger) error {

pkg/ingester/client/client_test.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@ import (
66
"strconv"
77
"testing"
88

9+
"github.com/prometheus/client_golang/prometheus"
10+
"github.com/stretchr/testify/assert"
911
"github.com/stretchr/testify/require"
12+
"google.golang.org/grpc"
1013

1114
"github.com/cortexproject/cortex/pkg/cortexpb"
1215
"github.com/cortexproject/cortex/pkg/util"
@@ -47,3 +50,83 @@ func TestMarshall(t *testing.T) {
4750
require.Equal(t, numSeries, len(req.Timeseries))
4851
}
4952
}
53+
54+
// TestClosableHealthAndIngesterClient_MaxInflightPushRequests checks the
// inflight limit on both Push and PushPreAlloc. Each case builds a client with
// a preset inflight count and a limit, issues one request, and expects either
// errTooManyInflightPushRequests or no error.
func TestClosableHealthAndIngesterClient_MaxInflightPushRequests(t *testing.T) {
55+
t.Parallel()
56+
57+
tests := map[string]struct {
58+
inflightPushRequests int64
59+
maxInflightPushRequests int64
60+
expectThrottle bool
61+
}{
62+
// A limit of 0 disables throttling regardless of the inflight count.
"no limit": {
63+
inflightPushRequests: 1000,
64+
maxInflightPushRequests: 0,
65+
expectThrottle: false,
66+
},
67+
// 99 preset requests plus the one under test equals the limit, which is allowed.
"inflight request is under limit": {
68+
inflightPushRequests: 99,
69+
maxInflightPushRequests: 100,
70+
expectThrottle: false,
71+
},
72+
// 100 preset requests plus the one under test exceeds the limit of 100.
"inflight request hits limit": {
73+
inflightPushRequests: 100,
74+
maxInflightPushRequests: 100,
75+
expectThrottle: true,
76+
},
77+
}
78+
ctx := context.Background()
79+
for testName, testData := range tests {
80+
// Copy the range variable so the parallel subtest closure uses its own value.
tData := testData
81+
t.Run(testName, func(t *testing.T) {
82+
t.Parallel()
83+
84+
// Exercise the Push path.
client1 := createTestIngesterClient(tData.maxInflightPushRequests, tData.inflightPushRequests)
85+
_, err := client1.Push(ctx, nil)
86+
if tData.expectThrottle {
87+
assert.ErrorIs(t, err, errTooManyInflightPushRequests)
88+
} else {
89+
assert.NoError(t, err)
90+
}
91+
92+
// Exercise the PushPreAlloc path with a fresh client.
client2 := createTestIngesterClient(tData.maxInflightPushRequests, tData.inflightPushRequests)
93+
_, err = client2.PushPreAlloc(ctx, nil)
94+
if tData.expectThrottle {
95+
assert.ErrorIs(t, err, errTooManyInflightPushRequests)
96+
} else {
97+
assert.NoError(t, err)
98+
}
99+
})
100+
}
101+
}
102+
103+
// createTestIngesterClient builds a closableHealthAndIngesterClient backed by
// mockIngester and mockClientConn, with the given limit, a fresh unregistered
// gauge, and the inflight counter preset to currentInflightRequests.
func createTestIngesterClient(maxInflightPushRequests int64, currentInflightRequests int64) *closableHealthAndIngesterClient {
104+
client := &closableHealthAndIngesterClient{
105+
IngesterClient: &mockIngester{},
106+
conn: &mockClientConn{},
107+
maxInflightPushRequests: maxInflightPushRequests,
108+
inflightPushRequests: prometheus.NewGauge(prometheus.GaugeOpts{}),
109+
}
110+
// Simulate requests that are already in flight before the call under test.
client.inflightRequests.Add(currentInflightRequests)
111+
return client
112+
}
113+
114+
// mockIngester is a test double for IngesterClient. It embeds the interface
// without setting it, so only the methods defined on mockIngester itself are
// safe to call.
type mockIngester struct {
115+
IngesterClient
116+
}
117+
118+
// Push ignores its arguments and always returns an empty WriteResponse and a nil error.
func (m *mockIngester) Push(_ context.Context, _ *cortexpb.WriteRequest, _ ...grpc.CallOption) (*cortexpb.WriteResponse, error) {
119+
return &cortexpb.WriteResponse{}, nil
120+
}
121+
122+
// mockClientConn is a test double for ClosableClientConn. It embeds the
// interface without setting it, so only the methods defined on mockClientConn
// itself are safe to call.
type mockClientConn struct {
123+
ClosableClientConn
124+
}
125+
126+
// Invoke ignores its arguments and always returns nil; the reply is left untouched.
func (m *mockClientConn) Invoke(_ context.Context, _ string, _ any, _ any, _ ...grpc.CallOption) error {
127+
return nil
128+
}
129+
130+
// Close does nothing and always returns nil.
func (m *mockClientConn) Close() error {
131+
return nil
132+
}

0 commit comments

Comments
 (0)