Skip to content

Commit 241fa91

Browse files
authored
Implementing multi level index cache (#5451)
1 parent 2c37922 commit 241fa91

File tree

16 files changed

+534
-78
lines changed

16 files changed

+534
-78
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
* [ENHANCEMENT] Store Gateway: Add config `estimated_max_series_size_bytes` and `estimated_max_chunk_size_bytes` to address data overfetch. #5401
4040
* [ENHANCEMENT] Distributor/Ingester: Add experimental `-distributor.sign_write_requests` flag to sign the write requests. #5430
4141
* [ENHANCEMENT] Store Gateway/Querier/Compactor: Handling CMK Access Denied errors. #5420 #5442 #5446
42+
* [ENHANCEMENT] Store Gateway: Implementing multi level index cache. #5451
4243
* [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
4344
* [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
4445
* [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293

docs/blocks-storage/querier.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -519,8 +519,9 @@ blocks_storage:
519519
[consistency_delay: <duration> | default = 0s]
520520
521521
index_cache:
522-
# The index cache backend type. Supported values: inmemory, memcached,
523-
# redis.
522+
# The index cache backend type. Multiple cache backend can be provided as
523+
# a comma-separated ordered list to enable the implementation of a cache
524+
# hierarchy. Supported values: inmemory, memcached, redis.
524525
# CLI flag: -blocks-storage.bucket-store.index-cache.backend
525526
[backend: <string> | default = "inmemory"]
526527

docs/blocks-storage/store-gateway.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -606,8 +606,9 @@ blocks_storage:
606606
[consistency_delay: <duration> | default = 0s]
607607
608608
index_cache:
609-
# The index cache backend type. Supported values: inmemory, memcached,
610-
# redis.
609+
# The index cache backend type. Multiple cache backend can be provided as
610+
# a comma-separated ordered list to enable the implementation of a cache
611+
# hierarchy. Supported values: inmemory, memcached, redis.
611612
# CLI flag: -blocks-storage.bucket-store.index-cache.backend
612613
[backend: <string> | default = "inmemory"]
613614

docs/configuration/config-file-reference.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1045,8 +1045,9 @@ bucket_store:
10451045
[consistency_delay: <duration> | default = 0s]
10461046
10471047
index_cache:
1048-
# The index cache backend type. Supported values: inmemory, memcached,
1049-
# redis.
1048+
# The index cache backend type. Multiple cache backend can be provided as a
1049+
# comma-separated ordered list to enable the implementation of a cache
1050+
# hierarchy. Supported values: inmemory, memcached, redis.
10501051
# CLI flag: -blocks-storage.bucket-store.index-cache.backend
10511052
[backend: <string> | default = "inmemory"]
10521053

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@ require (
5151
github.com/sony/gobreaker v0.5.0
5252
github.com/spf13/afero v1.9.5
5353
github.com/stretchr/testify v1.8.4
54-
github.com/thanos-io/objstore v0.0.0-20230629211010-ff1b35b9841a
54+
github.com/thanos-io/objstore v0.0.0-20230710163637-47c0118da0ca
5555
github.com/thanos-io/promql-engine v0.0.0-20230526105742-791d78b260ea
56-
github.com/thanos-io/thanos v0.31.1-0.20230711160112-df3a5f808726
56+
github.com/thanos-io/thanos v0.31.1-0.20230712154708-a395c5dbd054
5757
github.com/uber/jaeger-client-go v2.30.0+incompatible
5858
github.com/weaveworks/common v0.0.0-20221201103051-7c2720a9024d
5959
go.etcd.io/etcd/api/v3 v3.5.8

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1160,12 +1160,12 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl
11601160
github.com/tencentyun/cos-go-sdk-v5 v0.7.40 h1:W6vDGKCHe4wBACI1d2UgE6+50sJFhRWU4O8IB2ozzxM=
11611161
github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e h1:f1Zsv7OAU9iQhZwigp50Yl38W10g/vd5NC8Rdk1Jzng=
11621162
github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e/go.mod h1:jXcofnrSln/cLI6/dhlBxPQZEEQHVPCcFaH75M+nSzM=
1163-
github.com/thanos-io/objstore v0.0.0-20230629211010-ff1b35b9841a h1:tXcVeuval1nzdHn1JXqaBmyjuEUcpDI9huPrUF04nR4=
1164-
github.com/thanos-io/objstore v0.0.0-20230629211010-ff1b35b9841a/go.mod h1:5V7lzXuaxwt6XFQoA/zJrhdnQrxq1+r0bwQ1iYOq3gM=
1163+
github.com/thanos-io/objstore v0.0.0-20230710163637-47c0118da0ca h1:JRF7i58HovirZQVJGwCClQsMK6CCmK2fvialXjeoSpI=
1164+
github.com/thanos-io/objstore v0.0.0-20230710163637-47c0118da0ca/go.mod h1:5V7lzXuaxwt6XFQoA/zJrhdnQrxq1+r0bwQ1iYOq3gM=
11651165
github.com/thanos-io/promql-engine v0.0.0-20230526105742-791d78b260ea h1:kzK8sBn2+mo3NAxP+XjAjAqr1hwfxxFUy5CybaBkjAI=
11661166
github.com/thanos-io/promql-engine v0.0.0-20230526105742-791d78b260ea/go.mod h1:eIgPaXWgOhNAv6CPPrgu09r0AtT7byBTZy+7WkX0D18=
1167-
github.com/thanos-io/thanos v0.31.1-0.20230711160112-df3a5f808726 h1:DcjKUBKKMckA48Eua9H37+lOs13xDUx1PxixIs9hHHo=
1168-
github.com/thanos-io/thanos v0.31.1-0.20230711160112-df3a5f808726/go.mod h1:bDBl+vJEBXNkMvedh10vjDbvYkPyI6r2JJYJG0lLZTo=
1167+
github.com/thanos-io/thanos v0.31.1-0.20230712154708-a395c5dbd054 h1:kBuXA0B+jXX89JAJTymw7g/v/4jyjCSgfPcWQeFUOoM=
1168+
github.com/thanos-io/thanos v0.31.1-0.20230712154708-a395c5dbd054/go.mod h1:C0Cdk0kFFEDS3qkTgScF9ONSjrPxqnScGPoIgah3NJY=
11691169
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU=
11701170
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY=
11711171
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=

pkg/storage/tsdb/index_cache.go

Lines changed: 63 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ var (
3636
supportedIndexCacheBackends = []string{IndexCacheBackendInMemory, IndexCacheBackendMemcached, IndexCacheBackendRedis}
3737

3838
errUnsupportedIndexCacheBackend = errors.New("unsupported index cache backend")
39+
errDuplicatedIndexCacheBackend = errors.New("duplicated index cache backend")
3940
errNoIndexCacheAddresses = errors.New("no index cache backend addresses")
4041
)
4142

@@ -51,7 +52,10 @@ func (cfg *IndexCacheConfig) RegisterFlags(f *flag.FlagSet) {
5152
}
5253

5354
func (cfg *IndexCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
54-
f.StringVar(&cfg.Backend, prefix+"backend", IndexCacheBackendDefault, fmt.Sprintf("The index cache backend type. Supported values: %s.", strings.Join(supportedIndexCacheBackends, ", ")))
55+
f.StringVar(&cfg.Backend, prefix+"backend", IndexCacheBackendDefault, fmt.Sprintf("The index cache backend type. "+
56+
"Multiple cache backend can be provided as a comma-separated ordered list to enable the implementation of a cache hierarchy. "+
57+
"Supported values: %s.",
58+
strings.Join(supportedIndexCacheBackends, ", ")))
5559

5660
cfg.InMemory.RegisterFlagsWithPrefix(f, prefix+"inmemory.")
5761
cfg.Memcached.RegisterFlagsWithPrefix(f, prefix+"memcached.")
@@ -60,18 +64,30 @@ func (cfg *IndexCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix str
6064

6165
// Validate the config.
6266
func (cfg *IndexCacheConfig) Validate() error {
63-
if !util.StringsContain(supportedIndexCacheBackends, cfg.Backend) {
64-
return errUnsupportedIndexCacheBackend
65-
}
6667

67-
if cfg.Backend == IndexCacheBackendMemcached {
68-
if err := cfg.Memcached.Validate(); err != nil {
69-
return err
68+
splitBackends := strings.Split(cfg.Backend, ",")
69+
configuredBackends := map[string]struct{}{}
70+
71+
for _, backend := range splitBackends {
72+
if !util.StringsContain(supportedIndexCacheBackends, backend) {
73+
return errUnsupportedIndexCacheBackend
74+
}
75+
76+
if _, ok := configuredBackends[backend]; ok {
77+
return errors.WithMessagef(errDuplicatedIndexCacheBackend, "duplicated backend: %v", backend)
7078
}
71-
} else if cfg.Backend == IndexCacheBackendRedis {
72-
if err := cfg.Redis.Validate(); err != nil {
73-
return err
79+
80+
if backend == IndexCacheBackendMemcached {
81+
if err := cfg.Memcached.Validate(); err != nil {
82+
return err
83+
}
84+
} else if backend == IndexCacheBackendRedis {
85+
if err := cfg.Redis.Validate(); err != nil {
86+
return err
87+
}
7488
}
89+
90+
configuredBackends[backend] = struct{}{}
7591
}
7692

7793
return nil
@@ -87,16 +103,42 @@ func (cfg *InMemoryIndexCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, pr
87103

88104
// NewIndexCache creates a new index cache based on the input configuration.
89105
func NewIndexCache(cfg IndexCacheConfig, logger log.Logger, registerer prometheus.Registerer) (storecache.IndexCache, error) {
90-
switch cfg.Backend {
91-
case IndexCacheBackendInMemory:
92-
return newInMemoryIndexCache(cfg.InMemory, logger, registerer)
93-
case IndexCacheBackendMemcached:
94-
return newMemcachedIndexCache(cfg.Memcached, logger, registerer)
95-
case IndexCacheBackendRedis:
96-
return newRedisIndexCache(cfg.Redis, logger, registerer)
97-
default:
98-
return nil, errUnsupportedIndexCacheBackend
106+
splitBackends := strings.Split(cfg.Backend, ",")
107+
var caches []storecache.IndexCache
108+
109+
for i, backend := range splitBackends {
110+
iReg := registerer
111+
112+
// Create the level label if we have more than one cache
113+
if len(splitBackends) > 1 {
114+
iReg = prometheus.WrapRegistererWith(prometheus.Labels{"level": fmt.Sprintf("L%v", i)}, registerer)
115+
}
116+
117+
switch backend {
118+
case IndexCacheBackendInMemory:
119+
c, err := newInMemoryIndexCache(cfg.InMemory, logger, iReg)
120+
if err != nil {
121+
return c, err
122+
}
123+
caches = append(caches, c)
124+
case IndexCacheBackendMemcached:
125+
c, err := newMemcachedIndexCache(cfg.Memcached, logger, iReg)
126+
if err != nil {
127+
return c, err
128+
}
129+
caches = append(caches, c)
130+
case IndexCacheBackendRedis:
131+
c, err := newRedisIndexCache(cfg.Redis, logger, iReg)
132+
if err != nil {
133+
return c, err
134+
}
135+
caches = append(caches, c)
136+
default:
137+
return nil, errUnsupportedIndexCacheBackend
138+
}
99139
}
140+
141+
return newMultiLevelCache(caches...), nil
100142
}
101143

102144
func newInMemoryIndexCache(cfg InMemoryIndexCacheConfig, logger log.Logger, registerer prometheus.Registerer) (storecache.IndexCache, error) {
@@ -108,7 +150,7 @@ func newInMemoryIndexCache(cfg InMemoryIndexCacheConfig, logger log.Logger, regi
108150
maxItemSize = maxCacheSize
109151
}
110152

111-
return storecache.NewInMemoryIndexCacheWithConfig(logger, registerer, storecache.InMemoryIndexCacheConfig{
153+
return storecache.NewInMemoryIndexCacheWithConfig(logger, nil, registerer, storecache.InMemoryIndexCacheConfig{
112154
MaxSize: maxCacheSize,
113155
MaxItemSize: maxItemSize,
114156
})
@@ -129,5 +171,5 @@ func newRedisIndexCache(cfg RedisClientConfig, logger log.Logger, registerer pro
129171
return nil, errors.Wrapf(err, "create index cache redis client")
130172
}
131173

132-
return storecache.NewRemoteIndexCache(logger, client, registerer)
174+
return storecache.NewRemoteIndexCache(logger, client, nil, registerer)
133175
}

pkg/storage/tsdb/multilevel_cache.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package tsdb
2+
3+
import (
4+
"context"
5+
"sync"
6+
7+
"github.com/oklog/ulid"
8+
"github.com/prometheus/prometheus/model/labels"
9+
"github.com/prometheus/prometheus/storage"
10+
storecache "github.com/thanos-io/thanos/pkg/store/cache"
11+
)
12+
13+
type multiLevelCache struct {
14+
caches []storecache.IndexCache
15+
}
16+
17+
func (m *multiLevelCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) {
18+
wg := sync.WaitGroup{}
19+
wg.Add(len(m.caches))
20+
for _, c := range m.caches {
21+
cache := c
22+
go func() {
23+
defer wg.Done()
24+
cache.StorePostings(blockID, l, v)
25+
}()
26+
}
27+
wg.Wait()
28+
}
29+
30+
func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) {
31+
misses = keys
32+
hits = map[labels.Label][]byte{}
33+
for _, c := range m.caches {
34+
h, m := c.FetchMultiPostings(ctx, blockID, misses)
35+
misses = m
36+
37+
for label, bytes := range h {
38+
hits[label] = bytes
39+
}
40+
if len(misses) == 0 {
41+
break
42+
}
43+
}
44+
45+
return hits, misses
46+
}
47+
48+
func (m *multiLevelCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte) {
49+
wg := sync.WaitGroup{}
50+
wg.Add(len(m.caches))
51+
for _, c := range m.caches {
52+
cache := c
53+
go func() {
54+
defer wg.Done()
55+
cache.StoreExpandedPostings(blockID, matchers, v)
56+
}()
57+
}
58+
wg.Wait()
59+
}
60+
61+
func (m *multiLevelCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher) ([]byte, bool) {
62+
for _, c := range m.caches {
63+
if d, h := c.FetchExpandedPostings(ctx, blockID, matchers); h {
64+
return d, h
65+
}
66+
}
67+
68+
return []byte{}, false
69+
}
70+
71+
func (m *multiLevelCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) {
72+
wg := sync.WaitGroup{}
73+
wg.Add(len(m.caches))
74+
for _, c := range m.caches {
75+
cache := c
76+
go func() {
77+
defer wg.Done()
78+
cache.StoreSeries(blockID, id, v)
79+
}()
80+
}
81+
wg.Wait()
82+
}
83+
84+
func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) {
85+
misses = ids
86+
hits = map[storage.SeriesRef][]byte{}
87+
for _, c := range m.caches {
88+
h, m := c.FetchMultiSeries(ctx, blockID, misses)
89+
misses = m
90+
91+
for label, bytes := range h {
92+
hits[label] = bytes
93+
}
94+
if len(misses) == 0 {
95+
break
96+
}
97+
}
98+
99+
return hits, misses
100+
}
101+
102+
func newMultiLevelCache(c ...storecache.IndexCache) storecache.IndexCache {
103+
if len(c) == 1 {
104+
return c[0]
105+
}
106+
return &multiLevelCache{
107+
caches: c,
108+
}
109+
}

0 commit comments

Comments
 (0)