@@ -177,6 +177,67 @@ func TestMatcherCache(t *testing.T) {
177
177
` , callPerMatcher * numberOfDifferentMatchers - numberOfDifferentMatchers , cfg .MatchersCacheMaxItems , callPerMatcher * numberOfDifferentMatchers )), "ingester_matchers_cache_requests_total" , "ingester_matchers_cache_hits_total" , "ingester_matchers_cache_items" , "ingester_matchers_cache_max_items" , "ingester_matchers_cache_evicted_total" ))
178
178
}
179
179
180
+ func TestIngesterDeletionRace (t * testing.T ) {
181
+ registry := prometheus .NewRegistry ()
182
+ limits := defaultLimitsTestConfig ()
183
+ tenantLimits := newMockTenantLimits (map [string ]* validation.Limits {userID : & limits })
184
+ cfg := defaultIngesterTestConfig (t )
185
+ cfg .BlocksStorageConfig .TSDB .CloseIdleTSDBInterval = 1 * time .Millisecond
186
+ cfg .BlocksStorageConfig .TSDB .CloseIdleTSDBTimeout = 10 * time .Second
187
+ cfg .BlocksStorageConfig .TSDB .ExpandedCachingExpireInterval = 1 * time .Millisecond
188
+ cfg .BlocksStorageConfig .TSDB .PostingsCache = cortex_tsdb.TSDBPostingsCacheConfig {
189
+ SeedSize : 3 , // lets make sure all metric names collide
190
+ Head : cortex_tsdb.PostingsCacheConfig {
191
+ Enabled : true ,
192
+ Ttl : time .Hour ,
193
+ MaxBytes : 1024 * 1024 * 1024 ,
194
+ },
195
+ Blocks : cortex_tsdb.PostingsCacheConfig {
196
+ Enabled : true ,
197
+ Ttl : time .Hour ,
198
+ MaxBytes : 1024 * 1024 * 1024 ,
199
+ },
200
+ }
201
+
202
+ dir := t .TempDir ()
203
+ chunksDir := filepath .Join (dir , "chunks" )
204
+ blocksDir := filepath .Join (dir , "blocks" )
205
+ require .NoError (t , os .Mkdir (chunksDir , os .ModePerm ))
206
+ require .NoError (t , os .Mkdir (blocksDir , os .ModePerm ))
207
+
208
+ ing , err := prepareIngesterWithBlocksStorageAndLimits (t , cfg , limits , tenantLimits , blocksDir , registry , true )
209
+ require .NoError (t , err )
210
+ require .NoError (t , services .StartAndAwaitRunning (context .Background (), ing ))
211
+ // Wait until it's ACTIVE
212
+ test .Poll (t , time .Second , ring .ACTIVE , func () interface {} {
213
+ return ing .lifecycler .GetState ()
214
+ })
215
+
216
+ numberOfTenants := 1500
217
+ wg := sync.WaitGroup {}
218
+ wg .Add (numberOfTenants )
219
+
220
+ for i := 0 ; i < numberOfTenants ; i ++ {
221
+ go func () {
222
+ defer wg .Done ()
223
+ u := fmt .Sprintf ("userId_%v" , i )
224
+ ctx := user .InjectOrgID (context .Background (), u )
225
+ samples := []cortexpb.Sample {{Value : 2 , TimestampMs : 10 }}
226
+ _ , err := ing .Push (ctx , cortexpb .ToWriteRequest ([]labels.Labels {labels .FromStrings (labels .MetricName , "name" )}, samples , nil , nil , cortexpb .API ))
227
+ require .NoError (t , err )
228
+ s := & mockQueryStreamServer {ctx : ctx }
229
+ err = ing .QueryStream (& client.QueryRequest {
230
+ StartTimestampMs : math .MinInt64 ,
231
+ EndTimestampMs : math .MaxInt64 ,
232
+ Matchers : []* client.LabelMatcher {{Type : client .REGEX_MATCH , Name : labels .MetricName , Value : ".*" }},
233
+ }, s )
234
+ require .NoError (t , err )
235
+ ing .getTSDB (u ).deletionMarkFound .Store (true ) // lets force close the tenant
236
+ }()
237
+ }
238
+ wg .Wait ()
239
+ }
240
+
180
241
func TestIngesterPerLabelsetLimitExceeded (t * testing.T ) {
181
242
limits := defaultLimitsTestConfig ()
182
243
userID := "1"
0 commit comments