Skip to content

Commit

Permalink
distributor: support for limit parameter in the finding series endpoi…
Browse files Browse the repository at this point in the history
…nt (#10620)

* distributor: pass select hints to MetricsForLabelMatchers

Signed-off-by: Vladimir Varankin <vladimir.varankin@grafana.com>

* update changelog

Signed-off-by: Vladimir Varankin <vladimir.varankin@grafana.com>

* fix tests

Signed-off-by: Vladimir Varankin <vladimir.varankin@grafana.com>

* Update pkg/distributor/distributor_test.go

Co-authored-by: Marco Pracucci <marco@pracucci.com>

* fix changelog

Signed-off-by: Vladimir Varankin <vladimir.varankin@grafana.com>

* fix typo

Signed-off-by: Vladimir Varankin <vladimir.varankin@grafana.com>

* distributor: apply requested limit while deduplicating metrics

Signed-off-by: Vladimir Varankin <vladimir.varankin@grafana.com>

---------

Signed-off-by: Vladimir Varankin <vladimir.varankin@grafana.com>
Co-authored-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
narqo and pracucci authored Feb 12, 2025
1 parent 7338cd2 commit bb0636a
Show file tree
Hide file tree
Showing 11 changed files with 299 additions and 133 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
* [ENHANCEMENT] Query-frontend: Allow adjustment of queries looking into the future to a maximum duration with experimental `-query-frontend.max-future-query-window` flag. #10547
* [ENHANCEMENT] Ruler: Adds support for filtering results from rule status endpoint by `file[]`, `rule_group[]` and `rule_name[]`. #10589
* [ENHANCEMENT] Query-frontend: Add option to "spin off" subqueries as actual range queries, so that they benefit from query acceleration techniques such as sharding, splitting and caching. To enable this, set the `-query-frontend.spin-off-instant-subqueries-to-url=<url>` option on the frontend and the `instant_queries_with_subquery_spin_off` per-tenant override with regular expressions matching the queries to enable. #10460 #10603 #10621
* [ENHANCEMENT] Querier, ingester: The series API now respects the `limit` parameter passed in the request. #10620
* [BUGFIX] Distributor: Use a boolean to track changes while merging the ReplicaDesc components, rather than comparing the objects directly. #10185
* [BUGFIX] Querier: fix timeout responding to query-frontend when response size is very close to `-querier.frontend-client.grpc-max-send-msg-size`. #10154
* [BUGFIX] Query-frontend and querier: show warning/info annotations in some cases where they were missing (if a lazy querier was used). #10277
Expand Down
12 changes: 10 additions & 2 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2743,13 +2743,13 @@ func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, hints

// MetricsForLabelMatchers returns a list of series with sample timestamps between from and through, and series labels
// matching the optional label matchers. The returned series are not sorted.
func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) {
func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, hints *storage.SelectHints, matchers ...*labels.Matcher) ([]labels.Labels, error) {
replicationSets, err := d.getIngesterReplicationSetsForQuery(ctx)
if err != nil {
return nil, err
}

req, err := ingester_client.ToMetricsForLabelMatchersRequest(from, through, matchers)
req, err := ingester_client.ToMetricsForLabelMatchersRequest(from, through, hints, matchers)
if err != nil {
return nil, err
}
Expand All @@ -2761,10 +2761,18 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
return nil, err
}

metricsLimit := math.MaxInt
if hints != nil && hints.Limit > 0 {
metricsLimit = hints.Limit
}
metrics := map[uint64]labels.Labels{}
respsLoop:
for _, resp := range resps {
ms := ingester_client.FromMetricsForLabelMatchersResponse(resp)
for _, m := range ms {
if len(metrics) >= metricsLimit {
break respsLoop
}
metrics[labels.StableHash(m)] = m
}
}
Expand Down
79 changes: 75 additions & 4 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2415,6 +2415,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
tests := map[string]struct {
shuffleShardSize int
matchers []*labels.Matcher
hints *storage.SelectHints
maxSeriesPerQuery int
expectedResult [][]mimirpb.LabelAdapter
expectedIngesters int
Expand All @@ -2424,13 +2425,26 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "unknown"),
},
hints: &storage.SelectHints{},
expectedResult: nil,
expectedIngesters: numIngesters,
},
"should filter metrics by single matcher": {
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_1"),
},
hints: &storage.SelectHints{},
expectedResult: [][]mimirpb.LabelAdapter{
fixtures[0].lbls,
fixtures[1].lbls,
},
expectedIngesters: numIngesters,
},
"should filter metrics by matcher when no hints passed": {
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_1"),
},
hints: nil,
expectedResult: [][]mimirpb.LabelAdapter{
fixtures[0].lbls,
fixtures[1].lbls,
Expand All @@ -2442,6 +2456,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
mustNewMatcher(labels.MatchEqual, "status", "200"),
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_1"),
},
hints: &storage.SelectHints{},
expectedResult: [][]mimirpb.LabelAdapter{
fixtures[0].lbls,
},
Expand All @@ -2451,6 +2466,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "fast_fingerprint_collision"),
},
hints: &storage.SelectHints{},
expectedResult: [][]mimirpb.LabelAdapter{
fixtures[3].lbls,
fixtures[4].lbls,
Expand All @@ -2462,6 +2478,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_1"),
},
hints: &storage.SelectHints{},
expectedResult: [][]mimirpb.LabelAdapter{
fixtures[0].lbls,
fixtures[1].lbls,
Expand All @@ -2472,9 +2489,34 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_1"),
},
hints: &storage.SelectHints{},
maxSeriesPerQuery: 1,
expectedError: validation.NewLimitError("the query exceeded the maximum number of series (limit: 1 series) (err-mimir-max-series-per-query). Consider reducing the time range and/or number of series selected by the query. One way to reduce the number of selected series is to add more label matchers to the query. Otherwise, to adjust the related per-tenant limit, configure -querier.max-fetched-series-per-query, or contact your service administrator."),
},
"should error out if max series per query is reached before hint limit": {
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, "test_.*"),
},
hints: &storage.SelectHints{
Limit: 2,
},
maxSeriesPerQuery: 1,
expectedError: validation.NewLimitError("the query exceeded the maximum number of series (limit: 1 series) (err-mimir-max-series-per-query). Consider reducing the time range and/or number of series selected by the query. One way to reduce the number of selected series is to add more label matchers to the query. Otherwise, to adjust the related per-tenant limit, configure -querier.max-fetched-series-per-query, or contact your service administrator."),
},
"should not return more metrics than hint limit": {
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_1"),
},
hints: &storage.SelectHints{
Limit: 1,
},
// When the limit hint is set, the test checks that the returned metrics are a subset of expectedResult.
expectedResult: [][]mimirpb.LabelAdapter{
fixtures[0].lbls,
fixtures[1].lbls,
},
expectedIngesters: numIngesters,
},
}

for testName, testData := range tests {
Expand Down Expand Up @@ -2518,14 +2560,20 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
// Set up limiter
ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(testData.maxSeriesPerQuery, 0, 0, 0, stats.NewQueryMetrics(prometheus.NewPedanticRegistry())))

metrics, err := ds[0].MetricsForLabelMatchers(ctx, now, now, testData.matchers...)
metrics, err := ds[0].MetricsForLabelMatchers(ctx, now, now, testData.hints, testData.matchers...)
if testData.expectedError != nil {
require.ErrorIs(t, err, testData.expectedError)
return
}

require.NoError(t, err)
requireLabelAdaptersMatchLabels(t, testData.expectedResult, metrics)

if testData.hints == nil || testData.hints.Limit == 0 {
requireLabelAdaptersMatchLabels(t, testData.expectedResult, metrics)
} else {
// The order of ingester responses isn't guaranteed. Thus, we can only test that the distributor's response is a subset of expectedResult.
require.Len(t, metrics, testData.hints.Limit)
requireLabelAdaptersContainLabels(t, testData.expectedResult, metrics)
}

// Check how many ingesters have been queried.
if ingestStorageEnabled {
Expand Down Expand Up @@ -2709,6 +2757,23 @@ func requireLabelAdaptersMatchLabels(tb testing.TB, a [][]mimirpb.LabelAdapter,
promtestutil.RequireEqual(tb, a, bAsLabelAdapters)
}

// requireLabelAdaptersContainLabels fails the test unless every element of b,
// converted to label adapters, is contained in a. Unlike an equality check, a
// may hold entries that do not appear in b.
//
// The converted elements of b are sorted before being checked; a is used as
// passed in and is not modified here.
func requireLabelAdaptersContainLabels(tb testing.TB, a [][]mimirpb.LabelAdapter, b []labels.Labels) {
	tb.Helper()

	// Nothing to compare when both sides are empty.
	if len(a) == 0 && len(b) == 0 {
		return
	}

	converted := make([][]mimirpb.LabelAdapter, 0, len(b))
	for _, lbls := range b {
		converted = append(converted, mimirpb.FromLabelsToLabelAdapters(lbls))
	}
	slices.SortFunc(converted, mimirpb.CompareLabelAdapters)

	for i := range converted {
		require.Contains(tb, a, converted[i])
	}
}

func TestDistributor_ActiveNativeHistogramSeries(t *testing.T) {
const numIngesters = 5
const responseSizeLimitBytes = 1024
Expand Down Expand Up @@ -6494,7 +6559,7 @@ func (i *mockIngester) MetricsForLabelMatchers(ctx context.Context, req *client.
return nil, errFail
}

multiMatchers, err := client.FromMetricsForLabelMatchersRequest(req)
_, multiMatchers, err := client.FromMetricsForLabelMatchersRequest(req)
if err != nil {
return nil, err
}
Expand All @@ -6507,6 +6572,12 @@ func (i *mockIngester) MetricsForLabelMatchers(ctx context.Context, req *client.
}
}
}

// Always sort metrics by their labels to make testing simpler.
slices.SortFunc(response.Metric, func(m1, m2 *mimirpb.Metric) int {
return mimirpb.CompareLabelAdapters(m1.Labels, m2.Labels)
})

return &response, nil
}

Expand Down
16 changes: 12 additions & 4 deletions pkg/ingester/client/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,30 +73,38 @@ func FromExemplarQueryRequest(req *ExemplarQueryRequest) (int64, int64, [][]*lab
}

// ToMetricsForLabelMatchersRequest builds a MetricsForLabelMatchersRequest proto. A positive hints.Limit is copied into the request's Limit field.
func ToMetricsForLabelMatchersRequest(from, to model.Time, matchers []*labels.Matcher) (*MetricsForLabelMatchersRequest, error) {
func ToMetricsForLabelMatchersRequest(from, to model.Time, hints *storage.SelectHints, matchers []*labels.Matcher) (*MetricsForLabelMatchersRequest, error) {
ms, err := ToLabelMatchers(matchers)
if err != nil {
return nil, err
}

var limit int64
if hints != nil && hints.Limit > 0 {
limit = int64(hints.Limit)
}
return &MetricsForLabelMatchersRequest{
StartTimestampMs: int64(from),
EndTimestampMs: int64(to),
MatchersSet: []*LabelMatchers{{Matchers: ms}},
Limit: limit,
}, nil
}

// FromMetricsForLabelMatchersRequest unpacks a MetricsForLabelMatchersRequest proto.
func FromMetricsForLabelMatchersRequest(req *MetricsForLabelMatchersRequest) ([][]*labels.Matcher, error) {
func FromMetricsForLabelMatchersRequest(req *MetricsForLabelMatchersRequest) (*storage.SelectHints, [][]*labels.Matcher, error) {
matchersSet := make([][]*labels.Matcher, 0, len(req.MatchersSet))
for _, matchers := range req.MatchersSet {
matchers, err := FromLabelMatchers(matchers.Matchers)
if err != nil {
return nil, err
return nil, nil, err
}
matchersSet = append(matchersSet, matchers)
}
return matchersSet, nil
hints := &storage.SelectHints{
Limit: int(req.Limit),
}
return hints, matchersSet, nil
}

// FromMetricsForLabelMatchersResponse unpacks a MetricsForLabelMatchersResponse proto
Expand Down
Loading

0 comments on commit bb0636a

Please sign in to comment.