Commit dba615b

create more tests on the expanded postings cache
Signed-off-by: alanprot <alanprot@gmail.com>
1 parent 76adacd commit dba615b

File tree

2 files changed: +185 -0 lines changed


pkg/ingester/ingester_test.go

Lines changed: 177 additions & 0 deletions
@@ -5605,6 +5605,183 @@ func TestExpendedPostingsCacheIsolation(t *testing.T) {
 	wg.Wait()
 }

+func TestExpendedPostingsCacheMatchers(t *testing.T) {
+	cfg := defaultIngesterTestConfig(t)
+	cfg.BlocksStorageConfig.TSDB.ExpandedCachingExpireInterval = time.Second
+	cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour}
+	cfg.BlocksStorageConfig.TSDB.PostingsCache.Blocks.Enabled = true
+	cfg.BlocksStorageConfig.TSDB.PostingsCache.Head.Enabled = true
+
+	ctx := user.InjectOrgID(context.Background(), userID)
+
+	r := prometheus.NewRegistry()
+	ing, err := prepareIngesterWithBlocksStorage(t, cfg, r)
+	require.NoError(t, err)
+	require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
+	defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
+
+	// Wait until the ingester is ACTIVE
+	test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} {
+		return ing.lifecycler.GetState()
+	})
+
+	numberOfMetricNames := 10
+	seriesPerMetricsNames := 25
+	timeStamp := int64(60 * 1000)
+	seriesCreated := map[string]labels.Labels{}
+
+	for i := 0; i < numberOfMetricNames; i++ {
+		metricName := fmt.Sprintf("metric_%v", i)
+		for j := 0; j < seriesPerMetricsNames; j++ {
+			s := labels.FromStrings(labels.MetricName, metricName, "labelA", fmt.Sprintf("series_%v", j))
+			_, err = ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{s}, []cortexpb.Sample{{Value: 2, TimestampMs: timeStamp}}, nil, nil, cortexpb.API))
+			seriesCreated[s.String()] = s
+			require.NoError(t, err)
+		}
+	}
+
+	db := ing.getTSDB(userID)
+
+	type testCase struct {
+		matchers []*client.LabelMatcher
+	}
+
+	cases := []testCase{}
+
+	nameMatcher := &client.LabelMatcher{
+		Type:  client.EQUAL,
+		Name:  labels.MetricName,
+		Value: "metric_0",
+	}
+
+	for i := 0; i < 4; i++ {
+		tc := testCase{
+			matchers: []*client.LabelMatcher{nameMatcher},
+		}
+
+		switch client.MatchType(i) {
+		case client.EQUAL | client.NOT_EQUAL:
+			tc.matchers = append(tc.matchers, &client.LabelMatcher{
+				Type:  client.MatchType(i),
+				Name:  "labelA",
+				Value: "series_0",
+			})
+		default:
+			tc.matchers = append(tc.matchers, &client.LabelMatcher{
+				Type:  client.MatchType(i),
+				Name:  "labelA",
+				Value: "series_.*",
+			})
+		}
+		cases = append(cases, tc)
+	}
+
+	for _, v := range []string{".*", "", ".+"} {
+		cases = append(cases,
+			testCase{
+				matchers: []*client.LabelMatcher{
+					nameMatcher,
+					{
+						Type:  client.REGEX_MATCH,
+						Name:  "labelA",
+						Value: v,
+					},
+				},
+			},
+			testCase{
+				matchers: []*client.LabelMatcher{
+					nameMatcher,
+					{
+						Type:  client.REGEX_NO_MATCH,
+						Name:  "labelA",
+						Value: v,
+					},
+				},
+			},
+		)
+	}
+
+	ranges := []struct {
+		startTs, endTs int64
+		hasSamples     bool
+	}{
+		// Totally in the past
+		{
+			startTs:    0,
+			endTs:      timeStamp / 2,
+			hasSamples: false,
+		},
+		{
+			startTs:    timeStamp / 2,
+			endTs:      timeStamp,
+			hasSamples: true,
+		},
+		{
+			startTs:    timeStamp / 2,
+			endTs:      timeStamp * 2,
+			hasSamples: true,
+		},
+		{
+			startTs:    timeStamp + 1,
+			endTs:      timeStamp * 2,
+			hasSamples: false,
+		},
+	}
+
+	verify := func(t *testing.T, tc testCase, startTs, endTs int64, hasSamples bool) {
+		s := &mockQueryStreamServer{ctx: ctx}
+		err := ing.QueryStream(&client.QueryRequest{
+			StartTimestampMs: startTs,
+			EndTimestampMs:   endTs,
+			Matchers:         tc.matchers,
+		}, s)
+		require.NoError(t, err)
+		if hasSamples {
+			expectedCount := len(seriesCreated)
+			matchers, err := client.FromLabelMatchers(ing.matchersCache, tc.matchers)
+			require.NoError(t, err)
+			for _, s := range seriesCreated {
+				for _, m := range matchers {
+					if !m.Matches(s.Get(m.Name)) {
+						expectedCount--
+						break
+					}
+				}
+			}
+
+			require.Equal(t, expectedCount, len(s.series))
+		} else {
+			require.Equal(t, 0, len(s.series))
+		}
+	}
+
+	for _, tc := range cases {
+		testName := ""
+		for _, matcher := range tc.matchers {
+			t, _ := matcher.MatcherType()
+			testName += matcher.Name + t.String() + matcher.Value + "|"
+
+		}
+		t.Run(fmt.Sprintf("%v", testName), func(t *testing.T) {
+			for _, r := range ranges {
+				t.Run(fmt.Sprintf("start=%v,end=%v", r.startTs, r.endTs), func(t *testing.T) {
+					db.postingCache.Clear()
+
+					// lets run 2 times to hit the cache
+					for i := 0; i < 2; i++ {
+						verify(t, tc, r.startTs, r.endTs, r.hasSamples)
+					}
+
+					// run the test again with all other ranges
+					for _, r1 := range ranges {
+						verify(t, tc, r1.startTs, r1.endTs, r1.hasSamples)
+					}
+				})
+			}
+		})
+	}
+}
+
 func TestExpendedPostingsCache(t *testing.T) {
 	cfg := defaultIngesterTestConfig(t)
 	cfg.BlocksStorageConfig.TSDB.ExpandedCachingExpireInterval = time.Second
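Note (illustration only, not part of the commit): the hasSamples expectations in the ranges table above follow from the fact that every series gets exactly one sample, pushed at timeStamp = 60 * 1000 ms, so a range is expected to return series only if it covers that timestamp (with the end bound treated inclusively in the test's expectations). A minimal, runnable sketch of that arithmetic:

package main

import "fmt"

func main() {
	// One sample per series is pushed at timeStamp = 60_000 ms in the test above.
	timeStamp := int64(60 * 1000)
	covers := func(startTs, endTs int64) bool { return startTs <= timeStamp && timeStamp <= endTs }

	fmt.Println(covers(0, timeStamp/2))           // false -> hasSamples: false
	fmt.Println(covers(timeStamp/2, timeStamp))   // true  -> hasSamples: true
	fmt.Println(covers(timeStamp/2, timeStamp*2)) // true  -> hasSamples: true
	fmt.Println(covers(timeStamp+1, timeStamp*2)) // false -> hasSamples: false
}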

pkg/storage/tsdb/expanded_postings_cache.go

Lines changed: 8 additions & 0 deletions
@@ -125,6 +125,7 @@ type ExpandedPostingsCache interface {
 	PostingsForMatchers(ctx context.Context, blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error)
 	ExpireSeries(metric labels.Labels)
 	PurgeExpiredItems()
+	Clear()
 	Size() int
 }

@@ -138,6 +139,12 @@ type blocksPostingsForMatchersCache struct {

 	metrics    *ExpandedPostingsCacheMetrics
 	seedByHash *seedByHash
+	cfg        TSDBPostingsCacheConfig
+}
+
+func (c *blocksPostingsForMatchersCache) Clear() {
+	c.headCache = newFifoCache[[]storage.SeriesRef](c.cfg.Head, "head", c.metrics, c.cfg.timeNow)
+	c.blocksCache = newFifoCache[[]storage.SeriesRef](c.cfg.Blocks, "block", c.metrics, c.cfg.timeNow)
 }

 func newBlocksPostingsForMatchersCache(userId string, cfg TSDBPostingsCacheConfig, metrics *ExpandedPostingsCacheMetrics, seedByHash *seedByHash) ExpandedPostingsCache {
@@ -157,6 +164,7 @@ func newBlocksPostingsForMatchersCache(userId string, cfg TSDBPostingsCacheConfi
 		metrics:    metrics,
 		seedByHash: seedByHash,
 		userId:     userId,
+		cfg:        cfg,
 	}
 }

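Design note and usage sketch (not part of the commit): Clear() resets the cache by rebuilding both FIFO caches from the retained config rather than evicting entries one by one, which is why the commit also stores cfg on the struct. Below is a hypothetical test helper built on the ExpandedPostingsCache interface shown above; the cortex_tsdb import alias and the assumption that a freshly rebuilt cache reports Size() == 0 are mine, not the commit's.

// Hypothetical helper, not in the commit: reset an ExpandedPostingsCache between
// sub-tests and sanity-check that it is empty again.
func resetPostingsCache(t *testing.T, cache cortex_tsdb.ExpandedPostingsCache) {
	t.Helper()
	cache.Clear()
	require.Equal(t, 0, cache.Size()) // assumes a cleared cache reports size 0
}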
0 commit comments
