Skip to content

Commit adce1e0

Browse files
committed
Backfill lower caches on the multi cache implementation
Signed-off-by: Alan Protasio <alanprot@gmail.com>
1 parent 634df35 commit adce1e0

File tree

3 files changed

+63
-13
lines changed

3 files changed

+63
-13
lines changed

integration/querier_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,11 +297,11 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
297297
}
298298
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(2), "thanos_store_index_cache_hits_total")) // this time has used the index cache
299299

300-
if strings.Contains(testCfg.indexCacheBackend, tsdb.IndexCacheBackendInMemory) {
300+
if testCfg.indexCacheBackend == tsdb.IndexCacheBackendInMemory {
301301
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(9), "thanos_store_index_cache_items")) // as before
302302
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(9), "thanos_store_index_cache_items_added_total")) // as before
303303
}
304-
if strings.Contains(testCfg.indexCacheBackend, tsdb.IndexCacheBackendMemcached) {
304+
if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached {
305305
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(23-l0CacheHits), "thanos_memcached_operations_total")) // as before + 2 gets - cache hits
306306
}
307307

pkg/storage/tsdb/multilevel_cache.go

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,35 @@ func (m *multiLevelCache) StorePostings(blockID ulid.ULID, l labels.Label, v []b
3030
func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) {
3131
misses = keys
3232
hits = map[labels.Label][]byte{}
33-
for _, c := range m.caches {
34-
h, m := c.FetchMultiPostings(ctx, blockID, misses)
35-
misses = m
33+
backfillMap := map[storecache.IndexCache][]map[labels.Label][]byte{}
34+
for i, c := range m.caches {
35+
backfillMap[c] = []map[labels.Label][]byte{}
36+
h, mi := c.FetchMultiPostings(ctx, blockID, misses)
37+
misses = mi
3638

3739
for label, bytes := range h {
3840
hits[label] = bytes
3941
}
42+
43+
if i > 0 {
44+
backfillMap[m.caches[i-1]] = append(backfillMap[m.caches[i-1]], h)
45+
}
46+
4047
if len(misses) == 0 {
4148
break
4249
}
4350
}
4451

52+
defer func() {
53+
for cache, hit := range backfillMap {
54+
for _, values := range hit {
55+
for l, b := range values {
56+
cache.StorePostings(blockID, l, b)
57+
}
58+
}
59+
}
60+
}()
61+
4562
return hits, misses
4663
}
4764

@@ -59,8 +76,11 @@ func (m *multiLevelCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*l
5976
}
6077

6178
func (m *multiLevelCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher) ([]byte, bool) {
62-
for _, c := range m.caches {
79+
for i, c := range m.caches {
6380
if d, h := c.FetchExpandedPostings(ctx, blockID, matchers); h {
81+
if i > 0 {
82+
m.caches[i-1].StoreExpandedPostings(blockID, matchers, d)
83+
}
6484
return d, h
6585
}
6686
}
@@ -84,18 +104,36 @@ func (m *multiLevelCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v
84104
func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) {
85105
misses = ids
86106
hits = map[storage.SeriesRef][]byte{}
87-
for _, c := range m.caches {
88-
h, m := c.FetchMultiSeries(ctx, blockID, misses)
89-
misses = m
107+
backfillMap := map[storecache.IndexCache][]map[storage.SeriesRef][]byte{}
108+
109+
for i, c := range m.caches {
110+
backfillMap[c] = []map[storage.SeriesRef][]byte{}
111+
h, miss := c.FetchMultiSeries(ctx, blockID, misses)
112+
misses = miss
90113

91114
for label, bytes := range h {
92115
hits[label] = bytes
93116
}
117+
118+
if i > 0 && len(h) > 0 {
119+
backfillMap[m.caches[i-1]] = append(backfillMap[m.caches[i-1]], h)
120+
}
121+
94122
if len(misses) == 0 {
95123
break
96124
}
97125
}
98126

127+
defer func() {
128+
for cache, hit := range backfillMap {
129+
for _, values := range hit {
130+
for m, b := range values {
131+
cache.StoreSeries(blockID, m, b)
132+
}
133+
}
134+
}
135+
}()
136+
99137
return hits, misses
100138
}
101139

pkg/storage/tsdb/multilevel_cache_test.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -148,16 +148,20 @@ func Test_MultiLevelCache(t *testing.T) {
148148
cache.FetchMultiPostings(ctx, bID, []labels.Label{l1, l2})
149149
},
150150
},
151-
"[FetchMultiPostings] should fallback only the missing keys on l1": {
151+
"[FetchMultiPostings] should fallback and backfill only the missing keys on l1": {
152152
m1ExpectedCalls: map[string][][]interface{}{
153153
"FetchMultiPostings": {{bID, []labels.Label{l1, l2}}},
154+
"StorePostings": {{bID, l2, v}},
154155
},
155156
m2ExpectedCalls: map[string][][]interface{}{
156157
"FetchMultiPostings": {{bID, []labels.Label{l2}}},
157158
},
158159
m1MockedCalls: map[string][]interface{}{
159160
"FetchMultiPostings": {map[labels.Label][]byte{l1: make([]byte, 1)}, []labels.Label{l2}},
160161
},
162+
m2MockedCalls: map[string][]interface{}{
163+
"FetchMultiPostings": {map[labels.Label][]byte{l2: v}, []labels.Label{}},
164+
},
161165
call: func(cache storecache.IndexCache) {
162166
cache.FetchMultiPostings(ctx, bID, []labels.Label{l1, l2})
163167
},
@@ -185,15 +189,19 @@ func Test_MultiLevelCache(t *testing.T) {
185189
cache.FetchMultiSeries(ctx, bID, []storage.SeriesRef{1, 2})
186190
},
187191
},
188-
"[FetchMultiSeries] should fallback only the missing keys on l1": {
192+
"[FetchMultiSeries] should fallback and backfill only the missing keys on l1": {
189193
m1ExpectedCalls: map[string][][]interface{}{
190194
"FetchMultiSeries": {{bID, []storage.SeriesRef{1, 2}}},
195+
"StoreSeries": {{bID, storage.SeriesRef(2), v}},
191196
},
192197
m2ExpectedCalls: map[string][][]interface{}{
193198
"FetchMultiSeries": {{bID, []storage.SeriesRef{2}}},
194199
},
195200
m1MockedCalls: map[string][]interface{}{
196-
"FetchMultiSeries": {map[storage.SeriesRef][]byte{1: make([]byte, 1)}, []storage.SeriesRef{2}},
201+
"FetchMultiSeries": {map[storage.SeriesRef][]byte{1: v}, []storage.SeriesRef{2}},
202+
},
203+
m2MockedCalls: map[string][]interface{}{
204+
"FetchMultiSeries": {map[storage.SeriesRef][]byte{2: v}, []storage.SeriesRef{2}},
197205
},
198206
call: func(cache storecache.IndexCache) {
199207
cache.FetchMultiSeries(ctx, bID, []storage.SeriesRef{1, 2})
@@ -211,13 +219,17 @@ func Test_MultiLevelCache(t *testing.T) {
211219
cache.FetchMultiSeries(ctx, bID, []storage.SeriesRef{1, 2})
212220
},
213221
},
214-
"[FetchExpandedPostings] Should fallback when miss": {
222+
"[FetchExpandedPostings] Should fallback and backfill when miss": {
215223
m1ExpectedCalls: map[string][][]interface{}{
224+
"StoreExpandedPostings": {{bID, []*labels.Matcher{matcher}, v}},
216225
"FetchExpandedPostings": {{bID, []*labels.Matcher{matcher}}},
217226
},
218227
m2ExpectedCalls: map[string][][]interface{}{
219228
"FetchExpandedPostings": {{bID, []*labels.Matcher{matcher}}},
220229
},
230+
m2MockedCalls: map[string][]interface{}{
231+
"FetchExpandedPostings": {v, true},
232+
},
221233
call: func(cache storecache.IndexCache) {
222234
cache.FetchExpandedPostings(ctx, bID, []*labels.Matcher{matcher})
223235
},

0 commit comments

Comments
 (0)