7
7
"fmt"
8
8
"io"
9
9
"math"
10
- "math/rand"
11
10
"net"
12
11
"net/http"
13
12
"net/http/httptest"
@@ -183,11 +182,7 @@ func TestIngesterDeletionRace(t *testing.T) {
183
182
limits := defaultLimitsTestConfig ()
184
183
tenantLimits := newMockTenantLimits (map [string ]* validation.Limits {userID : & limits })
185
184
cfg := defaultIngesterTestConfig (t )
186
- cfg .BlocksStorageConfig .TSDB .CloseIdleTSDBInterval = 5 * time .Millisecond
187
- cfg .BlocksStorageConfig .TSDB .CloseIdleTSDBTimeout = 10 * time .Second
188
- cfg .BlocksStorageConfig .TSDB .ExpandedCachingExpireInterval = 5 * time .Millisecond
189
185
cfg .BlocksStorageConfig .TSDB .PostingsCache = cortex_tsdb.TSDBPostingsCacheConfig {
190
- SeedSize : 3 , // lets make sure all metric names collide
191
186
Head : cortex_tsdb.PostingsCacheConfig {
192
187
Enabled : true ,
193
188
Ttl : time .Hour ,
@@ -215,7 +210,7 @@ func TestIngesterDeletionRace(t *testing.T) {
215
210
return ing .lifecycler .GetState ()
216
211
})
217
212
218
- numberOfTenants := 150
213
+ numberOfTenants := 50
219
214
wg := sync.WaitGroup {}
220
215
wg .Add (numberOfTenants )
221
216
@@ -227,18 +222,30 @@ func TestIngesterDeletionRace(t *testing.T) {
227
222
samples := []cortexpb.Sample {{Value : 2 , TimestampMs : 10 }}
228
223
_ , err := ing .Push (ctx , cortexpb .ToWriteRequest ([]labels.Labels {labels .FromStrings (labels .MetricName , "name" )}, samples , nil , nil , cortexpb .API ))
229
224
require .NoError (t , err )
230
- s := & mockQueryStreamServer {ctx : ctx }
231
- err = ing .QueryStream (& client.QueryRequest {
232
- StartTimestampMs : math .MinInt64 ,
233
- EndTimestampMs : math .MaxInt64 ,
234
- Matchers : []* client.LabelMatcher {{Type : client .REGEX_MATCH , Name : labels .MetricName , Value : ".*" }},
235
- }, s )
236
- require .NoError (t , err )
237
- time .Sleep (time .Duration (rand .Int63n (5 )) * time .Millisecond )
225
+ ing .getTSDB (u ).postingCache = & wrappedExpandedPostingsCache {ExpandedPostingsCache : ing .getTSDB (u ).postingCache , purgeDelay : 10 * time .Millisecond }
238
226
ing .getTSDB (u ).deletionMarkFound .Store (true ) // lets force close the tenant
239
227
}()
240
228
}
229
+
241
230
wg .Wait ()
231
+
232
+ ctx , c := context .WithCancel (context .Background ())
233
+ defer c ()
234
+
235
+ wg .Add (1 )
236
+ go func () {
237
+ wg .Done ()
238
+ ing .expirePostingsCache (ctx ) //nolint:errcheck
239
+ }()
240
+
241
+ go func () {
242
+ wg .Wait () // make sure we clean after we started the purge go routine
243
+ ing .closeAndDeleteIdleUserTSDBs (ctx ) //nolint:errcheck
244
+ }()
245
+
246
+ test .Poll (t , 5 * time .Second , 0 , func () interface {} {
247
+ return len (ing .getTSDBUsers ())
248
+ })
242
249
}
243
250
244
251
func TestIngesterPerLabelsetLimitExceeded (t * testing.T ) {
@@ -3592,6 +3599,17 @@ func (m *mockMetricsForLabelMatchersStreamServer) Context() context.Context {
3592
3599
return m .ctx
3593
3600
}
3594
3601
3602
+ type wrappedExpandedPostingsCache struct {
3603
+ cortex_tsdb.ExpandedPostingsCache
3604
+
3605
+ purgeDelay time.Duration
3606
+ }
3607
+
3608
+ func (w * wrappedExpandedPostingsCache ) PurgeExpiredItems () {
3609
+ time .Sleep (w .purgeDelay )
3610
+ w .ExpandedPostingsCache .PurgeExpiredItems ()
3611
+ }
3612
+
3595
3613
type mockQueryStreamServer struct {
3596
3614
grpc.ServerStream
3597
3615
ctx context.Context
0 commit comments