Skip to content

Commit 30f59fd

Browse files
committed
Tested v2MetricsForLabelMatchers()
Signed-off-by: Marco Pracucci <marco@pracucci.com>
1 parent f2a894e commit 30f59fd

File tree

4 files changed

+257
-57
lines changed

4 files changed

+257
-57
lines changed

pkg/ingester/ingester_v2.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/cortexproject/cortex/pkg/util/validation"
1313
"github.com/go-kit/kit/log/level"
1414
"github.com/prometheus/client_golang/prometheus"
15+
"github.com/prometheus/common/model"
1516
"github.com/prometheus/prometheus/tsdb"
1617
lbls "github.com/prometheus/prometheus/tsdb/labels"
1718
"github.com/thanos-io/thanos/pkg/block/metadata"
@@ -218,7 +219,6 @@ func (i *Ingester) v2LabelNames(ctx old_ctx.Context, req *client.LabelNamesReque
218219
}
219220

220221
func (i *Ingester) v2MetricsForLabelMatchers(ctx old_ctx.Context, req *client.MetricsForLabelMatchersRequest) (*client.MetricsForLabelMatchersResponse, error) {
221-
fmt.Println("v2MetricsForLabelMatchers() req.StartTimestampMs:", req.StartTimestampMs, "req.EndTimestampMs:", req.EndTimestampMs)
222222
userID, err := user.ExtractOrgID(ctx)
223223
if err != nil {
224224
return nil, err
@@ -230,19 +230,20 @@ func (i *Ingester) v2MetricsForLabelMatchers(ctx old_ctx.Context, req *client.Me
230230
}
231231

232232
// Parse the request
233-
from, through, matchersSet, err := client.FromMetricsForLabelMatchersRequest(req)
233+
from, to, matchersSet, err := client.FromMetricsForLabelMatchersRequest(req)
234234
if err != nil {
235235
return nil, err
236236
}
237237

238238
// Create a new instance of the TSDB querier
239-
q, err := db.Querier(from.Unix()*1000, through.Unix()*1000)
239+
q, err := db.Querier(from.Unix()*1000, to.Unix()*1000)
240240
if err != nil {
241241
return nil, err
242242
}
243243
defer q.Close()
244244

245245
// Run a query for each matchers set and collect all the results
246+
added := model.FingerprintSet{}
246247
result := &client.MetricsForLabelMatchersResponse{
247248
Metric: make([]*client.Metric, 0),
248249
}
@@ -259,14 +260,24 @@ func (i *Ingester) v2MetricsForLabelMatchers(ctx old_ctx.Context, req *client.Me
259260
}
260261

261262
for seriesSet.Next() {
262-
// TODO(pracucci): test the error case
263263
if seriesSet.Err() != nil {
264264
break
265265
}
266266

267+
// Given the same series can be matched by multiple matchers and we want to
268+
// return the unique set of matching series, we do check if the series has
269+
// already been added to the result
270+
ls := seriesSet.At().Labels()
271+
fp := client.Fingerprint(cortex_tsdb.FromLabelsToLegacyLabels(ls))
272+
if _, ok := added[fp]; ok {
273+
continue
274+
}
275+
267276
result.Metric = append(result.Metric, &client.Metric{
268-
Labels: cortex_tsdb.FromLabelsToLabelAdapters(seriesSet.At().Labels()),
277+
Labels: cortex_tsdb.FromLabelsToLabelAdapters(ls),
269278
})
279+
280+
added[fp] = struct{}{}
270281
}
271282

272283
// In case of any error while iterating the series, we break

pkg/ingester/ingester_v2_test.go

Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
package ingester
2+
3+
import (
4+
"io/ioutil"
5+
"math"
6+
"os"
7+
"testing"
8+
"time"
9+
10+
"github.com/cortexproject/cortex/pkg/ingester/client"
11+
"github.com/cortexproject/cortex/pkg/ring"
12+
"github.com/cortexproject/cortex/pkg/util/test"
13+
"github.com/cortexproject/cortex/pkg/util/validation"
14+
"github.com/prometheus/common/model"
15+
"github.com/prometheus/prometheus/pkg/labels"
16+
"github.com/stretchr/testify/assert"
17+
"github.com/stretchr/testify/require"
18+
"github.com/weaveworks/common/user"
19+
"golang.org/x/net/context"
20+
)
21+
22+
func Test_Ingester_v2MetricsForLabelMatchers(t *testing.T) {
23+
fixtures := []struct {
24+
lbls labels.Labels
25+
value float64
26+
timestamp int64
27+
}{
28+
{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}}, 1, 100000},
29+
{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "500"}}, 1, 110000},
30+
{labels.Labels{{Name: labels.MetricName, Value: "test_2"}}, 2, 200000},
31+
// The two following series have the same FastFingerprint=e002a3a451262627
32+
{labels.Labels{{Name: labels.MetricName, Value: "collision"}, {Name: "app", Value: "l"}, {Name: "uniq0", Value: "0"}, {Name: "uniq1", Value: "1"}}, 1, 300000},
33+
{labels.Labels{{Name: labels.MetricName, Value: "collision"}, {Name: "app", Value: "m"}, {Name: "uniq0", Value: "1"}, {Name: "uniq1", Value: "1"}}, 1, 300000},
34+
}
35+
36+
tests := map[string]struct {
37+
from int64
38+
to int64
39+
matchers []*client.LabelMatchers
40+
expected []*client.Metric
41+
}{
42+
"should return an empty response if no metric match": {
43+
from: math.MinInt64,
44+
to: math.MaxInt64,
45+
matchers: []*client.LabelMatchers{{
46+
Matchers: []*client.LabelMatcher{
47+
{Type: client.EQUAL, Name: model.MetricNameLabel, Value: "unknown"},
48+
},
49+
}},
50+
expected: []*client.Metric{},
51+
},
52+
"should filter metrics by single matcher": {
53+
from: math.MinInt64,
54+
to: math.MaxInt64,
55+
matchers: []*client.LabelMatchers{{
56+
Matchers: []*client.LabelMatcher{
57+
{Type: client.EQUAL, Name: model.MetricNameLabel, Value: "test_1"},
58+
},
59+
}},
60+
expected: []*client.Metric{
61+
{Labels: client.FromLabelsToLabelAdapters(fixtures[0].lbls)},
62+
{Labels: client.FromLabelsToLabelAdapters(fixtures[1].lbls)},
63+
},
64+
},
65+
"should filter metrics by multiple matchers": {
66+
from: math.MinInt64,
67+
to: math.MaxInt64,
68+
matchers: []*client.LabelMatchers{
69+
{
70+
Matchers: []*client.LabelMatcher{
71+
{Type: client.EQUAL, Name: "status", Value: "200"},
72+
},
73+
},
74+
{
75+
Matchers: []*client.LabelMatcher{
76+
{Type: client.EQUAL, Name: model.MetricNameLabel, Value: "test_2"},
77+
},
78+
},
79+
},
80+
expected: []*client.Metric{
81+
{Labels: client.FromLabelsToLabelAdapters(fixtures[0].lbls)},
82+
{Labels: client.FromLabelsToLabelAdapters(fixtures[2].lbls)},
83+
},
84+
},
85+
"should filter metrics by time range": {
86+
from: 100000,
87+
to: 100000,
88+
matchers: []*client.LabelMatchers{{
89+
Matchers: []*client.LabelMatcher{
90+
{Type: client.EQUAL, Name: model.MetricNameLabel, Value: "test_1"},
91+
},
92+
}},
93+
expected: []*client.Metric{
94+
{Labels: client.FromLabelsToLabelAdapters(fixtures[0].lbls)},
95+
},
96+
},
97+
"should not return duplicated metrics on overlapping matchers": {
98+
from: math.MinInt64,
99+
to: math.MaxInt64,
100+
matchers: []*client.LabelMatchers{
101+
{
102+
Matchers: []*client.LabelMatcher{
103+
{Type: client.EQUAL, Name: model.MetricNameLabel, Value: "test_1"},
104+
},
105+
},
106+
{
107+
Matchers: []*client.LabelMatcher{
108+
{Type: client.REGEX_MATCH, Name: model.MetricNameLabel, Value: "test.*"},
109+
},
110+
},
111+
},
112+
expected: []*client.Metric{
113+
{Labels: client.FromLabelsToLabelAdapters(fixtures[0].lbls)},
114+
{Labels: client.FromLabelsToLabelAdapters(fixtures[1].lbls)},
115+
{Labels: client.FromLabelsToLabelAdapters(fixtures[2].lbls)},
116+
},
117+
},
118+
"should return all matching metrics even if their FastFingerprint collide": {
119+
from: math.MinInt64,
120+
to: math.MaxInt64,
121+
matchers: []*client.LabelMatchers{{
122+
Matchers: []*client.LabelMatcher{
123+
{Type: client.EQUAL, Name: model.MetricNameLabel, Value: "collision"},
124+
},
125+
}},
126+
expected: []*client.Metric{
127+
{Labels: client.FromLabelsToLabelAdapters(fixtures[3].lbls)},
128+
{Labels: client.FromLabelsToLabelAdapters(fixtures[4].lbls)},
129+
},
130+
},
131+
}
132+
133+
// Create ingester
134+
i, cleanup, err := newIngesterMockWithTSDBStorage(defaultIngesterTestConfig())
135+
require.NoError(t, err)
136+
defer i.Shutdown()
137+
defer cleanup()
138+
139+
// Wait until it's ACTIVE
140+
test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} {
141+
return i.lifecycler.GetState()
142+
})
143+
144+
// Push fixtures
145+
ctx := user.InjectOrgID(context.Background(), "test")
146+
147+
for _, series := range fixtures {
148+
req, _ := mockWriteRequest(series.lbls, series.value, series.timestamp)
149+
_, err := i.v2Push(ctx, req)
150+
require.NoError(t, err)
151+
}
152+
153+
// Run tests
154+
for testName, testData := range tests {
155+
testData := testData
156+
157+
t.Run(testName, func(t *testing.T) {
158+
req := &client.MetricsForLabelMatchersRequest{
159+
StartTimestampMs: testData.from,
160+
EndTimestampMs: testData.to,
161+
MatchersSet: testData.matchers,
162+
}
163+
164+
res, err := i.v2MetricsForLabelMatchers(ctx, req)
165+
require.NoError(t, err)
166+
assert.ElementsMatch(t, testData.expected, res.Metric)
167+
})
168+
}
169+
}
170+
171+
func mockWriteRequest(lbls labels.Labels, value float64, timestampMs int64) (*client.WriteRequest, *client.QueryResponse) {
172+
samples := []client.Sample{
173+
{
174+
TimestampMs: timestampMs,
175+
Value: value,
176+
},
177+
}
178+
179+
req := client.ToWriteRequest([]labels.Labels{lbls}, samples, client.API)
180+
181+
// Generate the expected response
182+
expectedResponse := &client.QueryResponse{
183+
Timeseries: []client.TimeSeries{
184+
{
185+
Labels: client.FromLabelsToLabelAdapters(lbls),
186+
Samples: samples,
187+
},
188+
},
189+
}
190+
191+
return req, expectedResponse
192+
}
193+
194+
func newIngesterMockWithTSDBStorage(ingesterCfg Config) (*Ingester, func(), error) {
195+
clientCfg := defaultClientTestConfig()
196+
limits := defaultLimitsTestConfig()
197+
198+
overrides, err := validation.NewOverrides(limits)
199+
if err != nil {
200+
return nil, nil, err
201+
}
202+
203+
// Create a temporary directory for TSDB
204+
tempDir, err := ioutil.TempDir("", "tsdb")
205+
if err != nil {
206+
return nil, nil, err
207+
}
208+
209+
ingesterCfg.TSDBEnabled = true
210+
ingesterCfg.TSDBConfig.Dir = tempDir
211+
ingesterCfg.TSDBConfig.Backend = "s3"
212+
ingesterCfg.TSDBConfig.S3.Endpoint = "localhost"
213+
214+
ingester, err := NewV2(ingesterCfg, clientCfg, overrides, nil, nil)
215+
if err != nil {
216+
return nil, nil, err
217+
}
218+
219+
// Create a cleanup function that the caller should call with defer
220+
cleanup := func() {
221+
os.RemoveAll(tempDir)
222+
}
223+
224+
return ingester, cleanup, nil
225+
}

pkg/ingester/lifecycle_test.go

Lines changed: 8 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -108,32 +108,9 @@ func TestIngesterTransfer(t *testing.T) {
108108
})
109109

110110
// Now write a sample to this ingester
111-
const ts = 123000
112-
const val = 456
113-
var (
114-
l = labels.Labels{{Name: labels.MetricName, Value: "foo"}}
115-
sampleData = []client.Sample{
116-
{
117-
TimestampMs: ts,
118-
Value: val,
119-
},
120-
}
121-
expectedResponse = &client.QueryResponse{
122-
Timeseries: []client.TimeSeries{
123-
{
124-
Labels: client.FromLabelsToLabelAdapters(l),
125-
Samples: []client.Sample{
126-
{
127-
Value: val,
128-
TimestampMs: ts,
129-
},
130-
},
131-
},
132-
},
133-
}
134-
)
111+
req, expectedResponse := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "foo"}}, 456, 123000)
135112
ctx := user.InjectOrgID(context.Background(), userID)
136-
_, err = ing1.Push(ctx, client.ToWriteRequest([]labels.Labels{l}, sampleData, client.API))
113+
_, err = ing1.Push(ctx, req)
137114
require.NoError(t, err)
138115

139116
// Start a second ingester, but let it go into PENDING
@@ -170,7 +147,8 @@ func TestIngesterTransfer(t *testing.T) {
170147
assert.Equal(t, expectedResponse, response)
171148

172149
// Check we can send the same sample again to the new ingester and get the same result
173-
_, err = ing2.Push(ctx, client.ToWriteRequest([]labels.Labels{l}, sampleData, client.API))
150+
req, _ = mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "foo"}}, 456, 123000)
151+
_, err = ing2.Push(ctx, req)
174152
require.NoError(t, err)
175153
response, err = ing2.Query(ctx, request)
176154
require.NoError(t, err)
@@ -446,32 +424,9 @@ func TestV2IngesterTransfer(t *testing.T) {
446424
})
447425

448426
// Now write a sample to this ingester
449-
const ts = 123000
450-
const val = 456
451-
var (
452-
l = labels.Labels{{Name: labels.MetricName, Value: "foo"}}
453-
sampleData = []client.Sample{
454-
{
455-
TimestampMs: ts,
456-
Value: val,
457-
},
458-
}
459-
expectedResponse = &client.QueryResponse{
460-
Timeseries: []client.TimeSeries{
461-
{
462-
Labels: client.FromLabelsToLabelAdapters(l),
463-
Samples: []client.Sample{
464-
{
465-
Value: val,
466-
TimestampMs: ts,
467-
},
468-
},
469-
},
470-
},
471-
}
472-
)
427+
req, expectedResponse := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "foo"}}, 456, 123000)
473428
ctx := user.InjectOrgID(context.Background(), userID)
474-
_, err = ing1.Push(ctx, client.ToWriteRequest([]labels.Labels{l}, sampleData, client.API))
429+
_, err = ing1.Push(ctx, req)
475430
require.NoError(t, err)
476431

477432
// Start a second ingester, but let it go into PENDING
@@ -516,7 +471,8 @@ func TestV2IngesterTransfer(t *testing.T) {
516471
assert.Equal(t, expectedResponse, response)
517472

518473
// Check we can send the same sample again to the new ingester and get the same result
519-
_, err = ing2.Push(ctx, client.ToWriteRequest([]labels.Labels{l}, sampleData, client.API))
474+
req, _ = mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "foo"}}, 456, 123000)
475+
_, err = ing2.Push(ctx, req)
520476
require.NoError(t, err)
521477
response, err = ing2.Query(ctx, request)
522478
require.NoError(t, err)

0 commit comments

Comments
 (0)