Skip to content

Commit

Permalink
add time range parameters to labels API
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <yb532204897@gmail.com>
  • Loading branch information
yeya24 committed Jul 31, 2020
1 parent 13d8efc commit bc9005d
Show file tree
Hide file tree
Showing 17 changed files with 623 additions and 117 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#2926](https://github.com/thanos-io/thanos/pull/2926) API: Add new blocks HTTP API to serve blocks metadata. The status endpoints (`/api/v1/status/flags`, `/api/v1/status/runtimeinfo` and `/api/v1/status/buildinfo`) are now available on all components with a HTTP API.
- [#2892](https://github.com/thanos-io/thanos/pull/2892) Receive: Receiver fails when the initial upload fails.
- [#2865](https://github.com/thanos-io/thanos/pull/2865) ui: Migrate Thanos Ruler UI to React
- [#2964](https://github.com/thanos-io/thanos/pull/2964) Query: Add time range parameters to label APIs. Add `start` and `end` fields to Store API `LabelNamesRequest` and `LabelValuesRequest`.

### Changed

Expand Down
82 changes: 52 additions & 30 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,9 @@ func (qapi *QueryAPI) parsePartialResponseParam(r *http.Request, defaultEnablePa
}

func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiError) {
var ts time.Time
if t := r.FormValue("time"); t != "" {
var err error
ts, err = parseTime(t)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
} else {
ts = qapi.baseAPI.Now()
ts, err := parseTimeParam(r, "time", qapi.baseAPI.Now())
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}

ctx := r.Context()
Expand Down Expand Up @@ -394,12 +388,27 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Errorf("invalid label name: %q", name)}
}

start, err := parseTimeParam(r, "start", minTime)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
end, err := parseTimeParam(r, "end", maxTime)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
// Prometheus doesn't check this, do we need to?
if end.Before(start) {
err := errors.New("end timestamp must not be before start time")
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}

enablePartialResponse, apiErr := qapi.parsePartialResponseParam(r, qapi.enableQueryPartialResponse)
if apiErr != nil {
return nil, nil, apiErr
}

q, err := qapi.queryableCreate(true, nil, 0, enablePartialResponse, false).Querier(ctx, math.MinInt64, math.MaxInt64)
q, err := qapi.queryableCreate(true, nil, 0, enablePartialResponse, false).
Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
}
Expand Down Expand Up @@ -433,26 +442,13 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.New("no match[] parameter provided")}
}

var start time.Time
if t := r.FormValue("start"); t != "" {
var err error
start, err = parseTime(t)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
} else {
start = minTime
start, err := parseTimeParam(r, "start", minTime)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}

var end time.Time
if t := r.FormValue("end"); t != "" {
var err error
end, err = parseTime(t)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
} else {
end = maxTime
end, err := parseTimeParam(r, "end", maxTime)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}

var matcherSets [][]*labels.Matcher
Expand Down Expand Up @@ -504,6 +500,18 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr
return metrics, set.Warnings(), nil
}

func parseTimeParam(r *http.Request, paramName string, defaultValue time.Time) (time.Time, error) {
val := r.FormValue(paramName)
if val == "" {
return defaultValue, nil
}
result, err := parseTime(val)
if err != nil {
return time.Time{}, errors.Wrapf(err, "Invalid time value for '%s'", paramName)
}
return result, nil
}

func parseTime(s string) (time.Time, error) {
if t, err := strconv.ParseFloat(s, 64); err == nil {
s, ns := math.Modf(t)
Expand Down Expand Up @@ -532,12 +540,26 @@ func parseDuration(s string) (time.Duration, error) {
func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.ApiError) {
ctx := r.Context()

start, err := parseTimeParam(r, "start", minTime)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
end, err := parseTimeParam(r, "end", maxTime)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
if end.Before(start) {
err := errors.New("end timestamp must not be before start time")
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}

enablePartialResponse, apiErr := qapi.parsePartialResponseParam(r, qapi.enableQueryPartialResponse)
if apiErr != nil {
return nil, nil, apiErr
}

q, err := qapi.queryableCreate(true, nil, 0, enablePartialResponse, false).Querier(ctx, math.MinInt64, math.MaxInt64)
q, err := qapi.queryableCreate(true, nil, 0, enablePartialResponse, false).
Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
}
Expand Down
14 changes: 12 additions & 2 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,9 +587,14 @@ func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []sto

// LabelNames returns all known label names. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL) ([]string, error) {
func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, startTime, endTime int64) ([]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/labels")
q := u.Query()

q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
u.RawQuery = q.Encode()

var m struct {
Data []string `json:"data"`
Expand All @@ -599,9 +604,14 @@ func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL) ([]string,

// LabelValuesInGRPC returns all known label values for a given label name. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label string) ([]string, error) {
func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label string, startTime, endTime int64) ([]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/label/", label, "/values")
q := u.Query()

q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
u.RawQuery = q.Encode()

var m struct {
Data []string `json:"data"`
Expand Down
13 changes: 11 additions & 2 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,12 @@ func (q *querier) LabelValues(name string) ([]string, storage.Warnings, error) {
span, ctx := tracing.StartSpan(q.ctx, "querier_label_values")
defer span.Finish()

resp, err := q.proxy.LabelValues(ctx, &storepb.LabelValuesRequest{Label: name, PartialResponseDisabled: !q.partialResponse})
resp, err := q.proxy.LabelValues(ctx, &storepb.LabelValuesRequest{
Label: name,
PartialResponseDisabled: !q.partialResponse,
Start: q.mint,
End: q.maxt,
})
if err != nil {
return nil, nil, errors.Wrap(err, "proxy LabelValues()")
}
Expand All @@ -342,7 +347,11 @@ func (q *querier) LabelNames() ([]string, storage.Warnings, error) {
span, ctx := tracing.StartSpan(q.ctx, "querier_label_names")
defer span.Finish()

resp, err := q.proxy.LabelNames(ctx, &storepb.LabelNamesRequest{PartialResponseDisabled: !q.partialResponse})
resp, err := q.proxy.LabelNames(ctx, &storepb.LabelNamesRequest{
PartialResponseDisabled: !q.partialResponse,
Start: q.mint,
End: q.maxt,
})
if err != nil {
return nil, nil, errors.Wrap(err, "proxy LabelNames()")
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1046,7 +1046,7 @@ func chunksSize(chks []storepb.AggrChunk) (size int) {
}

// LabelNames implements the storepb.StoreServer interface.
func (s *BucketStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
func (s *BucketStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
g, gctx := errgroup.WithContext(ctx)

s.mtx.RLock()
Expand All @@ -1055,6 +1055,9 @@ func (s *BucketStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesReque
var sets [][]string

for _, b := range s.blocks {
if b.meta.MinTime > r.End || b.meta.MaxTime < r.Start {
continue
}
indexr := b.indexReader(gctx)
g.Go(func() error {
defer runutil.CloseWithLogOnErr(s.logger, indexr, "label names")
Expand Down Expand Up @@ -1091,6 +1094,9 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
var sets [][]string

for _, b := range s.blocks {
if b.meta.MinTime > req.End || b.meta.MaxTime < req.Start {
continue
}
indexr := b.indexReader(gctx)
g.Go(func() error {
defer runutil.CloseWithLogOnErr(s.logger, indexr, "label values")
Expand Down
74 changes: 72 additions & 2 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,11 @@ func testBucketStore_e2e(t *testing.T, ctx context.Context, s *storeSuite) {
testutil.Equals(t, s.minTime, mint)
testutil.Equals(t, s.maxTime, maxt)

vals, err := s.store.LabelValues(ctx, &storepb.LabelValuesRequest{Label: "a"})
vals, err := s.store.LabelValues(ctx, &storepb.LabelValuesRequest{
Label: "a",
Start: timestamp.FromTime(minTime),
End: timestamp.FromTime(maxTime),
})
testutil.Ok(t, err)
testutil.Equals(t, []string{"1", "2"}, vals.Values)

Expand Down Expand Up @@ -381,7 +385,7 @@ func testBucketStore_e2e(t *testing.T, ctx context.Context, s *storeSuite) {
MaxTime: maxt,
},
},
// Test no-chunk option.
// Test skip-chunk option.
{
req: &storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
Expand Down Expand Up @@ -599,3 +603,69 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
})
}
}

func TestBucketStore_LabelNames_e2e(t *testing.T) {
objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

dir, err := ioutil.TempDir("", "test_bucketstore_label_names_e2e")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, emptyRelabelConfig, allowAllFilterConf)

mint, maxt := s.store.TimeRange()
testutil.Equals(t, s.minTime, mint)
testutil.Equals(t, s.maxTime, maxt)

vals, err := s.store.LabelNames(ctx, &storepb.LabelNamesRequest{
Start: timestamp.FromTime(minTime),
End: timestamp.FromTime(maxTime),
})
testutil.Ok(t, err)
testutil.Equals(t, []string{"a", "b", "c"}, vals)

// Outside the time range.
vals, err = s.store.LabelNames(ctx, &storepb.LabelNamesRequest{
Start: timestamp.FromTime(time.Now().Add(-24 * time.Hour)),
End: timestamp.FromTime(time.Now().Add(-23 * time.Hour)),
})
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), vals)
})
}

func TestBucketStore_LabelValues_e2e(t *testing.T) {
objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

dir, err := ioutil.TempDir("", "test_bucketstore_label_values_e2e")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, emptyRelabelConfig, allowAllFilterConf)

mint, maxt := s.store.TimeRange()
testutil.Equals(t, s.minTime, mint)
testutil.Equals(t, s.maxTime, maxt)

vals, err := s.store.LabelValues(ctx, &storepb.LabelValuesRequest{
Label: "a",
Start: timestamp.FromTime(minTime),
End: timestamp.FromTime(maxTime),
})
testutil.Ok(t, err)
testutil.Equals(t, []string{"1", "2"}, vals.Values)

// Outside the time range.
vals, err = s.store.LabelValues(ctx, &storepb.LabelValuesRequest{
Label: "a",
Start: timestamp.FromTime(time.Now().Add(-24 * time.Hour)),
End: timestamp.FromTime(time.Now().Add(-23 * time.Hour)),
})
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), vals.Values)
})
}
6 changes: 3 additions & 3 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,8 +519,8 @@ Outer:
}

// LabelNames returns all known label names.
func (p *PrometheusStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
lbls, err := p.client.LabelNamesInGRPC(ctx, p.base)
func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
lbls, err := p.client.LabelNamesInGRPC(ctx, p.base, r.Start, r.End)
if err != nil {
return nil, err
}
Expand All @@ -536,7 +536,7 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue
return &storepb.LabelValuesResponse{Values: []string{l}}, nil
}

vals, err := p.client.LabelValuesInGRPC(ctx, p.base, r.Label)
vals, err := p.client.LabelValuesInGRPC(ctx, p.base, r.Label, r.Start, r.End)
if err != nil {
return nil, err
}
Expand Down
26 changes: 25 additions & 1 deletion pkg/store/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,10 +378,22 @@ func TestPrometheusStore_LabelNames_e2e(t *testing.T) {
proxy, err := NewPrometheusStore(nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil)
testutil.Ok(t, err)

resp, err := proxy.LabelNames(ctx, &storepb.LabelNamesRequest{})
resp, err := proxy.LabelNames(ctx, &storepb.LabelNamesRequest{
Start: timestamp.FromTime(minTime),
End: timestamp.FromTime(maxTime),
})
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), resp.Warnings)
testutil.Equals(t, []string{"a"}, resp.Names)

// Outside time range.
resp, err = proxy.LabelNames(ctx, &storepb.LabelNamesRequest{
Start: timestamp.FromTime(maxTime.Add(-time.Second)),
End: timestamp.FromTime(maxTime),
})
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), resp.Warnings)
testutil.Equals(t, []string{}, resp.Names)
}

func TestPrometheusStore_LabelValues_e2e(t *testing.T) {
Expand Down Expand Up @@ -413,10 +425,22 @@ func TestPrometheusStore_LabelValues_e2e(t *testing.T) {

resp, err := proxy.LabelValues(ctx, &storepb.LabelValuesRequest{
Label: "a",
Start: timestamp.FromTime(minTime),
End: timestamp.FromTime(maxTime),
})
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), resp.Warnings)
testutil.Equals(t, []string{"a", "b", "c"}, resp.Values)

// Outside time range.
resp, err = proxy.LabelValues(ctx, &storepb.LabelValuesRequest{
Label: "a",
Start: timestamp.FromTime(maxTime.Add(-time.Second)),
End: timestamp.FromTime(maxTime),
})
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), resp.Warnings)
testutil.Equals(t, []string{}, resp.Values)
}

// Test to check external label values retrieve.
Expand Down
Loading

0 comments on commit bc9005d

Please sign in to comment.