Commit da11bff

treid314 (Tyler Reid) and pracucci (Marco Pracucci) authored
Add support for Max Series Per Query for block storage and streaming ingesters (#4179)
* Add support for Max Series Per Query for block storage and streaming ingesters and update limits.go to reflect changes Signed-off-by: Tyler Reid <treid@Tylers-MacBook-Air.local>
* Fix spacing issues Signed-off-by: Tyler Reid <treid@Tylers-MacBook-Air.local>
* Review and linter changes Signed-off-by: Tyler Reid <treid@Tylers-MacBook-Air.local>
* Update docs Signed-off-by: Tyler Reid <treid@Tylers-MacBook-Air.local> Signed-off-by: Tyler Reid <tyler.reid@grafana.com>
* Add changelog for series per query limit Signed-off-by: Tyler Reid <tyler.reid@grafana.com>
* Apply suggestions from code review Co-authored-by: Marco Pracucci <marco@pracucci.com> Signed-off-by: Tyler Reid <tyler.reid@grafana.com>
* address review comments Signed-off-by: Tyler Reid <tyler.reid@grafana.com>
* Added integration test for series per query limit Signed-off-by: Tyler Reid <tyler.reid@grafana.com>
* Fix up integration test Signed-off-by: Tyler Reid <tyler.reid@grafana.com>
* Add new config option querier.max-series-per-query and use that instead of ingester.max-series-per-query for series per query limit Signed-off-by: Tyler Reid <tyler.reid@grafana.com>
* Update docs to show new cli flag and other review changes Signed-off-by: Tyler Reid <tyler.reid@grafana.com>
* Check error to resolve linter issue Signed-off-by: Tyler Reid <tyler.reid@grafana.com>
* Clean white noise Signed-off-by: Marco Pracucci <marco@pracucci.com>

Co-authored-by: Tyler Reid <treid@Tylers-MacBook-Air.local>
Co-authored-by: Tyler Reid <tyler.reid@grafana.com>
Co-authored-by: Marco Pracucci <marco@pracucci.com>
1 parent fde0a62 commit da11bff

12 files changed (+407 lines, -38 lines)

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -7,6 +7,7 @@
 - `-alertmanager.receivers-firewall.block.cidr-networks` renamed to `-alertmanager.receivers-firewall-block-cidr-networks`
 - `-alertmanager.receivers-firewall.block.private-addresses` renamed to `-alertmanager.receivers-firewall-block-private-addresses`
 * [CHANGE] Change default value of `-server.grpc.keepalive.min-time-between-pings` to `10s` and `-server.grpc.keepalive.ping-without-stream-allowed` to `true`. #4168
+* [FEATURE] Querier: Added new `-querier.max-fetched-series-per-query` flag. When Cortex is running with blocks storage, the max series per query limit is enforced in the querier and applies to unique series received from ingesters and store-gateway (long-term storage). #4179
 * [FEATURE] Alertmanager: Added rate-limits to notifiers. Rate limits used by all integrations can be configured using `-alertmanager.notification-rate-limit`, while per-integration rate limits can be specified via `-alertmanager.notification-rate-limit-per-integration` parameter. Both shared and per-integration limits can be overwritten using overrides mechanism. These limits are applied on individual (per-tenant) alertmanagers. Rate-limited notifications are failed notifications. It is possible to monitor rate-limited notifications via new `cortex_alertmanager_notification_rate_limited_total` metric. #4135 #4163
 * [ENHANCEMENT] Alertmanager: introduced new metrics to monitor operation when using `-alertmanager.sharding-enabled`: #4149
   * `cortex_alertmanager_state_fetch_replica_state_total`
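
For orientation before the file-by-file diff, the sketch below shows how the new limit appears to be plumbed through a query, based only on the limiter calls visible in this commit. It is not code from the PR: how the per-tenant -querier.max-fetched-series-per-query value is looked up and where exactly the limiter is attached to the request context are assumptions.

package example

import (
	"context"

	"github.com/cortexproject/cortex/pkg/cortexpb"
	"github.com/cortexproject/cortex/pkg/util/limiter"
)

// attachSeriesLimit sketches the producer side: the component starting the
// query (with blocks storage, the querier) builds a QueryLimiter from the
// tenant's max-fetched-series-per-query value and stores it in the request
// context. maxFetchedSeriesPerQuery is a hypothetical argument standing in
// for the per-tenant override lookup, which is not part of this diff.
func attachSeriesLimit(ctx context.Context, maxFetchedSeriesPerQuery int) context.Context {
	return limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(maxFetchedSeriesPerQuery))
}

// countSeries sketches the consumer side used by the distributor and the
// blocks-store querier in the diffs below: retrieve the limiter from the
// context (falling back to a default when none was attached) and account for
// each received series. AddSeries returns an error once the number of unique
// series exceeds the limit.
func countSeries(ctx context.Context, seriesLabels []cortexpb.LabelAdapter) error {
	queryLimiter := limiter.QueryLimiterFromContextWithFallback(ctx)
	return queryLimiter.AddSeries(seriesLabels)
}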

docs/configuration/arguments.md

Lines changed: 4 additions & 1 deletion
@@ -483,15 +483,18 @@ Valid per-tenant limits are (with their corresponding flags for default values):
   Requires `-distributor.replication-factor`, `-distributor.shard-by-all-labels`, `-distributor.sharding-strategy` and `-distributor.zone-awareness-enabled` set for the ingesters too.
 
 - `max_series_per_query` / `-ingester.max-series-per-query`
+
 - `max_samples_per_query` / `-ingester.max-samples-per-query`
 
   Limits on the number of timeseries and samples returns by a single ingester during a query.
 
 - `max_metadata_per_user` / `-ingester.max-metadata-per-user`
 - `max_metadata_per_metric` / `-ingester.max-metadata-per-metric`
-
   Enforced by the ingesters; limits the number of active metadata a user (or a given metric) can have. When running with `-distributor.shard-by-all-labels=false` (the default), this limit will enforce the maximum number of metadata a metric can have 'globally', as all metadata for a single metric will be sent to the same replication set of ingesters. This is not the case when running with `-distributor.shard-by-all-labels=true`, so the actual limit will be N/RF times higher, where N is number of ingester replicas and RF is configured replication factor.
 
+- `max_fetched_series_per_query` / `querier.max-fetched-series-per-query`
+  When running Cortex with blocks storage this limit is enforced in the queriers on unique series fetched from ingesters and store-gateways (long-term storage).
+
 - `max_global_metadata_per_user` / `-ingester.max-global-metadata-per-user`
 - `max_global_metadata_per_metric` / `-ingester.max-global-metadata-per-metric`

docs/configuration/config-file-reference.md

Lines changed: 8 additions & 1 deletion
@@ -3940,7 +3940,8 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
 # The maximum number of series for which a query can fetch samples from each
 # ingester. This limit is enforced only in the ingesters (when querying samples
 # not flushed to the storage yet) and it's a per-instance limit. This limit is
-# ignored when running the Cortex blocks storage.
+# ignored when running the Cortex blocks storage. When running Cortex with
+# blocks storage use -querier.max-fetched-series-per-query limit instead.
 # CLI flag: -ingester.max-series-per-query
 [max_series_per_query: <int> | default = 100000]
 
@@ -4012,6 +4013,12 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
 # CLI flag: -querier.max-fetched-chunks-per-query
 [max_fetched_chunks_per_query: <int> | default = 0]
 
+# The maximum number of unique series for which a query can fetch samples from
+# each ingesters and blocks storage. This limit is enforced in the querier only
+# when running Cortex with blocks storage. 0 to disable
+# CLI flag: -querier.max-fetched-series-per-query
+[max_fetched_series_per_query: <int> | default = 0]
+
 # Limit how long back data (series and metadata) can be queried, up until
 # <lookback> duration ago. This limit is enforced in the query-frontend, querier
 # and ruler. If the requested time range is outside the allowed range, the
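
The reference above documents the semantics of the new option: it is enforced in the querier when running blocks storage, it counts unique series, and 0 disables it. The real limiter lives in pkg/util/limiter and is not part of this diff; the stand-in below only illustrates those documented semantics (unique-series counting keyed on a label-set hash, 0 meaning unlimited), and its error wording simply echoes the "max number of series limit" phrase asserted by the tests in this PR.

package example

import (
	"fmt"
	"sync"

	"github.com/cortexproject/cortex/pkg/cortexpb"
)

// seriesLimiter is an illustrative stand-in for Cortex's QueryLimiter, not the
// real implementation. It counts unique series per query and fails once the
// configured maximum is crossed.
type seriesLimiter struct {
	mtx       sync.Mutex
	seen      map[uint64]struct{}
	maxSeries int
}

func newSeriesLimiter(maxSeries int) *seriesLimiter {
	return &seriesLimiter{seen: map[uint64]struct{}{}, maxSeries: maxSeries}
}

func (l *seriesLimiter) AddSeries(seriesLabels []cortexpb.LabelAdapter) error {
	if l.maxSeries == 0 {
		// Matches the documented default: 0 disables the limit.
		return nil
	}

	// Key on a hash of the label set so the same series returned by several
	// ingesters or store-gateways is only counted once.
	fp := cortexpb.FromLabelAdaptersToLabels(seriesLabels).Hash()

	l.mtx.Lock()
	defer l.mtx.Unlock()

	l.seen[fp] = struct{}{}
	if len(l.seen) > l.maxSeries {
		return fmt.Errorf("the query hit the max number of series limit (limit: %d)", l.maxSeries)
	}
	return nil
}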

integration/querier_test.go

Lines changed: 79 additions & 0 deletions
@@ -889,6 +889,85 @@ func TestQuerierWithChunksStorage(t *testing.T) {
 	assertServiceMetricsPrefixes(t, TableManager, tableManager)
 }
 
+func TestQueryLimitsWithBlocksStorageRunningInMicroServices(t *testing.T) {
+	const blockRangePeriod = 5 * time.Second
+
+	s, err := e2e.NewScenario(networkName)
+	require.NoError(t, err)
+	defer s.Close()
+
+	// Configure the blocks storage to frequently compact TSDB head
+	// and ship blocks to the storage.
+	flags := mergeFlags(BlocksStorageFlags(), map[string]string{
+		"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
+		"-blocks-storage.tsdb.ship-interval": "1s",
+		"-blocks-storage.bucket-store.sync-interval": "1s",
+		"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
+		"-querier.ingester-streaming": "true",
+		"-querier.query-store-for-labels-enabled": "true",
+		"-querier.max-fetched-series-per-query": "3",
+	})
+
+	// Start dependencies.
+	consul := e2edb.NewConsul()
+	minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
+	memcached := e2ecache.NewMemcached()
+	require.NoError(t, s.StartAndWaitReady(consul, minio, memcached))
+
+	// Add the memcached address to the flags.
+	flags["-blocks-storage.bucket-store.index-cache.memcached.addresses"] = "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort)
+
+	// Start Cortex components.
+	distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "")
+	ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags, "")
+	storeGateway := e2ecortex.NewStoreGateway("store-gateway", consul.NetworkHTTPEndpoint(), flags, "")
+	require.NoError(t, s.StartAndWaitReady(distributor, ingester, storeGateway))
+
+	// Start the querier with configuring store-gateway addresses if sharding is disabled.
+	flags = mergeFlags(flags, map[string]string{
+		"-querier.store-gateway-addresses": strings.Join([]string{storeGateway.NetworkGRPCEndpoint()}, ","),
+	})
+
+	querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags, "")
+	require.NoError(t, s.StartAndWaitReady(querier))
+
+	c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", "user-1")
+	require.NoError(t, err)
+
+	// Push some series to Cortex.
+	series1Timestamp := time.Now()
+	series2Timestamp := series1Timestamp.Add(blockRangePeriod * 2)
+	series3Timestamp := series1Timestamp.Add(blockRangePeriod * 2)
+	series4Timestamp := series1Timestamp.Add(blockRangePeriod * 3)
+
+	series1, _ := generateSeries("series_1", series1Timestamp, prompb.Label{Name: "series_1", Value: "series_1"})
+	series2, _ := generateSeries("series_2", series2Timestamp, prompb.Label{Name: "series_2", Value: "series_2"})
+	series3, _ := generateSeries("series_3", series3Timestamp, prompb.Label{Name: "series_3", Value: "series_3"})
+	series4, _ := generateSeries("series_4", series4Timestamp, prompb.Label{Name: "series_4", Value: "series_4"})
+
+	res, err := c.Push(series1)
+	require.NoError(t, err)
+	require.Equal(t, 200, res.StatusCode)
+	res, err = c.Push(series2)
+	require.NoError(t, err)
+	require.Equal(t, 200, res.StatusCode)
+
+	result, err := c.QueryRange("{__name__=~\"series_.+\"}", series1Timestamp, series2Timestamp.Add(1*time.Hour), blockRangePeriod)
+	require.NoError(t, err)
+	require.Equal(t, model.ValMatrix, result.Type())
+
+	res, err = c.Push(series3)
+	require.NoError(t, err)
+	require.Equal(t, 200, res.StatusCode)
+	res, err = c.Push(series4)
+	require.NoError(t, err)
+	require.Equal(t, 200, res.StatusCode)
+
+	_, err = c.QueryRange("{__name__=~\"series_.+\"}", series1Timestamp, series4Timestamp.Add(1*time.Hour), blockRangePeriod)
+	require.Error(t, err)
+	assert.Contains(t, err.Error(), "max number of series limit")
+}
+
 func TestHashCollisionHandling(t *testing.T) {
 	s, err := e2e.NewScenario(networkName)
 	require.NoError(t, err)

pkg/distributor/distributor_test.go

Lines changed: 52 additions & 1 deletion
@@ -40,6 +40,7 @@ import (
 	"github.com/cortexproject/cortex/pkg/util"
 	"github.com/cortexproject/cortex/pkg/util/chunkcompat"
 	"github.com/cortexproject/cortex/pkg/util/flagext"
+	"github.com/cortexproject/cortex/pkg/util/limiter"
 	util_math "github.com/cortexproject/cortex/pkg/util/math"
 	"github.com/cortexproject/cortex/pkg/util/services"
 	"github.com/cortexproject/cortex/pkg/util/test"
@@ -945,6 +946,56 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunksPerQueryLimitIsReac
 	assert.Contains(t, err.Error(), "the query hit the max number of chunks limit")
 }
 
+func TestDistributor_QueryStream_ShouldReturnErrorIfMaxSeriesPerQueryLimitIsReached(t *testing.T) {
+	const maxSeriesLimit = 10
+
+	limits := &validation.Limits{}
+	flagext.DefaultValues(limits)
+	ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(maxSeriesLimit))
+	// Prepare distributors.
+	ds, _, r, _ := prepare(t, prepConfig{
+		numIngesters:     3,
+		happyIngesters:   3,
+		numDistributors:  1,
+		shardByAllLabels: true,
+		limits:           limits,
+	})
+	defer stopAll(ds, r)
+
+	// Push a number of series below the max series limit.
+	initialSeries := maxSeriesLimit
+	writeReq := makeWriteRequest(0, initialSeries, 0)
+	writeRes, err := ds[0].Push(ctx, writeReq)
+	assert.Equal(t, &cortexpb.WriteResponse{}, writeRes)
+	assert.Nil(t, err)
+
+	allSeriesMatchers := []*labels.Matcher{
+		labels.MustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, ".+"),
+	}
+
+	// Since the number of series is equal to the limit (but doesn't
+	// exceed it), we expect a query running on all series to succeed.
+	queryRes, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
+	require.NoError(t, err)
+	assert.Len(t, queryRes.Chunkseries, initialSeries)
+
+	// Push more series to exceed the limit once we'll query back all series.
+	writeReq = &cortexpb.WriteRequest{}
+	writeReq.Timeseries = append(writeReq.Timeseries,
+		makeWriteRequestTimeseries([]cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "another_series"}}, 0, 0),
+	)
+
+	writeRes, err = ds[0].Push(ctx, writeReq)
+	assert.Equal(t, &cortexpb.WriteResponse{}, writeRes)
+	assert.Nil(t, err)
+
+	// Since the number of series is exceeding the limit, we expect
+	// a query running on all series to fail.
+	_, err = ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
+	require.Error(t, err)
+	assert.Contains(t, err.Error(), "max number of series limit")
+}
+
 func TestDistributor_Push_LabelRemoval(t *testing.T) {
 	ctx = user.InjectOrgID(context.Background(), "user")
 
@@ -1953,7 +2004,7 @@ func makeWriteRequestExemplar(seriesLabels []string, timestamp int64, exemplarLa
 		Timeseries: []cortexpb.PreallocTimeseries{
 			{
 				TimeSeries: &cortexpb.TimeSeries{
-					//Labels: []cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "test"}},
+					// Labels: []cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "test"}},
 					Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings(seriesLabels...)),
 					Exemplars: []cortexpb.Exemplar{
 						{

pkg/distributor/query.go

Lines changed: 14 additions & 2 deletions
@@ -19,6 +19,7 @@ import (
 	"github.com/cortexproject/cortex/pkg/util"
 	"github.com/cortexproject/cortex/pkg/util/extract"
 	grpc_util "github.com/cortexproject/cortex/pkg/util/grpc"
+	"github.com/cortexproject/cortex/pkg/util/limiter"
 	"github.com/cortexproject/cortex/pkg/util/validation"
 )
 
@@ -187,8 +188,9 @@ func (d *Distributor) queryIngesters(ctx context.Context, replicationSet ring.Re
 // queryIngesterStream queries the ingesters using the new streaming API.
 func (d *Distributor) queryIngesterStream(ctx context.Context, userID string, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (*ingester_client.QueryStreamResponse, error) {
 	var (
-		chunksLimit = d.limits.MaxChunksPerQueryFromIngesters(userID)
-		chunksCount = atomic.Int32{}
+		chunksLimit  = d.limits.MaxChunksPerQueryFromIngesters(userID)
+		chunksCount  = atomic.Int32{}
+		queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx)
 	)
 
 	// Fetch samples from multiple ingesters
@@ -230,6 +232,16 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, userID string, re
 					return nil, validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), chunksLimit))
 				}
 			}
+			for _, series := range resp.Chunkseries {
+				if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
+					return nil, limitErr
+				}
+			}
+			for _, series := range resp.Timeseries {
+				if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
+					return nil, limitErr
+				}
+			}
 
 			result.Chunkseries = append(result.Chunkseries, resp.Chunkseries...)
 			result.Timeseries = append(result.Timeseries, resp.Timeseries...)
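
A note on the two loops added above: the CHANGELOG entry describes the limit as applying to unique series, so the same series streamed back by more than one ingester replica (replication factor > 1) should presumably count only once against the budget. The snippet below illustrates that expectation using the limiter API as it appears in this diff; the deduplication behaviour itself is inferred from the "unique series" wording rather than shown here.

package example

import (
	"fmt"

	"github.com/cortexproject/cortex/pkg/cortexpb"
	"github.com/cortexproject/cortex/pkg/util/limiter"
)

func exampleUniqueSeriesAccounting() {
	// Allow at most one unique series for this query.
	queryLimiter := limiter.NewQueryLimiter(1)

	series := []cortexpb.LabelAdapter{{Name: "__name__", Value: "series_1"}}

	// The same series observed from two ingester replicas: the second call is
	// expected not to trip the limit, since it is not a new unique series.
	fmt.Println(queryLimiter.AddSeries(series)) // expected: <nil>
	fmt.Println(queryLimiter.AddSeries(series)) // expected: <nil>

	// A second, distinct series exceeds the limit of one unique series.
	other := []cortexpb.LabelAdapter{{Name: "__name__", Value: "series_2"}}
	fmt.Println(queryLimiter.AddSeries(other)) // expected: "max number of series limit" error
}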

pkg/querier/blocks_store_queryable.go

Lines changed: 9 additions & 1 deletion
@@ -27,6 +27,7 @@ import (
 	"golang.org/x/sync/errgroup"
 	grpc_metadata "google.golang.org/grpc/metadata"
 
+	"github.com/cortexproject/cortex/pkg/cortexpb"
 	"github.com/cortexproject/cortex/pkg/querier/series"
 	"github.com/cortexproject/cortex/pkg/ring"
 	"github.com/cortexproject/cortex/pkg/ring/kv"
@@ -37,6 +38,7 @@ import (
 	"github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb"
 	"github.com/cortexproject/cortex/pkg/tenant"
 	"github.com/cortexproject/cortex/pkg/util"
+	"github.com/cortexproject/cortex/pkg/util/limiter"
 	util_log "github.com/cortexproject/cortex/pkg/util/log"
 	"github.com/cortexproject/cortex/pkg/util/math"
 	"github.com/cortexproject/cortex/pkg/util/services"
@@ -423,7 +425,6 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...*
 			if maxChunksLimit > 0 {
 				leftChunksLimit -= numChunks
 			}
-
 			resultMtx.Unlock()
 
 			return queriedBlocks, nil
@@ -563,6 +564,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
 		queriedBlocks = []ulid.ULID(nil)
 		numChunks     = atomic.NewInt32(0)
 		spanLog       = spanlogger.FromContext(ctx)
+		queryLimiter  = limiter.QueryLimiterFromContextWithFallback(ctx)
 	)
 
 	// Concurrently fetch series from all clients.
@@ -611,6 +613,12 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
 			if s := resp.GetSeries(); s != nil {
 				mySeries = append(mySeries, s)
 
+				// Add series fingerprint to query limiter; will return error if we are over the limit
+				limitErr := queryLimiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(s.PromLabels()))
+				if limitErr != nil {
+					return limitErr
+				}
+
 				// Ensure the max number of chunks limit hasn't been reached (max == 0 means disabled).
 				if maxChunksLimit > 0 {
 					actual := numChunks.Add(int32(len(s.Chunks)))
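
One detail worth noting in the store-gateway path above: the streamed series exposes Prometheus labels, so they are converted with cortexpb.FromLabelsToLabelAdapters(s.PromLabels()) before being handed to AddSeries, whereas the distributor path already holds []cortexpb.LabelAdapter values. Below is a minimal sketch of that conversion in isolation; the QueryLimiter type name is assumed from the NewQueryLimiter constructor, and the Prometheus labels import path is assumed to be the one Cortex vendored at the time.

package example

import (
	"github.com/prometheus/prometheus/pkg/labels"

	"github.com/cortexproject/cortex/pkg/cortexpb"
	"github.com/cortexproject/cortex/pkg/util/limiter"
)

// countStoreGatewaySeries mirrors the shape of the store-gateway loop above:
// convert the series' Prometheus labels to Cortex label adapters and let the
// query limiter decide whether the unique-series budget has been exceeded.
func countStoreGatewaySeries(queryLimiter *limiter.QueryLimiter, promLabels labels.Labels) error {
	return queryLimiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(promLabels))
}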
