Skip to content

Commit e88b17e

Browse files
committed
Add max-inflight-request limit to bucket stores
Signed-off-by: Justin Jung <jungjust@amazon.com>
1 parent 7b9db50 commit e88b17e

File tree

5 files changed

+122
-0
lines changed

5 files changed

+122
-0
lines changed

pkg/querier/blocks_store_queryable.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1116,6 +1116,8 @@ func isRetryableError(err error) bool {
11161116
switch status.Code(err) {
11171117
case codes.Unavailable:
11181118
return true
1119+
case codes.ResourceExhausted:
1120+
return errors.Is(err, storegateway.ErrTooManyInflightRequests)
11191121
default:
11201122
return false
11211123
}

pkg/querier/blocks_store_queryable_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package querier
33
import (
44
"context"
55
"fmt"
6+
"github.com/cortexproject/cortex/pkg/storegateway"
67
"io"
78
"sort"
89
"strings"
@@ -708,6 +709,56 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
708709
},
709710
},
710711
},
712+
"multiple store-gateways has the block, but one of them had too many inflight requests": {
713+
finderResult: bucketindex.Blocks{
714+
{ID: block1},
715+
},
716+
storeSetResponses: []interface{}{
717+
map[BlocksStoreClient][]ulid.ULID{
718+
&storeGatewayClientMock{
719+
remoteAddr: "1.1.1.1",
720+
mockedSeriesErr: storegateway.ErrTooManyInflightRequests,
721+
}: {block1},
722+
},
723+
map[BlocksStoreClient][]ulid.ULID{
724+
&storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{
725+
mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 2),
726+
mockHintsResponse(block1),
727+
}}: {block1},
728+
},
729+
},
730+
limits: &blocksStoreLimitsMock{},
731+
queryLimiter: noOpQueryLimiter,
732+
expectedSeries: []seriesResult{
733+
{
734+
lbls: labels.New(metricNameLabel, series1Label),
735+
values: []valueResult{
736+
{t: minT, v: 2},
737+
},
738+
},
739+
},
740+
},
741+
"store gateway returns resource exhausted error other than max inflight request": {
742+
finderResult: bucketindex.Blocks{
743+
{ID: block1},
744+
},
745+
storeSetResponses: []interface{}{
746+
map[BlocksStoreClient][]ulid.ULID{
747+
&storeGatewayClientMock{
748+
remoteAddr: "1.1.1.1",
749+
mockedSeriesErr: status.Error(codes.ResourceExhausted, "some other resource"),
750+
}: {block1},
751+
},
752+
map[BlocksStoreClient][]ulid.ULID{
753+
&storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{
754+
mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 2),
755+
mockHintsResponse(block1),
756+
}}: {block1},
757+
},
758+
},
759+
limits: &blocksStoreLimitsMock{},
760+
expectedErr: errors.Wrapf(status.Error(codes.ResourceExhausted, "some other resource"), "failed to fetch series from 1.1.1.1"),
761+
},
711762
}
712763

713764
for testName, testData := range tests {

pkg/storage/tsdb/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ type BucketStoreConfig struct {
241241
SyncDir string `yaml:"sync_dir"`
242242
SyncInterval time.Duration `yaml:"sync_interval"`
243243
MaxConcurrent int `yaml:"max_concurrent"`
244+
MaxInflightRequest int `yaml:"max_inflight_request"`
244245
TenantSyncConcurrency int `yaml:"tenant_sync_concurrency"`
245246
BlockSyncConcurrency int `yaml:"block_sync_concurrency"`
246247
MetaSyncConcurrency int `yaml:"meta_sync_concurrency"`
@@ -291,6 +292,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
291292
f.IntVar(&cfg.ChunkPoolMinBucketSizeBytes, "blocks-storage.bucket-store.chunk-pool-min-bucket-size-bytes", ChunkPoolDefaultMinBucketSize, "Size - in bytes - of the smallest chunks pool bucket.")
292293
f.IntVar(&cfg.ChunkPoolMaxBucketSizeBytes, "blocks-storage.bucket-store.chunk-pool-max-bucket-size-bytes", ChunkPoolDefaultMaxBucketSize, "Size - in bytes - of the largest chunks pool bucket.")
293294
f.IntVar(&cfg.MaxConcurrent, "blocks-storage.bucket-store.max-concurrent", 100, "Max number of concurrent queries to execute against the long-term storage. The limit is shared across all tenants.")
295+
f.IntVar(&cfg.MaxInflightRequest, "blocks-storage.bucket-store.max-inflight-request", 0, "Max number of inflight queries to execute against the long-term storage. THe limit is shared across all tenants. 0 to disable.")
294296
f.IntVar(&cfg.TenantSyncConcurrency, "blocks-storage.bucket-store.tenant-sync-concurrency", 10, "Maximum number of concurrent tenants synching blocks.")
295297
f.IntVar(&cfg.BlockSyncConcurrency, "blocks-storage.bucket-store.block-sync-concurrency", 20, "Maximum number of concurrent blocks synching per tenant.")
296298
f.IntVar(&cfg.MetaSyncConcurrency, "blocks-storage.bucket-store.meta-sync-concurrency", 20, "Number of Go routines to use when syncing block meta files from object storage per tenant.")

pkg/storegateway/bucket_stores.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package storegateway
33
import (
44
"context"
55
"fmt"
6+
"google.golang.org/grpc/status"
67
"math"
78
"net/http"
89
"os"
@@ -72,13 +73,19 @@ type BucketStores struct {
7273
storesErrorsMu sync.RWMutex
7374
storesErrors map[string]error
7475

76+
// Keeps number of inflight requests
77+
inflightRequestCnt int
78+
inflightRequestMu sync.RWMutex
79+
7580
// Metrics.
7681
syncTimes prometheus.Histogram
7782
syncLastSuccess prometheus.Gauge
7883
tenantsDiscovered prometheus.Gauge
7984
tenantsSynced prometheus.Gauge
8085
}
8186

87+
var ErrTooManyInflightRequests = status.Error(codes.ResourceExhausted, "too many inflight requests in store gateway")
88+
8289
// NewBucketStores makes a new BucketStores.
8390
func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.Bucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) {
8491
cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, bucketClient, logger, reg)
@@ -293,6 +300,16 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri
293300
spanLog, spanCtx := spanlogger.New(srv.Context(), "BucketStores.Series")
294301
defer spanLog.Span.Finish()
295302

303+
maxInflightRequest := u.cfg.BucketStore.MaxInflightRequest
304+
if maxInflightRequest > 0 {
305+
if u.inflightRequestCnt >= maxInflightRequest {
306+
return ErrTooManyInflightRequests
307+
}
308+
309+
u.incrementInflightRequestCnt()
310+
defer u.decrementInflightRequestCnt()
311+
}
312+
296313
userID := getUserIDFromGRPCContext(spanCtx)
297314
if userID == "" {
298315
return fmt.Errorf("no userID")
@@ -321,6 +338,18 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri
321338
return err
322339
}
323340

341+
func (u *BucketStores) incrementInflightRequestCnt() {
342+
u.inflightRequestMu.Lock()
343+
u.inflightRequestCnt++
344+
u.inflightRequestMu.Unlock()
345+
}
346+
347+
func (u *BucketStores) decrementInflightRequestCnt() {
348+
u.inflightRequestMu.Lock()
349+
u.inflightRequestCnt--
350+
u.inflightRequestMu.Unlock()
351+
}
352+
324353
// LabelNames implements the Storegateway proto service.
325354
func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
326355
spanLog, spanCtx := spanlogger.New(ctx, "BucketStores.LabelNames")

pkg/storegateway/bucket_stores_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -514,6 +514,44 @@ func testBucketStoresSeriesShouldCorrectlyQuerySeriesSpanningMultipleChunks(t *t
514514
}
515515
}
516516

517+
func TestBucketStores_Series_ShouldReturnErrorIfMaxInflightRequestIsReached(t *testing.T) {
518+
cfg := prepareStorageConfig(t)
519+
cfg.BucketStore.MaxInflightRequest = 10
520+
reg := prometheus.NewPedanticRegistry()
521+
storageDir := t.TempDir()
522+
bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
523+
require.NoError(t, err)
524+
525+
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
526+
require.NoError(t, err)
527+
528+
stores.inflightRequestMu.Lock()
529+
stores.inflightRequestCnt = 10
530+
stores.inflightRequestMu.Unlock()
531+
series, warnings, err := querySeries(stores, "user_id", "metric_name", 0, 0)
532+
require.Errorf(t, err, "too many inflight requests in store gateway, limit = 10")
533+
assert.Empty(t, series)
534+
assert.Empty(t, warnings)
535+
}
536+
537+
func TestBucketStores_Series_ShouldNotCheckMaxInflightRequestsIfTheLimitIsDisabled(t *testing.T) {
538+
cfg := prepareStorageConfig(t)
539+
cfg.BucketStore.MaxInflightRequest = 0 // disables the limit
540+
reg := prometheus.NewPedanticRegistry()
541+
storageDir := t.TempDir()
542+
bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
543+
require.NoError(t, err)
544+
545+
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
546+
require.NoError(t, err)
547+
548+
stores.inflightRequestMu.Lock()
549+
stores.inflightRequestCnt = 10
550+
stores.inflightRequestMu.Unlock()
551+
_, _, err = querySeries(stores, "user_id", "metric_name", 0, 0)
552+
require.NoError(t, err)
553+
}
554+
517555
func prepareStorageConfig(t *testing.T) cortex_tsdb.BlocksStorageConfig {
518556
cfg := cortex_tsdb.BlocksStorageConfig{}
519557
flagext.DefaultValues(&cfg)

0 commit comments

Comments
 (0)