4
4
"context"
5
5
"fmt"
6
6
"io"
7
+ "slices"
7
8
"sort"
8
9
"sync"
9
10
"time"
@@ -147,7 +148,7 @@ func (c *NamespacedOperatorCache) Error() error {
147
148
148
149
func (c * Cache ) Namespaced (namespaces ... string ) MultiCatalogOperatorFinder {
149
150
const (
150
- CachePopulateTimeout = time .Minute
151
+ cachePopulateTimeout = time .Minute
151
152
)
152
153
153
154
sources := c .sp .Sources (namespaces ... )
@@ -169,7 +170,9 @@ func (c *Cache) Namespaced(namespaces ...string) MultiCatalogOperatorFinder {
169
170
if snapshot .Valid () {
170
171
result .snapshots [key ] = snapshot
171
172
} else {
172
- misses = append (misses , key )
173
+ if ! snapshot .RequestSentinelActive () {
174
+ misses = append (misses , key )
175
+ }
173
176
}
174
177
}()
175
178
}
@@ -209,19 +212,35 @@ func (c *Cache) Namespaced(namespaces ...string) MultiCatalogOperatorFinder {
209
212
}
210
213
misses = misses [found :]
211
214
215
+ // remove any with a "live" outstanding request
216
+ misses = slices .DeleteFunc (misses , func (key SourceKey ) bool {
217
+ hdr := c .snapshots [key ]
218
+
219
+ // if we already have a request timestamp, we have an outstanding request, so prevent stacking
220
+ // and just send new requests if the previous one has expired
221
+ if hdr != nil && hdr .RequestSentinelActive () {
222
+ c .logger .Printf ("Skipping new request for %s, already in progress" , key )
223
+ return true
224
+ }
225
+ return false
226
+ })
227
+
212
228
for _ , miss := range misses {
213
- ctx , cancel := context .WithTimeout (context .Background (), CachePopulateTimeout )
229
+ ctx , cancel := context .WithTimeout (context .Background (), cachePopulateTimeout )
214
230
215
231
hdr := snapshotHeader {
216
- key : miss ,
217
- pop : cancel ,
218
- priority : c .sourcePriorityProvider .Priority (miss ),
232
+ key : miss ,
233
+ pop : cancel ,
234
+ priority : c .sourcePriorityProvider .Priority (miss ),
235
+ requestSentinel : time .Now ().Add (cachePopulateTimeout ), // set sentinel to prevent stacking requests
219
236
}
220
237
221
238
hdr .m .Lock ()
222
239
c .snapshots [miss ] = & hdr
223
240
result .snapshots [miss ] = & hdr
224
241
242
+ // don't adjust the request sentinel in the goroutine for any outcome, so that we don't stampede sources
243
+ // instead, reevaluate the sentinel during the next snapshot
225
244
go func (ctx context.Context , hdr * snapshotHeader , source Source ) {
226
245
defer hdr .m .Unlock ()
227
246
c .sem <- struct {}{}
@@ -294,6 +313,8 @@ type snapshotHeader struct {
294
313
pop context.CancelFunc
295
314
err error
296
315
priority int
316
+
317
+ requestSentinel time.Time
297
318
}
298
319
299
320
func (hdr * snapshotHeader ) Cancel () {
@@ -314,6 +335,16 @@ func (hdr *snapshotHeader) Valid() bool {
314
335
return true
315
336
}
316
337
338
+ func (hdr * snapshotHeader ) RequestSentinelActive () bool {
339
+ if hdr == nil {
340
+ return false
341
+ }
342
+
343
+ hdr .m .RLock ()
344
+ defer hdr .m .RUnlock ()
345
+ return time .Now ().Before (hdr .requestSentinel )
346
+ }
347
+
317
348
type sortableSnapshots struct {
318
349
snapshots []* snapshotHeader
319
350
preferredNamespace string
0 commit comments