Refactoring: allow to pass BytesPool to store.NewBucketStore() #3801

Merged
cmd/thanos/store.go: 6 additions & 1 deletion
@@ -300,6 +300,11 @@ func runStore(

queriesGate := gate.New(extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg), maxConcurrency)

+chunkPool, err := store.NewDefaultChunkBytesPool(chunkPoolSizeBytes)
+if err != nil {
+	return errors.Wrap(err, "create chunk pool")
+}

bs, err := store.NewBucketStore(
logger,
reg,
@@ -308,7 +313,7 @@ func runStore(
dataDir,
indexCache,
queriesGate,
-chunkPoolSizeBytes,
+chunkPool,
store.NewChunksLimiterFactory(maxSampleCount/store.MaxSamplesPerChunk), // The samples limit is an approximation based on the max number of samples per chunk.
store.NewSeriesLimiterFactory(maxSeriesCount),
verbose,
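This hunk is the point of the refactor: the chunk pool is now built by the caller and handed to NewBucketStore, so embedding applications can substitute their own pool.BytesPool implementation. Below is a minimal sketch of such a substitute, an instrumented wrapper around the default pool; it assumes pool.BytesPool exposes a Put(*[]byte) method alongside the Get shown in pkg/pool/pool.go further down, which this diff does not confirm.

```go
package custompool

import (
	"sync/atomic"

	"github.com/thanos-io/thanos/pkg/pool"
)

// countingBytesPool is a hypothetical pool.BytesPool wrapper that counts the
// slices handed out and returned by an underlying pool.
type countingBytesPool struct {
	inner pool.BytesPool
	gets  uint64
	puts  uint64
}

// Get delegates to the wrapped pool and counts successful allocations.
func (p *countingBytesPool) Get(sz int) (*[]byte, error) {
	b, err := p.inner.Get(sz)
	if err == nil {
		atomic.AddUint64(&p.gets, 1)
	}
	return b, err
}

// Put counts the return and hands the slice back to the wrapped pool.
// Put is assumed to be part of pool.BytesPool; only Get is visible in this diff.
func (p *countingBytesPool) Put(b *[]byte) {
	atomic.AddUint64(&p.puts, 1)
	p.inner.Put(b)
}
```

A caller would build the inner pool with store.NewDefaultChunkBytesPool (added in pkg/store/bucket.go below) and pass &countingBytesPool{inner: inner} as the chunkPool argument above.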
pkg/pool/pool.go: 1 addition & 1 deletion
@@ -61,7 +61,7 @@ func NewBucketedBytesPool(minSize, maxSize int, factor float64, maxTotal uint64)
// ErrPoolExhausted is returned if a pool cannot provide the request bytes.
var ErrPoolExhausted = errors.New("pool exhausted")

-// Get returns a new byte slices that fits the given size.
+// Get returns a new byte slice that fits the given size.
func (p *BucketedBytesPool) Get(sz int) (*[]byte, error) {
p.mtx.Lock()
defer p.mtx.Unlock()
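For context, here is a sketch of how the bucketed pool from this file is used on its own. Only the Get signature and ErrPoolExhausted are visible in the hunk above; the (*BucketedBytesPool, error) constructor return and the Put method are assumptions, and the bucket sizes are illustrative rather than Thanos defaults.

```go
package main

import (
	"errors"
	"fmt"

	"github.com/thanos-io/thanos/pkg/pool"
)

func main() {
	// Buckets from 1 KiB to 1 MiB, doubling each step, with at most 64 MiB
	// handed out at any one time (illustrative values).
	p, err := pool.NewBucketedBytesPool(1024, 1024*1024, 2, 64*1024*1024)
	if err != nil {
		panic(err)
	}

	b, err := p.Get(16 * 1024)
	if errors.Is(err, pool.ErrPoolExhausted) {
		// The maxTotal cap was reached; callers are expected to back off or fail.
		fmt.Println("pool exhausted, retry later")
		return
	}
	if err != nil {
		panic(err)
	}

	fmt.Printf("got a slice with capacity %d\n", cap(*b))

	// Return the slice so its size bucket can reuse it (Put is assumed, see above).
	p.Put(b)
}
```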
pkg/store/bucket.go: 6 additions & 6 deletions
@@ -300,7 +300,7 @@ func NewBucketStore(
dir string,
indexCache storecache.IndexCache,
queryGate gate.Gate,
-maxChunkPoolBytes uint64,
+chunkPool pool.BytesPool,
chunksLimiterFactory ChunksLimiterFactory,
seriesLimiterFactory SeriesLimiterFactory,
debugLogging bool,
@@ -316,11 +316,6 @@ func NewBucketStore(
logger = log.NewNopLogger()
}

-chunkPool, err := pool.NewBucketedBytesPool(maxChunkSize, 50e6, 2, maxChunkPoolBytes)
-if err != nil {
-	return nil, errors.Wrap(err, "create chunk pool")
-}

s := &BucketStore{
logger: logger,
bkt: bkt,
@@ -2449,3 +2444,8 @@ func (s queryStats) merge(o *queryStats) *queryStats {

return &s
}

+// NewDefaultChunkBytesPool returns a chunk bytes pool with default settings.
+func NewDefaultChunkBytesPool(maxChunkPoolBytes uint64) (pool.BytesPool, error) {
+	return pool.NewBucketedBytesPool(maxChunkSize, 50e6, 2, maxChunkPoolBytes)
+}
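NewDefaultChunkBytesPool keeps the parameters that were previously hard-coded inside NewBucketStore (minimum slice size maxChunkSize, 50e6-byte maximum, growth factor 2), so callers that only want the old behavior move the byte cap into this call. Here is a short sketch of the two options the new signature leaves open; the bucket values in customChunkPool are illustrative only, and returning *BucketedBytesPool as a pool.BytesPool is assumed to be valid, as the new function above implies.

```go
package example

import (
	"github.com/thanos-io/thanos/pkg/pool"
	"github.com/thanos-io/thanos/pkg/store"
)

// defaultChunkPool reproduces what NewBucketStore used to build internally;
// the caller only chooses the total byte cap, exactly as before this change.
func defaultChunkPool(maxTotalBytes uint64) (pool.BytesPool, error) {
	return store.NewDefaultChunkBytesPool(maxTotalBytes)
}

// customChunkPool is what the new signature enables: callers size the buckets
// themselves. The 1 KiB minimum, 32 MiB maximum and factor 2 are illustrative.
func customChunkPool(maxTotalBytes uint64) (pool.BytesPool, error) {
	return pool.NewBucketedBytesPool(1024, 32*1024*1024, 2, maxTotalBytes)
}
```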
pkg/store/bucket_e2e_test.go: 4 additions & 1 deletion
@@ -154,6 +154,9 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
}, nil)
testutil.Ok(t, err)

+chunkPool, err := NewDefaultChunkBytesPool(0)
+testutil.Ok(t, err)

store, err := NewBucketStore(
s.logger,
nil,
@@ -162,7 +165,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
dir,
s.cache,
nil,
-0,
+chunkPool,
NewChunksLimiterFactory(maxChunksLimit),
NewSeriesLimiterFactory(0),
false,
pkg/store/bucket_test.go: 24 additions & 6 deletions
@@ -565,6 +565,9 @@ func TestBucketStore_Info(t *testing.T) {

defer testutil.Ok(t, os.RemoveAll(dir))

+chunkPool, err := NewDefaultChunkBytesPool(2e5)
+testutil.Ok(t, err)

bucketStore, err := NewBucketStore(
nil,
nil,
@@ -573,7 +576,7 @@ func TestBucketStore_Info(t *testing.T) {
dir,
noopCache{},
nil,
-2e5,
+chunkPool,
NewChunksLimiterFactory(0),
NewSeriesLimiterFactory(0),
false,
@@ -817,6 +820,9 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul
}, nil)
testutil.Ok(t, err)

+chunkPool, err := NewDefaultChunkBytesPool(0)
+testutil.Ok(t, err)

bucketStore, err := NewBucketStore(
logger,
nil,
@@ -825,7 +831,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul
dir,
noopCache{},
nil,
-0,
+chunkPool,
NewChunksLimiterFactory(0),
NewSeriesLimiterFactory(0),
false,
@@ -1636,6 +1642,9 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) {
indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, storecache.InMemoryIndexCacheConfig{})
testutil.Ok(tb, err)

+chunkPool, err := NewDefaultChunkBytesPool(1000000)
+testutil.Ok(t, err)

store, err := NewBucketStore(
logger,
nil,
@@ -1644,7 +1653,7 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) {
tmpDir,
indexCache,
nil,
-1000000,
+chunkPool,
NewChunksLimiterFactory(10000/MaxSamplesPerChunk),
NewSeriesLimiterFactory(0),
false,
@@ -1730,6 +1739,9 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) {
indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, storecache.InMemoryIndexCacheConfig{})
testutil.Ok(tb, err)

+chunkPool, err := NewDefaultChunkBytesPool(1000000)
+testutil.Ok(t, err)

store, err := NewBucketStore(
logger,
nil,
@@ -1738,7 +1750,7 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) {
tmpDir,
indexCache,
nil,
-1000000,
+chunkPool,
NewChunksLimiterFactory(100000/MaxSamplesPerChunk),
NewSeriesLimiterFactory(0),
false,
@@ -1875,6 +1887,9 @@ func TestBlockWithLargeChunks(t *testing.T) {
indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, storecache.InMemoryIndexCacheConfig{})
testutil.Ok(t, err)

+chunkPool, err := NewDefaultChunkBytesPool(1000000)
+testutil.Ok(t, err)

store, err := NewBucketStore(
logger,
nil,
@@ -1883,7 +1898,7 @@ func TestBlockWithLargeChunks(t *testing.T) {
tmpDir,
indexCache,
nil,
-1000000,
+chunkPool,
NewChunksLimiterFactory(10000/MaxSamplesPerChunk),
NewSeriesLimiterFactory(0),
false,
@@ -2036,6 +2051,9 @@ func setupStoreForHintsTest(t *testing.T) (testutil.TB, *BucketStore, []*storepb
indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, storecache.InMemoryIndexCacheConfig{})
testutil.Ok(tb, err)

+chunkPool, err := NewDefaultChunkBytesPool(1000000)
+testutil.Ok(t, err)

store, err := NewBucketStore(
logger,
nil,
@@ -2044,7 +2062,7 @@ func setupStoreForHintsTest(t *testing.T) (testutil.TB, *BucketStore, []*storepb
tmpDir,
indexCache,
nil,
-1000000,
+chunkPool,
NewChunksLimiterFactory(10000/MaxSamplesPerChunk),
NewSeriesLimiterFactory(0),
false,