@@ -197,10 +197,17 @@ func (c *BlocksPostingsForMatchersCache) fetchPostings(blockID ulid.ULID, ix tsd
197
197
return c .result (promise )
198
198
}
199
199
200
- func (c * BlocksPostingsForMatchersCache ) result (promise * cacheEntryPromise [[]storage.SeriesRef ]) func (ctx context.Context ) (index.Postings , error ) {
200
+ func (c * BlocksPostingsForMatchersCache ) result (ce * cacheEntryPromise [[]storage.SeriesRef ]) func (ctx context.Context ) (index.Postings , error ) {
201
201
return func (ctx context.Context ) (index.Postings , error ) {
202
- ids , err := promise .result (ctx )
203
- return index .NewListPostings (ids ), err
202
+ select {
203
+ case <- ctx .Done ():
204
+ return nil , ctx .Err ()
205
+ case <- ce .done :
206
+ if ctx .Err () != nil {
207
+ return nil , ctx .Err ()
208
+ }
209
+ return index .NewListPostings (ce .v ), ce .err
210
+ }
204
211
}
205
212
}
206
213
@@ -327,9 +334,12 @@ func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)
327
334
c .expire ()
328
335
}
329
336
330
- // If is cached but is expired, lets try to replace the cache value
331
- if ok && loaded .(* cacheEntryPromise [V ]).isExpired (c .cfg .Ttl , c .timeNow ()) {
332
- if c .cachedValues .CompareAndSwap (k , loaded , r ) {
337
+ if ok {
338
+ // If the promise is already in the cache, lets wait it to fetch the data.
339
+ <- loaded .(* cacheEntryPromise [V ]).done
340
+
341
+ // If is cached but is expired, lets try to replace the cache value.
342
+ if loaded .(* cacheEntryPromise [V ]).isExpired (c .cfg .Ttl , c .timeNow ()) && c .cachedValues .CompareAndSwap (k , loaded , r ) {
333
343
r .v , r .sizeBytes , r .err = fetch ()
334
344
r .sizeBytes += int64 (len (k ))
335
345
c .updateSize (loaded .(* cacheEntryPromise [V ]).sizeBytes , r .sizeBytes )
@@ -404,19 +414,6 @@ type cacheEntryPromise[V any] struct {
404
414
err error
405
415
}
406
416
407
- func (ce * cacheEntryPromise [V ]) result (ctx context.Context ) (V , error ) {
408
- select {
409
- case <- ctx .Done ():
410
- return ce .v , ctx .Err ()
411
- case <- ce .done :
412
- if ctx .Err () != nil {
413
- return ce .v , ctx .Err ()
414
- }
415
-
416
- return ce .v , ce .err
417
- }
418
- }
419
-
420
417
func (ce * cacheEntryPromise [V ]) isExpired (ttl time.Duration , now time.Time ) bool {
421
418
ts := ce .ts
422
419
r := now .Sub (ts )
0 commit comments