Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API: Add limit param in metadata APIs #7609

Merged
merged 2 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
* [#7592](https://github.com/thanos-io/thanos/pull/7592) Ruler: Only increment `thanos_rule_evaluation_with_warnings_total` metric for non PromQL warnings.

### Added
* [7609](https://github.com/thanos-io/thanos/pull/7609) API: Add limit param to metadata APIs (series, label names, label values).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can omit the changelog here and add in the actual store implementation PR.
But not a big deal. We can update the changelog when you create another PR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This missed #. We can fix in your next pr


### Changed

Expand Down
85 changes: 79 additions & 6 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -1067,6 +1067,11 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
return nil, nil, apiErr, func() {}
}

limit, err := parseLimitParam(r.FormValue("limit"))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}, func() {}
}

matcherSets, ctx, err := tenancy.RewriteLabelMatchers(ctx, r, qapi.tenantHeader, qapi.defaultTenant, qapi.tenantCertField, qapi.enforceTenancy, qapi.tenantLabel, r.Form[MatcherParam])
if err != nil {
apiErr = &api.ApiError{Typ: api.ErrorBadData, Err: err}
Expand All @@ -1088,6 +1093,10 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
}
defer runutil.CloseWithLogOnErr(qapi.logger, q, "queryable labelValues")

hints := &storage.LabelHints{
Limit: toHintLimit(limit),
}

var (
vals []string
warnings annotations.Annotations
Expand All @@ -1096,7 +1105,7 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
var callWarnings annotations.Annotations
labelValuesSet := make(map[string]struct{})
for _, matchers := range matcherSets {
vals, callWarnings, err = q.LabelValues(ctx, name, nil, matchers...)
vals, callWarnings, err = q.LabelValues(ctx, name, hints, matchers...)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {}
}
Expand All @@ -1112,7 +1121,7 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
}
sort.Strings(vals)
} else {
vals, warnings, err = q.LabelValues(ctx, name, nil)
vals, warnings, err = q.LabelValues(ctx, name, hints)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {}
}
Expand All @@ -1122,6 +1131,11 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
vals = make([]string, 0)
}

if limit > 0 && len(vals) > limit {
vals = vals[:limit]
warnings = warnings.Add(errors.New("results truncated due to limit"))
}

return vals, warnings.AsErrors(), nil, func() {}
}

Expand Down Expand Up @@ -1160,6 +1174,11 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr
return nil, nil, apiErr, func() {}
}

limit, err := parseLimitParam(r.FormValue("limit"))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}, func() {}
}

enablePartialResponse, apiErr := qapi.parsePartialResponseParam(r, qapi.enableQueryPartialResponse)
if apiErr != nil {
return nil, nil, apiErr, func() {}
Expand All @@ -1185,18 +1204,31 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr
metrics = []labels.Labels{}
sets []storage.SeriesSet
)

hints := &storage.SelectHints{
Limit: toHintLimit(limit),
Start: start.UnixMilli(),
End: end.UnixMilli(),
}

for _, mset := range matcherSets {
sets = append(sets, q.Select(ctx, false, nil, mset...))
sets = append(sets, q.Select(ctx, false, hints, mset...))
}

set := storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
warnings := set.Warnings()
for set.Next() {
metrics = append(metrics, set.At().Labels())
if limit > 0 && len(metrics) > limit {
metrics = metrics[:limit]
harry671003 marked this conversation as resolved.
Show resolved Hide resolved
warnings.Add(errors.New("results truncated due to limit"))
return metrics, warnings.AsErrors(), nil, func() {}
}
}
if set.Err() != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: set.Err()}, func() {}
}
return metrics, set.Warnings().AsErrors(), nil, func() {}
return metrics, warnings.AsErrors(), nil, func() {}
}

func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.ApiError, func()) {
Expand All @@ -1215,6 +1247,11 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
return nil, nil, apiErr, func() {}
}

limit, err := parseLimitParam(r.FormValue("limit"))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}, func() {}
}

matcherSets, ctx, err := tenancy.RewriteLabelMatchers(r.Context(), r, qapi.tenantHeader, qapi.defaultTenant, qapi.tenantCertField, qapi.enforceTenancy, qapi.tenantLabel, r.Form[MatcherParam])
if err != nil {
apiErr := &api.ApiError{Typ: api.ErrorBadData, Err: err}
Expand All @@ -1241,11 +1278,15 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
warnings annotations.Annotations
)

hints := &storage.LabelHints{
Limit: toHintLimit(limit),
}

if len(matcherSets) > 0 {
var callWarnings annotations.Annotations
labelNamesSet := make(map[string]struct{})
for _, matchers := range matcherSets {
names, callWarnings, err = q.LabelNames(ctx, nil, matchers...)
names, callWarnings, err = q.LabelNames(ctx, hints, matchers...)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {}
}
Expand All @@ -1261,7 +1302,7 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
}
sort.Strings(names)
} else {
names, warnings, err = q.LabelNames(ctx, nil)
names, warnings, err = q.LabelNames(ctx, hints)
}

if err != nil {
Expand All @@ -1271,6 +1312,11 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
names = make([]string, 0)
}

if limit > 0 && len(names) > limit {
names = names[:limit]
warnings = warnings.Add(errors.New("results truncated due to limit"))
}

return names, warnings.AsErrors(), nil, func() {}
}

Expand Down Expand Up @@ -1532,6 +1578,33 @@ func parseDuration(s string) (time.Duration, error) {
return 0, errors.Errorf("cannot parse %q to a valid duration", s)
}

// parseLimitParam returning 0 means no limit is to be applied.
func parseLimitParam(s string) (int, error) {
if s == "" {
return 0, nil
}

limit, err := strconv.Atoi(s)
if err != nil {
return 0, errors.Errorf("cannot parse %q to a valid limit", s)
}
if limit < 0 {
return 0, errors.New("limit must be non-negative")
}

return limit, nil
}

// toHintLimit increases the API limit, as returned by parseLimitParam, by 1.
// This allows for emitting warnings when the results are truncated.
func toHintLimit(limit int) int {
// 0 means no limit and avoid int overflow
if limit > 0 && limit < math.MaxInt {
return limit + 1
}
return limit
}

// NewMetricMetadataHandler creates handler compatible with HTTP /api/v1/metadata https://prometheus.io/docs/prometheus/latest/querying/api/#querying-metric-metadata
// which uses gRPC Unary Metadata API.
func NewMetricMetadataHandler(client metadata.UnaryClient, enablePartialResponse bool) func(*http.Request) (interface{}, []error, *api.ApiError, func()) {
Expand Down
83 changes: 83 additions & 0 deletions pkg/api/query/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1038,6 +1038,15 @@ func TestMetadataEndpoints(t *testing.T) {
},
response: []string{"__name__", "foo", "replica1"},
},
// With limit
{
endpoint: api.labelNames,
query: url.Values{
"match[]": []string{`test_metric_replica2`},
"limit": []string{"2"},
},
response: []string{"__name__", "foo"},
},
{
endpoint: api.labelValues,
query: url.Values{
Expand All @@ -1058,6 +1067,18 @@ func TestMetadataEndpoints(t *testing.T) {
},
response: []string{"test_metric1", "test_metric2", "test_metric_replica1", "test_metric_replica2"},
},
// With limit
{
endpoint: api.labelValues,
query: url.Values{
"match[]": []string{`{foo="bar"}`, `{foo="boo"}`},
"limit": []string{"3"},
},
params: map[string]string{
"name": "__name__",
},
response: []string{"test_metric1", "test_metric2", "test_metric_replica1"},
},
// No matched series.
{
endpoint: api.labelValues,
Expand Down Expand Up @@ -1357,6 +1378,32 @@ func TestMetadataEndpoints(t *testing.T) {
errType: baseAPI.ErrorBadData,
method: http.MethodPost,
},
// With limit
{
endpoint: api.series,
query: url.Values{
"match[]": []string{`{replica="", foo=~"b.+", replica1=""}`},
"limit": []string{"2"},
},
response: []labels.Labels{
labels.FromStrings("__name__", "test_metric1", "foo", "bar"),
labels.FromStrings("__name__", "test_metric1", "foo", "boo"),
},
method: http.MethodPost,
},
// Without limit
{
endpoint: api.series,
query: url.Values{
"match[]": []string{`{replica="", foo=~"b.+", replica1=""}`},
},
response: []labels.Labels{
labels.FromStrings("__name__", "test_metric1", "foo", "bar"),
labels.FromStrings("__name__", "test_metric1", "foo", "boo"),
labels.FromStrings("__name__", "test_metric2", "foo", "boo"),
},
method: http.MethodPost,
},
}

for i, test := range tests {
Expand Down Expand Up @@ -1698,6 +1745,42 @@ func TestParseStoreDebugMatchersParam(t *testing.T) {
}
}

func TestParseLimitParam(t *testing.T) {
var tests = []struct {
input string
fail bool
result int
}{
{
input: "",
fail: false,
result: 0,
}, {
input: "abc",
fail: true,
}, {
input: "10",
fail: false,
result: 10,
},
}

for _, test := range tests {
res, err := parseLimitParam(test.input)
if err != nil && !test.fail {
t.Errorf("Unexpected error for %q: %s", test.input, err)
continue
}
if err == nil && test.fail {
t.Errorf("Expected error for %q but got none", test.input)
continue
}
if !test.fail && res != test.result {
t.Errorf("Expected limit %v for input %q but got %v", test.result, test.input, res)
}
}
}

func TestRulesHandler(t *testing.T) {
twoHAgo := time.Now().Add(-2 * time.Hour)
all := []*rulespb.Rule{
Expand Down
Loading