Skip to content

Commit

Permalink
Implement metadata API limit in stores
Browse files Browse the repository at this point in the history
Signed-off-by: 🌲 Harry 🌊 John 🏔 <johrry@amazon.com>
  • Loading branch information
harry671003 committed Aug 20, 2024
1 parent 9301004 commit e061ad0
Show file tree
Hide file tree
Showing 8 changed files with 257 additions and 103 deletions.
9 changes: 6 additions & 3 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,14 +734,15 @@ func (c *Client) get2xxResultWithGRPCErrors(ctx context.Context, spanName string

// SeriesInGRPC returns the labels from Prometheus series API. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64) ([]map[string]string, error) {
func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64, limit int) ([]map[string]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/series")
q := u.Query()

q.Add("match[]", storepb.PromMatchersToString(matchers...))
q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
q.Add("limit", strconv.Itoa(limit))
u.RawQuery = q.Encode()

var m struct {
Expand All @@ -753,7 +754,7 @@ func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*la

// LabelNamesInGRPC returns all known label names constrained by the given matchers. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64) ([]string, error) {
func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64, limit int) ([]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/labels")
q := u.Query()
Expand All @@ -763,6 +764,7 @@ func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers [
}
q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
q.Add("limit", strconv.Itoa(limit))
u.RawQuery = q.Encode()

var m struct {
Expand All @@ -773,7 +775,7 @@ func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers [

// LabelValuesInGRPC returns all known label values for a given label name. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label string, matchers []*labels.Matcher, startTime, endTime int64) ([]string, error) {
func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label string, matchers []*labels.Matcher, startTime, endTime int64, limit int) ([]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/label/", label, "/values")
q := u.Query()
Expand All @@ -783,6 +785,7 @@ func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label str
}
q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
q.Add("limit", strconv.Itoa(limit))
u.RawQuery = q.Encode()

var m struct {
Expand Down
16 changes: 14 additions & 2 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
req := storepb.SeriesRequest{
MinTime: hints.Start,
MaxTime: hints.End,
Limit: int64(hints.Limit),
Matchers: sms,
MaxResolutionWindow: q.maxResolutionMillis,
Aggregates: aggrs,
Expand Down Expand Up @@ -373,7 +374,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
}

// LabelValues returns all potential values for a label name.
func (q *querier) LabelValues(ctx context.Context, name string, _ *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
func (q *querier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
span, ctx := tracing.StartSpan(ctx, "querier_label_values")
defer span.Finish()

Expand All @@ -384,12 +385,18 @@ func (q *querier) LabelValues(ctx context.Context, name string, _ *storage.Label
if err != nil {
return nil, nil, errors.Wrap(err, "converting prom matchers to storepb matchers")
}

if hints == nil {
hints = &storage.LabelHints{}
}

req := &storepb.LabelValuesRequest{
Label: name,
PartialResponseStrategy: q.partialResponseStrategy,
Start: q.mint,
End: q.maxt,
Matchers: pbMatchers,
Limit: int64(hints.Limit),
}

if q.isDedupEnabled() {
Expand All @@ -411,7 +418,7 @@ func (q *querier) LabelValues(ctx context.Context, name string, _ *storage.Label

// LabelNames returns all the unique label names present in the block in sorted order constrained
// by the given matchers.
func (q *querier) LabelNames(ctx context.Context, _ *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
func (q *querier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
span, ctx := tracing.StartSpan(ctx, "querier_label_names")
defer span.Finish()

Expand All @@ -423,11 +430,16 @@ func (q *querier) LabelNames(ctx context.Context, _ *storage.LabelHints, matcher
return nil, nil, errors.Wrap(err, "converting prom matchers to storepb matchers")
}

if hints == nil {
hints = &storage.LabelHints{}
}

req := &storepb.LabelNamesRequest{
PartialResponseStrategy: q.partialResponseStrategy,
Start: q.mint,
End: q.maxt,
Matchers: pbMatchers,
Limit: int64(hints.Limit),
}

if q.isDedupEnabled() {
Expand Down
21 changes: 19 additions & 2 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1694,7 +1694,12 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
tracing.DoInSpan(ctx, "bucket_store_merge_all", func(ctx context.Context) {
begin := time.Now()
set := NewResponseDeduplicator(NewProxyResponseLoserTree(respSets...))
i := 0
for set.Next() {
i++
if req.Limit > 0 && i > int(req.Limit) {
break
}
at := set.At()
warn := at.GetWarning()
if warn != "" {
Expand Down Expand Up @@ -1857,6 +1862,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
MaxTime: req.End,
SkipChunks: true,
WithoutReplicaLabels: req.WithoutReplicaLabels,
Limit: req.Limit,
}
blockClient := newBlockSeriesClient(
newCtx,
Expand Down Expand Up @@ -1945,8 +1951,13 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
return nil, status.Error(codes.Unknown, errors.Wrap(err, "marshal label names response hints").Error())
}

names := strutil.MergeSlices(sets...)
if req.Limit > 0 && len(names) > int(req.Limit) {
names = names[:req.Limit]
}

return &storepb.LabelNamesResponse{
Names: strutil.MergeSlices(sets...),
Names: names,
Hints: anyHints,
}, nil
}
Expand Down Expand Up @@ -2069,6 +2080,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
seriesReq := &storepb.SeriesRequest{
MinTime: req.Start,
MaxTime: req.End,
Limit: req.Limit,
SkipChunks: true,
WithoutReplicaLabels: req.WithoutReplicaLabels,
}
Expand Down Expand Up @@ -2160,8 +2172,13 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
return nil, status.Error(codes.Unknown, errors.Wrap(err, "marshal label values response hints").Error())
}

vals := strutil.MergeSlices(sets...)
if req.Limit > 0 && len(vals) > int(req.Limit) {
vals = vals[:req.Limit]
}

return &storepb.LabelValuesResponse{
Values: strutil.MergeSlices(sets...),
Values: vals,
Hints: anyHints,
}, nil
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto

if r.SkipChunks {
finalExtLset := rmLabels(extLset.Copy(), extLsetToRemove)
labelMaps, err := p.client.SeriesInGRPC(s.Context(), p.base, matchers, r.MinTime, r.MaxTime)
labelMaps, err := p.client.SeriesInGRPC(s.Context(), p.base, matchers, r.MinTime, r.MaxTime, int(r.Limit))
if err != nil {
return err
}
Expand Down Expand Up @@ -571,12 +571,12 @@ func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesR

var lbls []string
if len(matchers) == 0 || p.labelCallsSupportMatchers() {
lbls, err = p.client.LabelNamesInGRPC(ctx, p.base, matchers, r.Start, r.End)
lbls, err = p.client.LabelNamesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit))
if err != nil {
return nil, err
}
} else {
sers, err := p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End)
sers, err := p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -642,7 +642,7 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue
if len(matchers) == 0 {
return &storepb.LabelValuesResponse{Values: []string{val}}, nil
}
sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End)
sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit))
if err != nil {
return nil, err
}
Expand All @@ -653,12 +653,12 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue
}

if len(matchers) == 0 || p.labelCallsSupportMatchers() {
vals, err = p.client.LabelValuesInGRPC(ctx, p.base, r.Label, matchers, r.Start, r.End)
vals, err = p.client.LabelValuesInGRPC(ctx, p.base, r.Label, matchers, r.Start, r.End, int(r.Limit))
if err != nil {
return nil, err
}
} else {
sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End)
sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit))
if err != nil {
return nil, err
}
Expand Down
17 changes: 15 additions & 2 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
r := &storepb.SeriesRequest{
MinTime: originalRequest.MinTime,
MaxTime: originalRequest.MaxTime,
Limit: originalRequest.Limit,
Matchers: append(storeMatchers, MatchersForLabelSets(storeLabelSets)...),
Aggregates: originalRequest.Aggregates,
MaxResolutionWindow: originalRequest.MaxResolutionWindow,
Expand Down Expand Up @@ -419,6 +420,7 @@ func (s *ProxyStore) LabelNames(ctx context.Context, originalRequest *storepb.La
End: originalRequest.End,
Matchers: append(storeMatchers, MatchersForLabelSets(storeLabelSets)...),
WithoutReplicaLabels: originalRequest.WithoutReplicaLabels,
Hints: originalRequest.Hints,
}

var (
Expand Down Expand Up @@ -465,8 +467,13 @@ func (s *ProxyStore) LabelNames(ctx context.Context, originalRequest *storepb.La
return nil, err
}

result := strutil.MergeUnsortedSlices(names...)
if originalRequest.Limit > 0 && len(result) > int(originalRequest.Limit) {
result = result[:originalRequest.Limit]
}

return &storepb.LabelNamesResponse{
Names: strutil.MergeUnsortedSlices(names...),
Names: result,
Warnings: warnings,
}, nil
}
Expand Down Expand Up @@ -520,6 +527,7 @@ func (s *ProxyStore) LabelValues(ctx context.Context, originalRequest *storepb.L
End: originalRequest.End,
Matchers: append(storeMatchers, MatchersForLabelSets(storeLabelSets)...),
WithoutReplicaLabels: originalRequest.WithoutReplicaLabels,
Limit: originalRequest.Limit,
}

var (
Expand Down Expand Up @@ -567,8 +575,13 @@ func (s *ProxyStore) LabelValues(ctx context.Context, originalRequest *storepb.L
return nil, err
}

vals := strutil.MergeUnsortedSlices(all...)
if originalRequest.Limit > 0 && len(vals) > int(originalRequest.Limit) {
vals = vals[:originalRequest.Limit]
}

return &storepb.LabelValuesResponse{
Values: strutil.MergeUnsortedSlices(all...),
Values: vals,
Warnings: warnings,
}, nil
}
Expand Down
Loading

0 comments on commit e061ad0

Please sign in to comment.