Skip to content

Added tests covering TSDB read path in the ingester #1838

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 239 additions & 0 deletions pkg/ingester/ingester_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ import (
"os"
"strings"
"testing"
"time"

"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util/test"
"github.com/cortexproject/cortex/pkg/util/validation"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -173,6 +177,241 @@ func TestIngester_v2Push(t *testing.T) {
}
}

func Test_Ingester_v2LabelNames(t *testing.T) {
series := []struct {
lbls labels.Labels
value float64
timestamp int64
}{
{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000},
{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "500"}, {Name: "route", Value: "get_user"}}, 1, 110000},
{labels.Labels{{Name: labels.MetricName, Value: "test_2"}}, 2, 200000},
}

expected := []string{"__name__", "status", "route"}

// Create ingester
i, cleanup, err := newIngesterMockWithTSDBStorage(defaultIngesterTestConfig(), nil)
require.NoError(t, err)
defer i.Shutdown()
defer cleanup()

// Wait until it's ACTIVE
test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} {
return i.lifecycler.GetState()
})

// Push series
ctx := user.InjectOrgID(context.Background(), "test")

for _, series := range series {
req, _ := mockWriteRequest(series.lbls, series.value, series.timestamp)
_, err := i.v2Push(ctx, req)
require.NoError(t, err)
}

// Get label names
res, err := i.v2LabelNames(ctx, &client.LabelNamesRequest{})
require.NoError(t, err)
assert.ElementsMatch(t, expected, res.LabelNames)
}

func Test_Ingester_v2LabelValues(t *testing.T) {
series := []struct {
lbls labels.Labels
value float64
timestamp int64
}{
{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000},
{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "500"}, {Name: "route", Value: "get_user"}}, 1, 110000},
{labels.Labels{{Name: labels.MetricName, Value: "test_2"}}, 2, 200000},
}

expected := map[string][]string{
"__name__": {"test_1", "test_2"},
"status": {"200", "500"},
"route": {"get_user"},
"unknown": {},
}

// Create ingester
i, cleanup, err := newIngesterMockWithTSDBStorage(defaultIngesterTestConfig(), nil)
require.NoError(t, err)
defer i.Shutdown()
defer cleanup()

// Wait until it's ACTIVE
test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} {
return i.lifecycler.GetState()
})

// Push series
ctx := user.InjectOrgID(context.Background(), "test")

for _, series := range series {
req, _ := mockWriteRequest(series.lbls, series.value, series.timestamp)
_, err := i.v2Push(ctx, req)
require.NoError(t, err)
}

// Get label values
for labelName, expectedValues := range expected {
req := &client.LabelValuesRequest{LabelName: labelName}
res, err := i.v2LabelValues(ctx, req)
require.NoError(t, err)
assert.ElementsMatch(t, expectedValues, res.LabelValues)
}
}

func Test_Ingester_v2Query(t *testing.T) {
series := []struct {
lbls labels.Labels
value float64
timestamp int64
}{
{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000},
{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "500"}, {Name: "route", Value: "get_user"}}, 1, 110000},
{labels.Labels{{Name: labels.MetricName, Value: "test_2"}}, 2, 200000},
}

tests := map[string]struct {
from int64
to int64
matchers []*client.LabelMatcher
expected []client.TimeSeries
}{
"should return an empty response if no metric matches": {
from: math.MinInt64,
to: math.MaxInt64,
matchers: []*client.LabelMatcher{
{Type: client.EQUAL, Name: model.MetricNameLabel, Value: "unknown"},
},
expected: []client.TimeSeries{},
},
"should filter series by == matcher": {
from: math.MinInt64,
to: math.MaxInt64,
matchers: []*client.LabelMatcher{
{Type: client.EQUAL, Name: model.MetricNameLabel, Value: "test_1"},
},
expected: []client.TimeSeries{
{Labels: client.FromLabelsToLabelAdapters(series[0].lbls), Samples: []client.Sample{{Value: 1, TimestampMs: 100000}}},
{Labels: client.FromLabelsToLabelAdapters(series[1].lbls), Samples: []client.Sample{{Value: 1, TimestampMs: 110000}}},
},
},
"should filter series by != matcher": {
from: math.MinInt64,
to: math.MaxInt64,
matchers: []*client.LabelMatcher{
{Type: client.NOT_EQUAL, Name: model.MetricNameLabel, Value: "test_1"},
},
expected: []client.TimeSeries{
{Labels: client.FromLabelsToLabelAdapters(series[2].lbls), Samples: []client.Sample{{Value: 2, TimestampMs: 200000}}},
},
},
"should filter series by =~ matcher": {
from: math.MinInt64,
to: math.MaxInt64,
matchers: []*client.LabelMatcher{
{Type: client.REGEX_MATCH, Name: model.MetricNameLabel, Value: ".*_1"},
},
expected: []client.TimeSeries{
{Labels: client.FromLabelsToLabelAdapters(series[0].lbls), Samples: []client.Sample{{Value: 1, TimestampMs: 100000}}},
{Labels: client.FromLabelsToLabelAdapters(series[1].lbls), Samples: []client.Sample{{Value: 1, TimestampMs: 110000}}},
},
},
"should filter series by !~ matcher": {
from: math.MinInt64,
to: math.MaxInt64,
matchers: []*client.LabelMatcher{
{Type: client.REGEX_NO_MATCH, Name: model.MetricNameLabel, Value: ".*_1"},
},
expected: []client.TimeSeries{
{Labels: client.FromLabelsToLabelAdapters(series[2].lbls), Samples: []client.Sample{{Value: 2, TimestampMs: 200000}}},
},
},
"should filter series by multiple matchers": {
from: math.MinInt64,
to: math.MaxInt64,
matchers: []*client.LabelMatcher{
{Type: client.EQUAL, Name: model.MetricNameLabel, Value: "test_1"},
{Type: client.REGEX_MATCH, Name: "status", Value: "5.."},
},
expected: []client.TimeSeries{
{Labels: client.FromLabelsToLabelAdapters(series[1].lbls), Samples: []client.Sample{{Value: 1, TimestampMs: 110000}}},
},
},
"should filter series by matcher and time range": {
from: 100000,
to: 100000,
matchers: []*client.LabelMatcher{
{Type: client.EQUAL, Name: model.MetricNameLabel, Value: "test_1"},
},
expected: []client.TimeSeries{
{Labels: client.FromLabelsToLabelAdapters(series[0].lbls), Samples: []client.Sample{{Value: 1, TimestampMs: 100000}}},
},
},
}

// Create ingester
i, cleanup, err := newIngesterMockWithTSDBStorage(defaultIngesterTestConfig(), nil)
require.NoError(t, err)
defer i.Shutdown()
defer cleanup()

// Wait until it's ACTIVE
test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} {
return i.lifecycler.GetState()
})

// Push series
ctx := user.InjectOrgID(context.Background(), "test")

for _, series := range series {
req, _ := mockWriteRequest(series.lbls, series.value, series.timestamp)
_, err := i.v2Push(ctx, req)
require.NoError(t, err)
}

// Run tests
for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
req := &client.QueryRequest{
StartTimestampMs: testData.from,
EndTimestampMs: testData.to,
Matchers: testData.matchers,
}

res, err := i.v2Query(ctx, req)
require.NoError(t, err)
assert.ElementsMatch(t, testData.expected, res.Timeseries)
})
}
}

func mockWriteRequest(lbls labels.Labels, value float64, timestampMs int64) (*client.WriteRequest, *client.QueryResponse) {
samples := []client.Sample{
{
TimestampMs: timestampMs,
Value: value,
},
}

req := client.ToWriteRequest([]labels.Labels{lbls}, samples, client.API)

// Generate the expected response
expectedResponse := &client.QueryResponse{
Timeseries: []client.TimeSeries{
{
Labels: client.FromLabelsToLabelAdapters(lbls),
Samples: samples,
},
},
}

return req, expectedResponse
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see expectedResponse being used anywhere

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is a copy and paste from #1830 (still under review). Keeping the copy and paste will help me rebasing once one of the two PRs will get merged.

}

func newIngesterMockWithTSDBStorage(ingesterCfg Config, registerer prometheus.Registerer) (*Ingester, func(), error) {
clientCfg := defaultClientTestConfig()
limits := defaultLimitsTestConfig()
Expand Down