Shared in-memory index cache for queriers with blocks storage #2189

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -25,6 +25,7 @@
* [CHANGE] Updated Prometheus dependency to v2.16.0. This Prometheus version uses Active Query Tracker to limit concurrent queries. In order to keep `-querier.max-concurrent` working, Active Query Tracker is enabled by default, and is configured to store its data to `active-query-tracker` directory (relative to current directory when Cortex started). This can be changed by using `-querier.active-query-tracker-dir` option. Purpose of Active Query Tracker is to log queries that were running when Cortex crashes. This logging happens on next Cortex start. #2088
* [CHANGE] Experimental TSDB: TSDB head compaction interval and concurrency is now configurable (defaults to 1 min interval and 5 concurrent head compactions). New options: `-experimental.tsdb.head-compaction-interval` and `-experimental.tsdb.head-compaction-concurrency`. #2172
* [CHANGE] Remove fluentd-based billing infrastructure and flags such as `-distributor.enable-billing`. #1491
* [CHANGE] Experimental TSDB: the in-memory index cache used by the experimental blocks storage in the querier is now shared across all tenants instead of being created per tenant. `-experimental.tsdb.bucket-store.index-cache-size-bytes` now configures the max size of the per-querier index cache rather than of a per-tenant cache, and its default has been increased to 1GB. #2189
* [FEATURE] Added a read-only local alertmanager config store using files named corresponding to their tenant id. #2125
* [FEATURE] Added user sub rings to distribute users to a subset of ingesters. #1947
* `--experimental.distributor.user-subring-size`
6 changes: 3 additions & 3 deletions docs/configuration/config-file-reference.md
@@ -2139,10 +2139,10 @@ bucket_store:
# CLI flag: -experimental.tsdb.bucket-store.sync-interval
[sync_interval: <duration> | default = 5m0s]

# Size - in bytes - of a per-tenant in-memory index cache used to speed up
# blocks index lookups.
# Size in bytes of the in-memory index cache used to speed up blocks index lookups
# (shared between all tenants).
# CLI flag: -experimental.tsdb.bucket-store.index-cache-size-bytes
[index_cache_size_bytes: <int> | default = 262144000]
[index_cache_size_bytes: <int> | default = 1073741824]

# Max size - in bytes - of a per-tenant chunk pool, used to reduce memory
# allocations.
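For reference, a minimal configuration sketch of the new setting (the nesting under `tsdb` and `bucket_store` follows the reference above; the 2GB value is purely an illustrative override and is not part of this PR):

```yaml
tsdb:
  bucket_store:
    # Shared (per-querier) in-memory index cache, now one cache for all tenants.
    # Equivalent CLI flag: -experimental.tsdb.bucket-store.index-cache-size-bytes
    index_cache_size_bytes: 2147483648  # 2GB; the new default is 1073741824 (1GB)
```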
6 changes: 3 additions & 3 deletions docs/operations/blocks-storage.md
@@ -133,10 +133,10 @@ tsdb:
# CLI flag: -experimental.tsdb.bucket-store.sync-interval
[sync_interval: <duration> | default = 5m0s]

# Size - in bytes - of a per-tenant in-memory index cache used to speed up
# blocks index lookups.
# Size in bytes of the in-memory index cache used to speed up blocks index
# lookups (shared between all tenants).
# CLI flag: -experimental.tsdb.bucket-store.index-cache-size-bytes
[index_cache_size_bytes: <int> | default = 262144000]
[index_cache_size_bytes: <int> | default = 1073741824]

# Max size - in bytes - of a per-tenant chunk pool, used to reduce memory
# allocations.
1 change: 1 addition & 0 deletions integration/configs.go
@@ -50,6 +50,7 @@ var (
"-experimental.tsdb.bucket-store.sync-interval": "5s",
"-experimental.tsdb.retention-period": "5m",
"-experimental.tsdb.ship-interval": "1m",
"-experimental.tsdb.head-compaction-interval": "1s",
"-experimental.tsdb.s3.access-key-id": e2edb.MinioAccessKey,
"-experimental.tsdb.s3.secret-access-key": e2edb.MinioSecretKey,
"-experimental.tsdb.s3.bucket-name": "cortex",
139 changes: 139 additions & 0 deletions integration/querier_test.go
@@ -0,0 +1,139 @@
// +build integration

package main

import (
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/integration/e2e"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/integration/e2ecortex"
)

func TestQuerierWithBlocksStorage(t *testing.T) {
tests := map[string]struct {
flags map[string]string
}{
"querier running with ingester gRPC streaming disabled": {
flags: mergeFlags(BlocksStorageFlags, map[string]string{
"-querier.ingester-streaming": "false",
}),
},
}

for testName, testCfg := range tests {
t.Run(testName, func(t *testing.T) {
const blockRangePeriod = 5 * time.Second

s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Configure the blocks storage to frequently compact TSDB head
// and ship blocks to the storage.
flags := mergeFlags(testCfg.flags, map[string]string{
"-experimental.tsdb.block-ranges-period": blockRangePeriod.String(),
"-experimental.tsdb.ship-interval": "1s",
"-experimental.tsdb.bucket-store.sync-interval": "1s",
"-experimental.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
})

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, flags["-experimental.tsdb.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(consul, minio))

// Start Cortex components.
distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "")
ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags, "")
querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(distributor, ingester, querier))

// Wait until both the distributor and querier have updated the ring.
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "user-1")
require.NoError(t, err)

// Push some series to Cortex.
series1Timestamp := time.Now()
series2Timestamp := series1Timestamp.Add(blockRangePeriod * 2)
series1, expectedVector1 := generateSeries("series_1", series1Timestamp)
series2, expectedVector2 := generateSeries("series_2", series2Timestamp)

res, err := c.Push(series1)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

res, err = c.Push(series2)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

// Wait until the TSDB head is compacted and shipped to the storage.
// The shipped block contains the 1st series, while the 2nd series is in the head.
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_shipper_uploads_total"))
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_memory_series_created_total"))
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series_removed_total"))
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series"))

// Push another series to further compact another block and delete the first block
// due to expired retention.
series3Timestamp := series2Timestamp.Add(blockRangePeriod * 2)
series3, expectedVector3 := generateSeries("series_3", series3Timestamp)

res, err = c.Push(series3)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_shipper_uploads_total"))
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(3), "cortex_ingester_memory_series_created_total"))
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_memory_series_removed_total"))
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series"))

// Wait until the querier has synced the newly uploaded blocks.
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2), "cortex_querier_bucket_store_blocks_loaded"))

// Query back the series (1 only in the storage, 1 only in the ingesters, 1 on both).
// TODO: apparently Thanos has a bug which causes a block to not be considered if the
// query timestamp matches the block max timestamp
series1Timestamp = series1Timestamp.Add(time.Duration(time.Millisecond))
expectedVector1[0].Timestamp = model.Time(e2e.TimeToMilliseconds(series1Timestamp))

result, err := c.Query("series_1", series1Timestamp)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector1, result.(model.Vector))

result, err = c.Query("series_2", series2Timestamp)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector2, result.(model.Vector))

result, err = c.Query("series_3", series3Timestamp)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector3, result.(model.Vector))

// Check the in-memory index cache metrics (in the querier).
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2*2), "cortex_querier_blocks_index_cache_items")) // 2 series both for postings and series cache
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2*2), "cortex_querier_blocks_index_cache_items_added_total")) // 2 series both for postings and series cache
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(0), "cortex_querier_blocks_index_cache_hits_total")) // no cache hits because the cache was empty

// Query back again the 1st series from storage. This time it should use the index cache.
result, err = c.Query("series_1", series1Timestamp)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector1, result.(model.Vector))

require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2*2), "cortex_querier_blocks_index_cache_items")) // as before
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2*2), "cortex_querier_blocks_index_cache_items_added_total")) // as before
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2), "cortex_querier_blocks_index_cache_hits_total")) // this time the index cache was used
})
}
}
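Note that the new test file is guarded by the `// +build integration` tag, so a plain `go test` skips it; with the standard Go toolchain it would be included via something like `go test -tags integration ./integration/...` (the exact invocation used by the project's CI is not shown in this diff).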
60 changes: 32 additions & 28 deletions pkg/querier/block_store.go
@@ -12,6 +12,7 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/model"
@@ -31,16 +32,20 @@ import (

// UserStore is a multi-tenant version of Thanos BucketStore
type UserStore struct {
logger log.Logger
cfg tsdb.Config
bucket objstore.Bucket
client storepb.StoreClient
logLevel logging.Level
tsdbMetrics *tsdbBucketStoreMetrics
logger log.Logger
cfg tsdb.Config
bucket objstore.Bucket
client storepb.StoreClient
logLevel logging.Level
bucketStoreMetrics *tsdbBucketStoreMetrics
indexCacheMetrics *tsdbIndexCacheMetrics

syncMint model.TimeOrDurationValue
syncMaxt model.TimeOrDurationValue

// Index cache shared across all tenants.
indexCache storecache.IndexCache

// Keeps a bucket store for each tenant.
storesMu sync.RWMutex
stores map[string]*store.BucketStore
@@ -55,16 +60,20 @@

// NewUserStore returns a new UserStore
func NewUserStore(cfg tsdb.Config, bucketClient objstore.Bucket, logLevel logging.Level, logger log.Logger, registerer prometheus.Registerer) (*UserStore, error) {
var err error

workersCtx, workersCancel := context.WithCancel(context.Background())
indexCacheRegistry := prometheus.NewRegistry()

u := &UserStore{
logger: logger,
cfg: cfg,
bucket: bucketClient,
stores: map[string]*store.BucketStore{},
logLevel: logLevel,
tsdbMetrics: newTSDBBucketStoreMetrics(),
workersCancel: workersCancel,
logger: logger,
cfg: cfg,
bucket: bucketClient,
stores: map[string]*store.BucketStore{},
logLevel: logLevel,
bucketStoreMetrics: newTSDBBucketStoreMetrics(),
indexCacheMetrics: newTSDBIndexCacheMetrics(indexCacheRegistry),
workersCancel: workersCancel,
syncTimes: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "cortex_querier_blocks_sync_seconds",
Help: "The total time it takes to perform a sync stores",
@@ -73,15 +82,20 @@ func NewUserStore(cfg tsdb.Config, bucketClient objstore.Bucket, logLevel loggin
}

// Configure the time range to sync all blocks.
if err := u.syncMint.Set("0000-01-01T00:00:00Z"); err != nil {
if err = u.syncMint.Set("0000-01-01T00:00:00Z"); err != nil {
return nil, err
}
if err := u.syncMaxt.Set("9999-12-31T23:59:59Z"); err != nil {
if err = u.syncMaxt.Set("9999-12-31T23:59:59Z"); err != nil {
return nil, err
}

// Init the index cache.
if u.indexCache, err = tsdb.NewIndexCache(cfg.BucketStore, logger, indexCacheRegistry); err != nil {
return nil, errors.Wrap(err, "create index cache")
}

if registerer != nil {
registerer.MustRegister(u.syncTimes, u.tsdbMetrics)
registerer.MustRegister(u.syncTimes, u.bucketStoreMetrics, u.indexCacheMetrics)
}

serv := grpc.NewServer()
@@ -357,16 +371,6 @@ func (u *UserStore) getOrCreateStore(userID string) (*store.BucketStore, error)
userBkt := tsdb.NewUserBucketClient(userID, u.bucket)

reg := prometheus.NewRegistry()
indexCacheSizeBytes := u.cfg.BucketStore.IndexCacheSizeBytes
maxItemSizeBytes := indexCacheSizeBytes / 2
indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(userLogger, reg, storecache.InMemoryIndexCacheConfig{
MaxSize: storecache.Bytes(indexCacheSizeBytes),
MaxItemSize: storecache.Bytes(maxItemSizeBytes),
})
if err != nil {
return nil, err
}

fetcher, err := block.NewMetaFetcher(
userLogger,
u.cfg.BucketStore.MetaSyncConcurrency,
@@ -385,7 +389,7 @@ func (u *UserStore) getOrCreateStore(userID string) (*store.BucketStore, error)
userBkt,
fetcher,
filepath.Join(u.cfg.BucketStore.SyncDir, userID),
indexCache,
u.indexCache,
uint64(u.cfg.BucketStore.MaxChunkPoolBytes),
u.cfg.BucketStore.MaxSampleCount,
u.cfg.BucketStore.MaxConcurrent,
@@ -402,7 +406,7 @@
}

u.stores[userID] = bs
u.tsdbMetrics.addUserRegistry(userID, reg)
u.bucketStoreMetrics.addUserRegistry(userID, reg)

return bs, nil
}
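To make the change easier to follow outside the diff view, below is a minimal, self-contained Go sketch of the pattern this PR introduces. The `userStore`, `tenantStore` and `indexCache` types are simplified stand-ins for the real `UserStore`, Thanos `store.BucketStore` and `storecache.IndexCache` (this is not the actual Cortex code): the cache is built once when the multi-tenant store is constructed, and `getOrCreateStore` hands the same instance to every per-tenant store instead of building a new cache per tenant.

```go
package main

import (
	"fmt"
	"sync"
)

// indexCache is a stand-in for Thanos' storecache.IndexCache: a single,
// byte-bounded cache instance shared by every tenant's bucket store.
type indexCache struct {
	maxSizeBytes uint64
}

// tenantStore is a stand-in for a per-tenant store.BucketStore.
type tenantStore struct {
	userID string
	cache  *indexCache // points at the shared cache, not a private one
}

// userStore mirrors the structure of UserStore in this PR: the cache is
// created once at construction time and reused by every lazily-created
// per-tenant store.
type userStore struct {
	cache    *indexCache
	storesMu sync.Mutex
	stores   map[string]*tenantStore
}

func newUserStore(cacheSizeBytes uint64) *userStore {
	return &userStore{
		cache:  &indexCache{maxSizeBytes: cacheSizeBytes},
		stores: map[string]*tenantStore{},
	}
}

func (u *userStore) getOrCreateStore(userID string) *tenantStore {
	u.storesMu.Lock()
	defer u.storesMu.Unlock()

	if s, ok := u.stores[userID]; ok {
		return s
	}

	// Before this PR the cache was built here, once per tenant; now the
	// shared instance is reused for every tenant.
	s := &tenantStore{userID: userID, cache: u.cache}
	u.stores[userID] = s
	return s
}

func main() {
	u := newUserStore(1 << 30) // 1GB, matching the new default

	a := u.getOrCreateStore("user-1")
	b := u.getOrCreateStore("user-2")

	fmt.Println(a.cache == b.cache) // true: both tenants share the same cache
}
```

The practical effect, also reflected in the CHANGELOG entry above, is that the index cache memory is bounded per querier process rather than growing with the number of tenants.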