Skip to content

Commit 79adc33

Browse files
alanprotalexqyle
authored andcommitted
Test for nil on expire expanded postings (cortexproject#6521)
* Test for nil on expire expanded postings Signed-off-by: alanprot <alanprot@gmail.com> * stopping ingester Signed-off-by: alanprot <alanprot@gmail.com> * refactor the test to not timeout Signed-off-by: alanprot <alanprot@gmail.com> --------- Signed-off-by: alanprot <alanprot@gmail.com> Signed-off-by: Alex Le <leqiyue@amazon.com>
1 parent 12e8808 commit 79adc33

File tree

1 file changed

+82
-0
lines changed

1 file changed

+82
-0
lines changed

pkg/ingester/ingester_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,77 @@ func TestMatcherCache(t *testing.T) {
177177
`, callPerMatcher*numberOfDifferentMatchers-numberOfDifferentMatchers, cfg.MatchersCacheMaxItems, callPerMatcher*numberOfDifferentMatchers)), "ingester_matchers_cache_requests_total", "ingester_matchers_cache_hits_total", "ingester_matchers_cache_items", "ingester_matchers_cache_max_items", "ingester_matchers_cache_evicted_total"))
178178
}
179179

180+
func TestIngesterDeletionRace(t *testing.T) {
181+
registry := prometheus.NewRegistry()
182+
limits := defaultLimitsTestConfig()
183+
tenantLimits := newMockTenantLimits(map[string]*validation.Limits{userID: &limits})
184+
cfg := defaultIngesterTestConfig(t)
185+
cfg.BlocksStorageConfig.TSDB.PostingsCache = cortex_tsdb.TSDBPostingsCacheConfig{
186+
Head: cortex_tsdb.PostingsCacheConfig{
187+
Enabled: true,
188+
Ttl: time.Hour,
189+
MaxBytes: 1024 * 1024 * 1024,
190+
},
191+
Blocks: cortex_tsdb.PostingsCacheConfig{
192+
Enabled: true,
193+
Ttl: time.Hour,
194+
MaxBytes: 1024 * 1024 * 1024,
195+
},
196+
}
197+
198+
dir := t.TempDir()
199+
chunksDir := filepath.Join(dir, "chunks")
200+
blocksDir := filepath.Join(dir, "blocks")
201+
require.NoError(t, os.Mkdir(chunksDir, os.ModePerm))
202+
require.NoError(t, os.Mkdir(blocksDir, os.ModePerm))
203+
204+
ing, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, tenantLimits, blocksDir, registry, false)
205+
require.NoError(t, err)
206+
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
207+
defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
208+
// Wait until it's ACTIVE
209+
test.Poll(t, time.Second, ring.ACTIVE, func() interface{} {
210+
return ing.lifecycler.GetState()
211+
})
212+
213+
numberOfTenants := 50
214+
wg := sync.WaitGroup{}
215+
wg.Add(numberOfTenants)
216+
217+
for i := 0; i < numberOfTenants; i++ {
218+
go func() {
219+
defer wg.Done()
220+
u := fmt.Sprintf("userId_%v", i)
221+
ctx := user.InjectOrgID(context.Background(), u)
222+
samples := []cortexpb.Sample{{Value: 2, TimestampMs: 10}}
223+
_, err := ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels.FromStrings(labels.MetricName, "name")}, samples, nil, nil, cortexpb.API))
224+
require.NoError(t, err)
225+
ing.getTSDB(u).postingCache = &wrappedExpandedPostingsCache{ExpandedPostingsCache: ing.getTSDB(u).postingCache, purgeDelay: 10 * time.Millisecond}
226+
ing.getTSDB(u).deletionMarkFound.Store(true) // lets force close the tenant
227+
}()
228+
}
229+
230+
wg.Wait()
231+
232+
ctx, c := context.WithCancel(context.Background())
233+
defer c()
234+
235+
wg.Add(1)
236+
go func() {
237+
wg.Done()
238+
ing.expirePostingsCache(ctx) //nolint:errcheck
239+
}()
240+
241+
go func() {
242+
wg.Wait() // make sure we clean after we started the purge go routine
243+
ing.closeAndDeleteIdleUserTSDBs(ctx) //nolint:errcheck
244+
}()
245+
246+
test.Poll(t, 5*time.Second, 0, func() interface{} {
247+
return len(ing.getTSDBUsers())
248+
})
249+
}
250+
180251
func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
181252
limits := defaultLimitsTestConfig()
182253
userID := "1"
@@ -3528,6 +3599,17 @@ func (m *mockMetricsForLabelMatchersStreamServer) Context() context.Context {
35283599
return m.ctx
35293600
}
35303601

3602+
type wrappedExpandedPostingsCache struct {
3603+
cortex_tsdb.ExpandedPostingsCache
3604+
3605+
purgeDelay time.Duration
3606+
}
3607+
3608+
func (w *wrappedExpandedPostingsCache) PurgeExpiredItems() {
3609+
time.Sleep(w.purgeDelay)
3610+
w.ExpandedPostingsCache.PurgeExpiredItems()
3611+
}
3612+
35313613
type mockQueryStreamServer struct {
35323614
grpc.ServerStream
35333615
ctx context.Context

0 commit comments

Comments
 (0)