Skip to content

Commit

Permalink
Merge pull request #22 from vinted/feature/add-series-match-limit
Browse files Browse the repository at this point in the history
Add series match limit
  • Loading branch information
GiedriusS authored Sep 27, 2022
2 parents 6e2c00a + 2b06a47 commit 782f436
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 16 deletions.
22 changes: 12 additions & 10 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func runSidecar(
{
c := promclient.NewWithTracingClient(logger, httpClient, httpconfig.ThanosUserAgent)

promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version)
promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version, conf.limitMaxMatchedSeries)
if err != nil {
return errors.Wrap(err, "create Prometheus store")
}
Expand Down Expand Up @@ -469,15 +469,16 @@ func (s *promMetadata) Version() string {
}

type sidecarConfig struct {
http httpConfig
grpc grpcConfig
prometheus prometheusConfig
tsdb tsdbConfig
reloader reloaderConfig
reqLogConfig *extflag.PathOrContent
objStore extflag.PathOrContent
shipper shipperConfig
limitMinTime thanosmodel.TimeOrDurationValue
http httpConfig
grpc grpcConfig
prometheus prometheusConfig
tsdb tsdbConfig
reloader reloaderConfig
reqLogConfig *extflag.PathOrContent
objStore extflag.PathOrContent
shipper shipperConfig
limitMinTime thanosmodel.TimeOrDurationValue
limitMaxMatchedSeries int
}

func (sc *sidecarConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand All @@ -491,4 +492,5 @@ func (sc *sidecarConfig) registerFlag(cmd extkingpin.FlagClause) {
sc.shipper.registerFlag(cmd)
cmd.Flag("min-time", "Start of time range limit to serve. Thanos sidecar will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z").SetValue(&sc.limitMinTime)
cmd.Flag("max-matched-series", "Maximum number of series can be matched before reading series data").Default("0").IntVar(&sc.limitMaxMatchedSeries)
}
30 changes: 30 additions & 0 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,3 +838,33 @@ func (c *Client) TargetsInGRPC(ctx context.Context, base *url.URL, stateTargets
}
return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_targets HTTP[client]", &u, &v)
}

func (c *Client) SeriesMatchCount(ctx context.Context, base *url.URL, matchers []*labels.Matcher, start, end int64) (int, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/series")
q := u.Query()

q.Add("match[]", storepb.PromMatchersToString(matchers...))
q.Add("start", formatTime(timestamp.Time(start)))
q.Add("end", formatTime(timestamp.Time(end)))
q.Add("only_count", "1")
u.RawQuery = q.Encode()

body, _, err := c.req2xx(ctx, &u, http.MethodGet)
if err != nil {
return -1, errors.Wrap(err, "read query instant response")
}

var m struct {
Status string `json:"status"`
Data struct {
MetricsCount int `json:"metrics_count"`
} `json:"data"`
}

if err = json.Unmarshal(body, &m); err != nil {
return -1, errors.Wrap(err, "unmarshal query instant response")
}

return m.Data.MetricsCount, nil
}
18 changes: 18 additions & 0 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,13 @@ type PrometheusStore struct {
remoteReadAcceptableResponses []prompb.ReadRequest_ResponseType

framesRead prometheus.Histogram

limitMaxMatchedSeries int
}

// ErrSeriesMatchLimitReached is an error returned by PrometheusStore when matched series limit is enabled and matched series count exceeds the limit.
var ErrSeriesMatchLimitReached = errors.New("series match limit reached")

// Label{Values,Names} call with matchers is supported for Prometheus versions >= 2.24.0.
// https://github.com/prometheus/prometheus/commit/caa173d2aac4c390546b1f78302104b1ccae0878.
var baseVer, _ = semver.Make("2.24.0")
Expand All @@ -77,6 +82,7 @@ func NewPrometheusStore(
externalLabelsFn func() labels.Labels,
timestamps func() (mint int64, maxt int64),
promVersion func() string,
limitMaxMatchedSeries int,
) (*PrometheusStore, error) {
if logger == nil {
logger = log.NewNopLogger()
Expand All @@ -101,6 +107,7 @@ func NewPrometheusStore(
Buckets: prometheus.ExponentialBuckets(10, 10, 5),
},
),
limitMaxMatchedSeries: limitMaxMatchedSeries,
}
return p, nil
}
Expand Down Expand Up @@ -155,6 +162,17 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie
return status.Error(codes.InvalidArgument, "no matchers specified (excluding external labels)")
}

if p.limitMaxMatchedSeries > 0 {
matchedSeriesCount, err := p.client.SeriesMatchCount(s.Context(), p.base, matchers, r.MinTime, r.MaxTime)
if err != nil {
return errors.Wrap(err, "get series match count")
}

if matchedSeriesCount > p.limitMaxMatchedSeries {
return ErrSeriesMatchLimitReached
}
}

// Don't ask for more than available time. This includes potential `minTime` flag limit.
availableMinTime, _ := p.timestamps()
if r.MinTime < availableMinTime {
Expand Down
105 changes: 99 additions & 6 deletions pkg/store/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) {
limitMinT := int64(0)
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return limitMinT, -1 }, nil) // Maxt does not matter.
func() (int64, int64) { return limitMinT, -1 }, nil, 0) // Maxt does not matter.
testutil.Ok(t, err)

// Query all three samples except for the first one. Since we round up queried data
Expand Down Expand Up @@ -191,7 +191,7 @@ func TestPrometheusStore_SeriesLabels_e2e(t *testing.T) {

promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return math.MinInt64/1000 + 62135596801, math.MaxInt64/1000 - 62135596801 }, nil)
func() (int64, int64) { return math.MinInt64/1000 + 62135596801, math.MaxInt64/1000 - 62135596801 }, nil, 0)
testutil.Ok(t, err)

for _, tcase := range []struct {
Expand Down Expand Up @@ -361,7 +361,7 @@ func TestPrometheusStore_LabelAPIs(t *testing.T) {

promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels {
return extLset
}, nil, func() string { return version })
}, nil, func() string { return version }, 0)
testutil.Ok(t, err)

return promStore
Expand Down Expand Up @@ -396,7 +396,7 @@ func TestPrometheusStore_Series_MatchExternalLabel(t *testing.T) {

proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 0, math.MaxInt64 }, nil)
func() (int64, int64) { return 0, math.MaxInt64 }, nil, 0)
testutil.Ok(t, err)
srv := newStoreSeriesServer(ctx)

Expand Down Expand Up @@ -430,6 +430,99 @@ func TestPrometheusStore_Series_MatchExternalLabel(t *testing.T) {
testutil.Equals(t, 0, len(srv.SeriesSet))
}

func TestPrometheusStore_Series_LimitMaxMatchedSeries(t *testing.T) {
defer testutil.TolerantVerifyLeak(t)

p, err := e2eutil.NewPrometheus()
testutil.Ok(t, err)
defer func() { testutil.Ok(t, p.Stop()) }()

baseT := timestamp.FromTime(time.Now()) / 1000 * 1000

a := p.Appender()
_, err = a.Append(0, labels.FromStrings("a", "b", "b", "d"), baseT+100, 1)
testutil.Ok(t, err)
_, err = a.Append(0, labels.FromStrings("a", "c", "b", "d", "job", "test"), baseT+200, 2)
testutil.Ok(t, err)
_, err = a.Append(0, labels.FromStrings("a", "d", "b", "d", "job", "test"), baseT+300, 3)
testutil.Ok(t, err)
_, err = a.Append(0, labels.FromStrings("b", "d", "job", "test"), baseT+400, 4)
testutil.Ok(t, err)
testutil.Ok(t, a.Commit())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

testutil.Ok(t, p.Start())

u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

req := &storepb.SeriesRequest{
SkipChunks: true,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "job", Value: "test"},
},
MinTime: baseT,
MaxTime: baseT + 300,
}

expected2Series := []storepb.Series{
{
Labels: []labelpb.ZLabel{{Name: "a", Value: "c"}, {Name: "b", Value: "d"}, {Name: "job", Value: "test"}, {Name: "region", Value: "eu-west"}},
},
{
Labels: []labelpb.ZLabel{{Name: "a", Value: "d"}, {Name: "b", Value: "d"}, {Name: "job", Value: "test"}, {Name: "region", Value: "eu-west"}},
},
}

for _, tcase := range []struct {
req *storepb.SeriesRequest
expected []storepb.Series
expectedErr error
limitMaxMatchedSeries int
}{
// limit is not active
{
limitMaxMatchedSeries: 0,
req: req,
expected: expected2Series,
},
// should return limit error as 'limit < matched series'
{
limitMaxMatchedSeries: 1,
req: req,
expected: expected2Series,
expectedErr: ErrSeriesMatchLimitReached,
},
// should succeed as limit is not reached
{
limitMaxMatchedSeries: 2,
req: req,
expected: expected2Series,
},
} {
t.Run("", func(t *testing.T) {
promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return math.MinInt64/1000 + 62135596801, math.MaxInt64/1000 - 62135596801 }, nil,
tcase.limitMaxMatchedSeries)
testutil.Ok(t, err)

srv := newStoreSeriesServer(ctx)
err = promStore.Series(tcase.req, srv)
if tcase.expectedErr != nil {
testutil.NotOk(t, err)
testutil.Equals(t, tcase.expectedErr.Error(), err.Error())
return
}
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), srv.Warnings)
testutil.Equals(t, tcase.expected, srv.SeriesSet)
})
}
}

func TestPrometheusStore_Info(t *testing.T) {
defer testutil.TolerantVerifyLeak(t)

Expand All @@ -438,7 +531,7 @@ func TestPrometheusStore_Info(t *testing.T) {

proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), nil, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 123, 456 }, nil)
func() (int64, int64) { return 123, 456 }, nil, 0)
testutil.Ok(t, err)

resp, err := proxy.Info(ctx, &storepb.InfoRequest{})
Expand Down Expand Up @@ -516,7 +609,7 @@ func TestPrometheusStore_Series_SplitSamplesIntoChunksWithMaxSizeOf120(t *testin

proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 0, math.MaxInt64 }, nil)
func() (int64, int64) { return 0, math.MaxInt64 }, nil, 0)
testutil.Ok(t, err)

// We build chunks only for SAMPLES method. Make sure we ask for SAMPLES only.
Expand Down

0 comments on commit 782f436

Please sign in to comment.