Commit 9dc9eda

Replace inmemory index cache with a fastcache-based implementation (#5619)

* replace inmemory index cache to fastcache based implementation
* changelog
* add benchmarks
* fix conflicts

Signed-off-by: Ben Ye <benye@amazon.com>
1 parent bc6be25 commit 9dc9eda

File tree

16 files changed: +1869 -18 lines


CHANGELOG.md

Lines changed: 1 addition & 0 deletions

@@ -35,6 +35,7 @@
 * [CHANGE] Query Frontend/Querier: Make build info API disabled by default and add feature flag `api.build-info-enabled` to enable it. #5533
 * [CHANGE] Purger: Do not use S3 tenant kms key when uploading deletion marker. #5575
 * [CHANGE] Ingester: Shipper always uploads compacted blocks. #5625
+* [CHANGE] Store Gateway: Add a new fastcache based inmemory index cache. #5619
 * [FEATURE] Store Gateway: Add `max_downloaded_bytes_per_request` to limit max bytes to download per store gateway request.
 * [FEATURE] Added 2 flags `-alertmanager.alertmanager-client.grpc-max-send-msg-size` and `-alertmanager.alertmanager-client.grpc-max-recv-msg-size` to configure alert manager grpc client message size limits. #5338
 * [FEATURE] Query Frontend: Add `cortex_rejected_queries_total` metric for throttled queries. #5356
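Note: this changes the implementation behind the existing `inmemory` index cache backend rather than adding a new backend option, so existing configuration presumably keeps working unchanged, e.g. (assuming the usual Cortex flag naming):

    -blocks-storage.bucket-store.index-cache.backend=inmemory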

go.mod

Lines changed: 1 addition & 0 deletions

@@ -77,6 +77,7 @@ require (
 )

 require (
+	github.com/VictoriaMetrics/fastcache v1.12.1
 	github.com/cespare/xxhash/v2 v2.2.0
 	github.com/google/go-cmp v0.5.9
 	google.golang.org/protobuf v1.31.0

go.sum

Lines changed: 4 additions & 0 deletions

@@ -646,6 +646,8 @@ github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5
 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
 github.com/OneOfOne/xxhash v1.2.6 h1:U68crOE3y3MPttCMQGywZOLrTeF5HHJ3/vDBCJn9/bA=
 github.com/OneOfOne/xxhash v1.2.6/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
+github.com/VictoriaMetrics/fastcache v1.12.1 h1:i0mICQuojGDL3KblA7wUNlY5lOK6a4bwt3uRKnkZU40=
+github.com/VictoriaMetrics/fastcache v1.12.1/go.mod h1:tX04vaqcNoQeGLD+ra5pU5sWkuxnzWhEzLwhP9w653o=
 github.com/ajstarks/deck v0.0.0-20200831202436-30c9fc6549a9/go.mod h1:JynElWSGnm/4RlzPXRlREEwqTHAN3T56Bv2ITsFT3gY=
 github.com/ajstarks/deck/generate v0.0.0-20210309230005-c3f852c02e19/go.mod h1:T13YZdzov6OU0A1+RfKZiZN9ca6VeKdBdyDV+BY97Tk=
 github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
@@ -667,6 +669,8 @@ github.com/alicebob/miniredis/v2 v2.30.4 h1:8S4/o1/KoUArAGbGwPxcwf0krlzceva2XVOS
 github.com/alicebob/miniredis/v2 v2.30.4/go.mod h1:b25qWj4fCEsBeAAR2mlb0ufImGC6uH3VlUfb/HS5zKg=
 github.com/aliyun/aliyun-oss-go-sdk v2.2.2+incompatible h1:9gWa46nstkJ9miBReJcN8Gq34cBFbzSpQZVVT9N09TM=
 github.com/aliyun/aliyun-oss-go-sdk v2.2.2+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
+github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8=
+github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
 github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
 github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
 github.com/apache/arrow/go/v10 v10.0.1/go.mod h1:YvhnlEePVnBS4+0z3fhPfUy7W1Ikj0Ih0vcRo/gZ1M0=

integration/querier_test.go

Lines changed: 3 additions & 16 deletions

@@ -257,10 +257,7 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
 	require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(float64((5+5+2)*numberOfCacheBackends)), "thanos_store_index_cache_requests_total"))
 	require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(0), "thanos_store_index_cache_hits_total")) // no cache hit because the cache was empty

-	if testCfg.indexCacheBackend == tsdb.IndexCacheBackendInMemory {
-		require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(9), "thanos_store_index_cache_items"))
-		require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(9), "thanos_store_index_cache_items_added_total"))
-	} else if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached {
+	if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached {
 		require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(21), "thanos_memcached_operations_total")) // 14 gets + 7 sets
 	}

@@ -297,10 +294,6 @@
 	}
 	require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(2), "thanos_store_index_cache_hits_total")) // this time the index cache was used

-	if testCfg.indexCacheBackend == tsdb.IndexCacheBackendInMemory {
-		require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(9), "thanos_store_index_cache_items"))             // as before
-		require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(9), "thanos_store_index_cache_items_added_total")) // as before
-	}
 	if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached {
 		require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(23-l0CacheHits), "thanos_memcached_operations_total")) // as before + 2 gets - cache hits
 	}

@@ -516,10 +509,7 @@
 	require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64((5+5+2)*seriesReplicationFactor)), "thanos_store_index_cache_requests_total")) // 5 for expanded postings and postings, 2 for series
 	require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(0), "thanos_store_index_cache_hits_total")) // no cache hit because the cache was empty

-	if testCfg.indexCacheBackend == tsdb.IndexCacheBackendInMemory {
-		require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(9*seriesReplicationFactor)), "thanos_store_index_cache_items"))
-		require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(9*seriesReplicationFactor)), "thanos_store_index_cache_items_added_total"))
-	} else if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached {
+	if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached {
 		require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(21*seriesReplicationFactor)), "thanos_memcached_operations_total")) // 14 gets + 7 sets
 	}

@@ -532,10 +522,7 @@
 	require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64((12+2)*seriesReplicationFactor)), "thanos_store_index_cache_requests_total"))
 	require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(2*seriesReplicationFactor)), "thanos_store_index_cache_hits_total")) // this time the index cache was used

-	if testCfg.indexCacheBackend == tsdb.IndexCacheBackendInMemory {
-		require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(9*seriesReplicationFactor)), "thanos_store_index_cache_items"))             // as before
-		require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(9*seriesReplicationFactor)), "thanos_store_index_cache_items_added_total")) // as before
-	} else if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached {
+	if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached {
 		require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64((21+2)*seriesReplicationFactor)), "thanos_memcached_operations_total")) // as before + 2 gets
 	}

The removed assertions covered the old in-memory cache's `thanos_store_index_cache_items` and `thanos_store_index_cache_items_added_total` metrics; the new fastcache-based implementation does not expose an item-count gauge (it registers only added and overflowed counters, as seen in the new file below), so those expectations no longer apply as written.
pkg/storage/tsdb/index_cache.go

Lines changed: 1 addition & 1 deletion

@@ -222,7 +222,7 @@ func newInMemoryIndexCache(cfg InMemoryIndexCacheConfig, logger log.Logger, regi
 		maxItemSize = maxCacheSize
 	}

-	return storecache.NewInMemoryIndexCacheWithConfig(logger, nil, registerer, storecache.InMemoryIndexCacheConfig{
+	return NewInMemoryIndexCacheWithConfig(logger, nil, registerer, storecache.InMemoryIndexCacheConfig{
 		MaxSize:     maxCacheSize,
 		MaxItemSize: maxItemSize,
 	})

With this change, the `inmemory` backend constructor delegates to the package-local, fastcache-backed `InMemoryIndexCache` added below, instead of the LRU-based implementation from Thanos's storecache package.
New file (package tsdb)

Lines changed: 236 additions & 0 deletions

package tsdb

import (
	"context"
	"reflect"
	"unsafe"

	"github.com/VictoriaMetrics/fastcache"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/oklog/ulid"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"

	storecache "github.com/thanos-io/thanos/pkg/store/cache"
	"github.com/thanos-io/thanos/pkg/tenancy"
)

type InMemoryIndexCache struct {
	logger           log.Logger
	cache            *fastcache.Cache
	maxItemSizeBytes uint64

	added    *prometheus.CounterVec
	overflow *prometheus.CounterVec

	commonMetrics *storecache.CommonMetrics
}

// NewInMemoryIndexCacheWithConfig creates a new thread-safe cache for index entries. It relies on the cache library
// (fastcache) to ensure the total cache size approximately does not exceed maxBytes.
func NewInMemoryIndexCacheWithConfig(logger log.Logger, commonMetrics *storecache.CommonMetrics, reg prometheus.Registerer, config storecache.InMemoryIndexCacheConfig) (*InMemoryIndexCache, error) {
	if config.MaxItemSize > config.MaxSize {
		return nil, errors.Errorf("max item size (%v) cannot be bigger than overall cache size (%v)", config.MaxItemSize, config.MaxSize)
	}

	// fastcache will panic if MaxSize <= 0.
	if config.MaxSize <= 0 {
		config.MaxSize = storecache.DefaultInMemoryIndexCacheConfig.MaxSize
	}

	if commonMetrics == nil {
		commonMetrics = storecache.NewCommonMetrics(reg)
	}

	c := &InMemoryIndexCache{
		logger:           logger,
		maxItemSizeBytes: uint64(config.MaxItemSize),
		commonMetrics:    commonMetrics,
	}

	c.added = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "thanos_store_index_cache_items_added_total",
		Help: "Total number of items that were added to the index cache.",
	}, []string{"item_type"})
	c.added.WithLabelValues(cacheTypePostings)
	c.added.WithLabelValues(cacheTypeSeries)
	c.added.WithLabelValues(cacheTypeExpandedPostings)

	c.commonMetrics.RequestTotal.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant)
	c.commonMetrics.RequestTotal.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant)
	c.commonMetrics.RequestTotal.WithLabelValues(cacheTypeExpandedPostings, tenancy.DefaultTenant)

	c.overflow = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "thanos_store_index_cache_items_overflowed_total",
		Help: "Total number of items that could not be added to the cache due to being too big.",
	}, []string{"item_type"})
	c.overflow.WithLabelValues(cacheTypePostings)
	c.overflow.WithLabelValues(cacheTypeSeries)
	c.overflow.WithLabelValues(cacheTypeExpandedPostings)

	c.commonMetrics.HitsTotal.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant)
	c.commonMetrics.HitsTotal.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant)
	c.commonMetrics.HitsTotal.WithLabelValues(cacheTypeExpandedPostings, tenancy.DefaultTenant)

	c.cache = fastcache.New(int(config.MaxSize))
	level.Info(logger).Log(
		"msg", "created in-memory index cache",
		"maxItemSizeBytes", c.maxItemSizeBytes,
		"maxSizeBytes", config.MaxSize,
	)
	return c, nil
}
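
// Note: per the fastcache documentation, the cache allocates memory in fixed
// 64 KB chunks and enforces a minimum total capacity (about 32 MB) when the
// requested size is smaller, so the effective cache size may exceed a very
// small configured MaxSize.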

// get looks up key and returns the cached bytes, if any. A zero-length cached
// value is indistinguishable from a miss here, since GetBig returns an empty
// slice in both cases.
func (c *InMemoryIndexCache) get(key storecache.CacheKey) ([]byte, bool) {
	k := yoloBuf(key.String())
	resp := c.cache.GetBig(nil, k)
	if len(resp) == 0 {
		return nil, false
	}
	return resp, true
}

// set stores val under key unless the entry already exists or exceeds the
// configured maximum item size, in which case the overflow counter is bumped.
func (c *InMemoryIndexCache) set(typ string, key storecache.CacheKey, val []byte) {
	k := yoloBuf(key.String())
	r := c.cache.GetBig(nil, k)
	// Item exists, no need to set it again.
	if r != nil {
		return
	}

	size := uint64(len(k) + len(val))
	if size > c.maxItemSizeBytes {
		level.Info(c.logger).Log(
			"msg", "item bigger than maxItemSizeBytes. Ignoring..",
			"maxItemSizeBytes", c.maxItemSizeBytes,
			"cacheType", typ,
		)
		c.overflow.WithLabelValues(typ).Inc()
		return
	}

	c.cache.SetBig(k, val)
	c.added.WithLabelValues(typ).Inc()
}
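
// Note: GetBig/SetBig are used rather than fastcache's plain Get/Set because,
// per the fastcache documentation, regular entries are limited to roughly
// 64 KB of key+value; SetBig transparently splits larger values (for example,
// big postings lists) across multiple chunks.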

// yoloBuf returns the string's underlying bytes without copying. The result
// must not be mutated and must not outlive s.
func yoloBuf(s string) []byte {
	return *(*[]byte)(unsafe.Pointer(&s))
}

// copyString forces a real copy of s, detached from its original backing array.
func copyString(s string) string {
	var b []byte
	h := (*reflect.SliceHeader)(unsafe.Pointer(&b))
	h.Data = (*reflect.StringHeader)(unsafe.Pointer(&s)).Data
	h.Len = len(s)
	h.Cap = len(s)
	return string(b)
}

// copyToKey is required as underlying strings might be mmaped.
func copyToKey(l labels.Label) storecache.CacheKeyPostings {
	return storecache.CacheKeyPostings(labels.Label{Value: copyString(l.Value), Name: copyString(l.Name)})
}

// StorePostings sets the postings identified by the ulid and label to the value v.
// If the postings already exist in the cache they are not mutated.
func (c *InMemoryIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
	c.commonMetrics.DataSizeBytes.WithLabelValues(cacheTypePostings, tenant).Observe(float64(len(v)))
	c.set(cacheTypePostings, storecache.CacheKey{Block: blockID.String(), Key: copyToKey(l)}, v)
}

// FetchMultiPostings fetches multiple postings - each identified by a label -
// and returns a map containing cache hits, along with a list of missing keys.
func (c *InMemoryIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) {
	timer := prometheus.NewTimer(c.commonMetrics.FetchLatency.WithLabelValues(cacheTypePostings, tenant))
	defer timer.ObserveDuration()

	hits = map[labels.Label][]byte{}

	blockIDKey := blockID.String()
	requests := 0
	hit := 0
	for _, key := range keys {
		if ctx.Err() != nil {
			c.commonMetrics.RequestTotal.WithLabelValues(cacheTypePostings, tenant).Add(float64(requests))
			c.commonMetrics.HitsTotal.WithLabelValues(cacheTypePostings, tenant).Add(float64(hit))
			return hits, misses
		}
		requests++
		if b, ok := c.get(storecache.CacheKey{Block: blockIDKey, Key: storecache.CacheKeyPostings(key)}); ok {
			hit++
			hits[key] = b
			continue
		}

		misses = append(misses, key)
	}
	c.commonMetrics.RequestTotal.WithLabelValues(cacheTypePostings, tenant).Add(float64(requests))
	c.commonMetrics.HitsTotal.WithLabelValues(cacheTypePostings, tenant).Add(float64(hit))

	return hits, misses
}

// StoreExpandedPostings stores expanded postings for a set of label matchers.
func (c *InMemoryIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) {
	c.commonMetrics.DataSizeBytes.WithLabelValues(cacheTypeExpandedPostings, tenant).Observe(float64(len(v)))
	c.set(cacheTypeExpandedPostings, storecache.CacheKey{Block: blockID.String(), Key: storecache.CacheKeyExpandedPostings(storecache.LabelMatchersToString(matchers))}, v)
}

// FetchExpandedPostings fetches expanded postings and returns the cached data
// and a boolean indicating whether it was a cache hit.
func (c *InMemoryIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) {
	timer := prometheus.NewTimer(c.commonMetrics.FetchLatency.WithLabelValues(cacheTypeExpandedPostings, tenant))
	defer timer.ObserveDuration()

	if ctx.Err() != nil {
		return nil, false
	}
	c.commonMetrics.RequestTotal.WithLabelValues(cacheTypeExpandedPostings, tenant).Inc()
	if b, ok := c.get(storecache.CacheKey{Block: blockID.String(), Key: storecache.CacheKeyExpandedPostings(storecache.LabelMatchersToString(matchers))}); ok {
		c.commonMetrics.HitsTotal.WithLabelValues(cacheTypeExpandedPostings, tenant).Inc()
		return b, true
	}
	return nil, false
}

// StoreSeries sets the series identified by the ulid and id to the value v.
// If the series already exists in the cache it is not mutated.
func (c *InMemoryIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) {
	c.commonMetrics.DataSizeBytes.WithLabelValues(cacheTypeSeries, tenant).Observe(float64(len(v)))
	c.set(cacheTypeSeries, storecache.CacheKey{Block: blockID.String(), Key: storecache.CacheKeySeries(id)}, v)
}

// FetchMultiSeries fetches multiple series - each identified by ID - from the cache
// and returns a map containing cache hits, along with a list of missing IDs.
func (c *InMemoryIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) {
	timer := prometheus.NewTimer(c.commonMetrics.FetchLatency.WithLabelValues(cacheTypeSeries, tenant))
	defer timer.ObserveDuration()

	hits = map[storage.SeriesRef][]byte{}

	blockIDKey := blockID.String()
	requests := 0
	hit := 0
	for _, id := range ids {
		if ctx.Err() != nil {
			c.commonMetrics.RequestTotal.WithLabelValues(cacheTypeSeries, tenant).Add(float64(requests))
			c.commonMetrics.HitsTotal.WithLabelValues(cacheTypeSeries, tenant).Add(float64(hit))
			return hits, misses
		}
		requests++
		if b, ok := c.get(storecache.CacheKey{Block: blockIDKey, Key: storecache.CacheKeySeries(id)}); ok {
			hit++
			hits[id] = b
			continue
		}

		misses = append(misses, id)
	}
	c.commonMetrics.RequestTotal.WithLabelValues(cacheTypeSeries, tenant).Add(float64(requests))
	c.commonMetrics.HitsTotal.WithLabelValues(cacheTypeSeries, tenant).Add(float64(hit))

	return hits, misses
}
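
For orientation, here is a minimal sketch of how the new cache might be constructed and exercised directly. Only the constructor and store/fetch methods shown in the diff are assumed; the block ID, label, payload, and the github.com/cortexproject/cortex import path are illustrative.

package main

import (
	"context"
	"math/rand"

	"github.com/go-kit/log"
	"github.com/oklog/ulid"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/model/labels"
	storecache "github.com/thanos-io/thanos/pkg/store/cache"
	"github.com/thanos-io/thanos/pkg/tenancy"

	"github.com/cortexproject/cortex/pkg/storage/tsdb"
)

func main() {
	// Passing nil common metrics lets the constructor create them against the registerer.
	cache, err := tsdb.NewInMemoryIndexCacheWithConfig(
		log.NewNopLogger(), nil, prometheus.NewRegistry(),
		storecache.DefaultInMemoryIndexCacheConfig,
	)
	if err != nil {
		panic(err)
	}

	blockID := ulid.MustNew(ulid.Now(), rand.New(rand.NewSource(1))) // illustrative block ID
	lbl := labels.Label{Name: "job", Value: "api"}

	cache.StorePostings(blockID, lbl, []byte("serialized postings"), tenancy.DefaultTenant)
	hits, misses := cache.FetchMultiPostings(context.Background(), blockID, []labels.Label{lbl}, tenancy.DefaultTenant)
	_, _ = hits, misses // expect one hit and no misses
}

Note the max-item-size guard above: values whose key+value size exceeds MaxItemSize are counted in thanos_store_index_cache_items_overflowed_total and never stored.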
