Skip to content

Commit c74eda1

Browse files
committed
Implementing multi level index cache
Signed-off-by: Alan Protasio <alanprot@gmail.com>
1 parent 7ef74ea commit c74eda1

File tree

3 files changed

+413
-18
lines changed

3 files changed

+413
-18
lines changed

pkg/storage/tsdb/index_cache.go

Lines changed: 49 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -60,17 +60,22 @@ func (cfg *IndexCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix str
6060

6161
// Validate the config.
6262
func (cfg *IndexCacheConfig) Validate() error {
63-
if !util.StringsContain(supportedIndexCacheBackends, cfg.Backend) {
64-
return errUnsupportedIndexCacheBackend
65-
}
6663

67-
if cfg.Backend == IndexCacheBackendMemcached {
68-
if err := cfg.Memcached.Validate(); err != nil {
69-
return err
64+
splitedBackends := strings.Split(cfg.Backend, ",")
65+
66+
for _, backend := range splitedBackends {
67+
if !util.StringsContain(supportedIndexCacheBackends, backend) {
68+
return errUnsupportedIndexCacheBackend
7069
}
71-
} else if cfg.Backend == IndexCacheBackendRedis {
72-
if err := cfg.Redis.Validate(); err != nil {
73-
return err
70+
71+
if backend == IndexCacheBackendMemcached {
72+
if err := cfg.Memcached.Validate(); err != nil {
73+
return err
74+
}
75+
} else if backend == IndexCacheBackendRedis {
76+
if err := cfg.Redis.Validate(); err != nil {
77+
return err
78+
}
7479
}
7580
}
7681

@@ -87,16 +92,42 @@ func (cfg *InMemoryIndexCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, pr
8792

8893
// NewIndexCache creates a new index cache based on the input configuration.
8994
func NewIndexCache(cfg IndexCacheConfig, logger log.Logger, registerer prometheus.Registerer) (storecache.IndexCache, error) {
90-
switch cfg.Backend {
91-
case IndexCacheBackendInMemory:
92-
return newInMemoryIndexCache(cfg.InMemory, logger, registerer)
93-
case IndexCacheBackendMemcached:
94-
return newMemcachedIndexCache(cfg.Memcached, logger, registerer)
95-
case IndexCacheBackendRedis:
96-
return newRedisIndexCache(cfg.Redis, logger, registerer)
97-
default:
98-
return nil, errUnsupportedIndexCacheBackend
95+
splitBackends := strings.Split(cfg.Backend, ",")
96+
var caches []storecache.IndexCache
97+
98+
for i, backend := range splitBackends {
99+
iReg := registerer
100+
101+
// Create the level label if we have more than one cache
102+
if len(splitBackends) > 1 {
103+
iReg = prometheus.WrapRegistererWith(prometheus.Labels{"level": fmt.Sprintf("L%v", i)}, registerer)
104+
}
105+
106+
switch backend {
107+
case IndexCacheBackendInMemory:
108+
c, err := newInMemoryIndexCache(cfg.InMemory, logger, iReg)
109+
if err != nil {
110+
return c, err
111+
}
112+
caches = append(caches, c)
113+
case IndexCacheBackendMemcached:
114+
c, err := newMemcachedIndexCache(cfg.Memcached, logger, iReg)
115+
if err != nil {
116+
return c, err
117+
}
118+
caches = append(caches, c)
119+
case IndexCacheBackendRedis:
120+
c, err := newRedisIndexCache(cfg.Redis, logger, iReg)
121+
if err != nil {
122+
return c, err
123+
}
124+
caches = append(caches, c)
125+
default:
126+
return nil, errUnsupportedIndexCacheBackend
127+
}
99128
}
129+
130+
return newMultiLevelCache(caches...), nil
100131
}
101132

102133
func newInMemoryIndexCache(cfg InMemoryIndexCacheConfig, logger log.Logger, registerer prometheus.Registerer) (storecache.IndexCache, error) {

pkg/storage/tsdb/multilevel_cache.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package tsdb
2+
3+
import (
4+
"context"
5+
"sync"
6+
7+
"github.com/oklog/ulid"
8+
"github.com/prometheus/prometheus/model/labels"
9+
"github.com/prometheus/prometheus/storage"
10+
storecache "github.com/thanos-io/thanos/pkg/store/cache"
11+
)
12+
13+
type multiLevelCache struct {
14+
caches []storecache.IndexCache
15+
}
16+
17+
func (m *multiLevelCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) {
18+
wg := sync.WaitGroup{}
19+
wg.Add(len(m.caches))
20+
for _, c := range m.caches {
21+
cache := c
22+
go func() {
23+
defer wg.Done()
24+
cache.StorePostings(blockID, l, v)
25+
}()
26+
}
27+
wg.Wait()
28+
}
29+
30+
func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) {
31+
misses = keys
32+
hits = map[labels.Label][]byte{}
33+
for _, c := range m.caches {
34+
h, m := c.FetchMultiPostings(ctx, blockID, misses)
35+
misses = m
36+
37+
for label, bytes := range h {
38+
hits[label] = bytes
39+
}
40+
if len(misses) == 0 {
41+
break
42+
}
43+
}
44+
45+
return hits, misses
46+
}
47+
48+
func (m *multiLevelCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte) {
49+
wg := sync.WaitGroup{}
50+
wg.Add(len(m.caches))
51+
for _, c := range m.caches {
52+
cache := c
53+
go func() {
54+
defer wg.Done()
55+
cache.StoreExpandedPostings(blockID, matchers, v)
56+
}()
57+
}
58+
wg.Wait()
59+
}
60+
61+
func (m *multiLevelCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher) ([]byte, bool) {
62+
for _, c := range m.caches {
63+
if d, h := c.FetchExpandedPostings(ctx, blockID, matchers); h {
64+
return d, h
65+
}
66+
}
67+
68+
return []byte{}, false
69+
}
70+
71+
func (m *multiLevelCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) {
72+
wg := sync.WaitGroup{}
73+
wg.Add(len(m.caches))
74+
for _, c := range m.caches {
75+
cache := c
76+
go func() {
77+
defer wg.Done()
78+
cache.StoreSeries(blockID, id, v)
79+
}()
80+
}
81+
wg.Wait()
82+
}
83+
84+
func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) {
85+
misses = ids
86+
hits = map[storage.SeriesRef][]byte{}
87+
for _, c := range m.caches {
88+
h, m := c.FetchMultiSeries(ctx, blockID, misses)
89+
misses = m
90+
91+
for label, bytes := range h {
92+
hits[label] = bytes
93+
}
94+
if len(misses) == 0 {
95+
break
96+
}
97+
}
98+
99+
return hits, misses
100+
}
101+
102+
func newMultiLevelCache(c ...storecache.IndexCache) storecache.IndexCache {
103+
if len(c) == 0 {
104+
return c[0]
105+
}
106+
return &multiLevelCache{
107+
caches: c,
108+
}
109+
}

0 commit comments

Comments
 (0)