Skip to content

Commit

Permalink
Apply seriesLimit in nextBatch
Browse files Browse the repository at this point in the history
Signed-off-by: 🌲 Harry 🌊 John 🏔 <johrry@amazon.com>
  • Loading branch information
harry671003 committed Sep 3, 2024
1 parent 9b5f63a commit 44b5b9c
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 14 deletions.
25 changes: 15 additions & 10 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1006,12 +1006,12 @@ type blockSeriesClient struct {
extLset labels.Labels
extLsetToRemove map[string]struct{}

mint int64
maxt int64
expandedPostingsLimit int
indexr *bucketIndexReader
chunkr *bucketChunkReader
loadAggregates []storepb.Aggr
mint int64
maxt int64
seriesLimit int
indexr *bucketIndexReader
chunkr *bucketChunkReader
loadAggregates []storepb.Aggr

seriesLimiter SeriesLimiter
chunksLimiter ChunksLimiter
Expand Down Expand Up @@ -1084,7 +1084,7 @@ func newBlockSeriesClient(

mint: req.MinTime,
maxt: req.MaxTime,
expandedPostingsLimit: int(req.Limit),
seriesLimit: int(req.Limit),
indexr: b.indexReader(logger),
chunkr: chunkr,
seriesLimiter: seriesLimiter,
Expand Down Expand Up @@ -1164,10 +1164,10 @@ func (b *blockSeriesClient) ExpandPostings(
b.expandedPostings = make([]storage.SeriesRef, 0, len(b.lazyPostings.postings)/2)
b.lazyExpandedPostingsCount.Inc()
} else {
// If expandedPostingsLimit is set, it can be applied here to limit the amount of series.
// If seriesLimit is set, it can be applied here to limit the amount of series.
// Note: This can only be done when postings are not expanded lazily.
if b.expandedPostingsLimit > 0 && len(b.lazyPostings.postings) > b.expandedPostingsLimit {
b.lazyPostings.postings = b.lazyPostings.postings[:b.expandedPostingsLimit]
if b.seriesLimit > 0 && len(b.lazyPostings.postings) > b.seriesLimit {
b.lazyPostings.postings = b.lazyPostings.postings[:b.seriesLimit]
}

// Apply series limiter eargerly if lazy postings not enabled.
Expand Down Expand Up @@ -1299,6 +1299,11 @@ OUTER:
}

seriesMatched++
if b.seriesLimit > 0 && seriesMatched > b.seriesLimit {
// Exit early if seriesLimit is set.
b.hasMorePostings = false
break
}
s := seriesEntry{lset: completeLabelset}
if b.skipChunks {
b.entries = append(b.entries, s)
Expand Down
8 changes: 4 additions & 4 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3957,7 +3957,7 @@ func TestBucketStoreMetadataLimit(t *testing.T) {
testutil.Ok(tb, err)
defer func() { testutil.Ok(tb, bkt.Close()) }()

uploadTestBlock(tb, tmpDir, bkt, 100)
uploadTestBlock(tb, tmpDir, bkt, 30000)

instrBkt := objstore.WithNoopInstr(bkt)
logger := log.NewNopLogger()
Expand Down Expand Up @@ -3995,11 +3995,11 @@ func TestBucketStoreMetadataLimit(t *testing.T) {
expectedResults int
}{
"series without limit": {
expectedResults: 40,
expectedResults: 12000,
},
"series with limit": {
limit: 2,
expectedResults: 2,
limit: 11000,
expectedResults: 11000,
},
}

Expand Down

0 comments on commit 44b5b9c

Please sign in to comment.