Skip to content

Commit 73cc255

Browse files
authored
Manipulate ingesters query min time when -querier.query-ingesters-within is set (cortexproject#2904)
Signed-off-by: Marco Pracucci <marco@pracucci.com>
1 parent b4810c8 commit 73cc255

File tree

6 files changed

+153
-58
lines changed

6 files changed

+153
-58
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
* [CHANGE] Ingester: Chunks flushed via /flush stay in memory until retention period is reached. This affects `cortex_ingester_memory_chunks` metric. #2778
1515
* [CHANGE] Querier: the error message returned when the query time range exceeds `-store.max-query-length` has changed from `invalid query, length > limit (X > Y)` to `the query time range exceeds the limit (query length: X, limit: Y)`. #2826
1616
* [CHANGE] Add `component` label to metrics exposed by chunk, delete and index store clients. #2774
17+
* [CHANGE] Querier: when `-querier.query-ingesters-within` is configured, the time range of the query sent to ingesters is now manipulated to ensure the query start time is not older than 'now - query-ingesters-within'. #2904
1718
* [CHANGE] KV: The `role` label which was a label of `multi` KV store client only has been added to metrics of every KV store client. If KV store client is not `multi`, then the value of `role` label is `primary`. #2837
1819
* [CHANGE] Added the `engine` label to the metrics exposed by the Prometheus query engine, to distinguish between `ruler` and `querier` metrics. #2854
1920
* [CHANGE] Added ruler to the single binary when started with `-target=all` (default). #2854

pkg/querier/blocks_store_queryable.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...*
334334
maxT = util.Min64(maxT, util.TimeToMillis(now.Add(-q.queryStoreAfter)))
335335

336336
if origMaxT != maxT {
337-
level.Debug(spanLog).Log("msg", "query max time has been manipulated", "original", origMaxT, "updated", maxT)
337+
level.Debug(spanLog).Log("msg", "the max time of the query to blocks storage has been manipulated", "original", origMaxT, "updated", maxT)
338338
}
339339

340340
if maxT < minT {

pkg/querier/distributor_queryable.go

Lines changed: 53 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"sort"
66
"time"
77

8+
"github.com/go-kit/kit/log/level"
89
"github.com/prometheus/common/model"
910
"github.com/prometheus/prometheus/pkg/labels"
1011
"github.com/prometheus/prometheus/promql"
@@ -31,44 +32,46 @@ type Distributor interface {
3132
MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error)
3233
}
3334

34-
func newDistributorQueryable(distributor Distributor, streaming bool, iteratorFn chunkIteratorFunc, queryIngesterWithin time.Duration) QueryableWithFilter {
35+
func newDistributorQueryable(distributor Distributor, streaming bool, iteratorFn chunkIteratorFunc, queryIngestersWithin time.Duration) QueryableWithFilter {
3536
return distributorQueryable{
36-
distributor: distributor,
37-
streaming: streaming,
38-
iteratorFn: iteratorFn,
39-
queryIngesterWithin: queryIngesterWithin,
37+
distributor: distributor,
38+
streaming: streaming,
39+
iteratorFn: iteratorFn,
40+
queryIngestersWithin: queryIngestersWithin,
4041
}
4142
}
4243

4344
type distributorQueryable struct {
44-
distributor Distributor
45-
streaming bool
46-
iteratorFn chunkIteratorFunc
47-
queryIngesterWithin time.Duration
45+
distributor Distributor
46+
streaming bool
47+
iteratorFn chunkIteratorFunc
48+
queryIngestersWithin time.Duration
4849
}
4950

5051
func (d distributorQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
5152
return &distributorQuerier{
52-
distributor: d.distributor,
53-
ctx: ctx,
54-
mint: mint,
55-
maxt: maxt,
56-
streaming: d.streaming,
57-
chunkIterFn: d.iteratorFn,
53+
distributor: d.distributor,
54+
ctx: ctx,
55+
mint: mint,
56+
maxt: maxt,
57+
streaming: d.streaming,
58+
chunkIterFn: d.iteratorFn,
59+
queryIngestersWithin: d.queryIngestersWithin,
5860
}, nil
5961
}
6062

6163
func (d distributorQueryable) UseQueryable(now time.Time, _, queryMaxT int64) bool {
6264
// Include ingester only if maxt is within QueryIngestersWithin w.r.t. current time.
63-
return d.queryIngesterWithin == 0 || queryMaxT >= util.TimeToMillis(now.Add(-d.queryIngesterWithin))
65+
return d.queryIngestersWithin == 0 || queryMaxT >= util.TimeToMillis(now.Add(-d.queryIngestersWithin))
6466
}
6567

6668
type distributorQuerier struct {
67-
distributor Distributor
68-
ctx context.Context
69-
mint, maxt int64
70-
streaming bool
71-
chunkIterFn chunkIteratorFunc
69+
distributor Distributor
70+
ctx context.Context
71+
mint, maxt int64
72+
streaming bool
73+
chunkIterFn chunkIteratorFunc
74+
queryIngestersWithin time.Duration
7275
}
7376

7477
// Select implements storage.Querier interface.
@@ -77,23 +80,45 @@ func (q *distributorQuerier) Select(_ bool, sp *storage.SelectHints, matchers ..
7780
log, ctx := spanlogger.New(q.ctx, "distributorQuerier.Select")
7881
defer log.Span.Finish()
7982

83+
minT, maxT := q.mint, q.maxt
84+
if sp != nil {
85+
minT, maxT = sp.Start, sp.End
86+
}
87+
88+
// If queryIngestersWithin is enabled, we do manipulate the query mint to query samples up until
89+
// now - queryIngestersWithin, because older time ranges are covered by the storage. This
90+
// optimization is particularly important for the blocks storage where the blocks retention in the
91+
// ingesters could be way higher than queryIngestersWithin.
92+
if q.queryIngestersWithin > 0 {
93+
now := time.Now()
94+
origMinT := minT
95+
minT = util.Max64(minT, util.TimeToMillis(now.Add(-q.queryIngestersWithin)))
96+
97+
if origMinT != minT {
98+
level.Debug(log).Log("msg", "the min time of the query to ingesters has been manipulated", "original", origMinT, "updated", minT)
99+
}
100+
101+
if minT > maxT {
102+
level.Debug(log).Log("msg", "empty query time range after min time manipulation")
103+
return storage.EmptySeriesSet()
104+
}
105+
}
106+
80107
// Kludge: Prometheus passes nil SelectParams if it is doing a 'series' operation,
81108
// which needs only metadata.
82109
if sp == nil {
83-
ms, err := q.distributor.MetricsForLabelMatchers(ctx, model.Time(q.mint), model.Time(q.maxt), matchers...)
110+
ms, err := q.distributor.MetricsForLabelMatchers(ctx, model.Time(minT), model.Time(maxT), matchers...)
84111
if err != nil {
85112
return storage.ErrSeriesSet(err)
86113
}
87114
return series.MetricsToSeriesSet(ms)
88115
}
89116

90-
mint, maxt := sp.Start, sp.End
91-
92117
if q.streaming {
93-
return q.streamingSelect(*sp, matchers)
118+
return q.streamingSelect(minT, maxT, matchers)
94119
}
95120

96-
matrix, err := q.distributor.Query(ctx, model.Time(mint), model.Time(maxt), matchers...)
121+
matrix, err := q.distributor.Query(ctx, model.Time(minT), model.Time(maxT), matchers...)
97122
if err != nil {
98123
return storage.ErrSeriesSet(promql.ErrStorage{Err: err})
99124
}
@@ -102,15 +127,13 @@ func (q *distributorQuerier) Select(_ bool, sp *storage.SelectHints, matchers ..
102127
return series.MatrixToSeriesSet(matrix)
103128
}
104129

105-
func (q *distributorQuerier) streamingSelect(sp storage.SelectHints, matchers []*labels.Matcher) storage.SeriesSet {
130+
func (q *distributorQuerier) streamingSelect(minT, maxT int64, matchers []*labels.Matcher) storage.SeriesSet {
106131
userID, err := user.ExtractOrgID(q.ctx)
107132
if err != nil {
108133
return storage.ErrSeriesSet(promql.ErrStorage{Err: err})
109134
}
110135

111-
mint, maxt := sp.Start, sp.End
112-
113-
results, err := q.distributor.QueryStream(q.ctx, model.Time(mint), model.Time(maxt), matchers...)
136+
results, err := q.distributor.QueryStream(q.ctx, model.Time(minT), model.Time(maxT), matchers...)
114137
if err != nil {
115138
return storage.ErrSeriesSet(promql.ErrStorage{Err: err})
116139
}

pkg/querier/distributor_queryable_test.go

Lines changed: 87 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,16 @@ package querier
22

33
import (
44
"context"
5+
"fmt"
56
"testing"
67
"time"
78

89
"github.com/prometheus/common/model"
910
"github.com/prometheus/prometheus/pkg/labels"
1011
"github.com/prometheus/prometheus/scrape"
1112
"github.com/prometheus/prometheus/storage"
13+
"github.com/stretchr/testify/assert"
14+
"github.com/stretchr/testify/mock"
1215
"github.com/stretchr/testify/require"
1316
"github.com/weaveworks/common/user"
1417

@@ -25,8 +28,9 @@ const (
2528
)
2629

2730
func TestDistributorQuerier(t *testing.T) {
28-
d := &mockDistributor{
29-
m: model.Matrix{
31+
d := &mockDistributor{}
32+
d.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
33+
model.Matrix{
3034
// Matrixes are unsorted, so this tests that the labels get sorted.
3135
&model.SampleStream{
3236
Metric: model.Metric{
@@ -39,7 +43,8 @@ func TestDistributorQuerier(t *testing.T) {
3943
},
4044
},
4145
},
42-
}
46+
nil)
47+
4348
queryable := newDistributorQueryable(d, false, nil, 0)
4449
querier, err := queryable.Querier(context.Background(), mint, maxt)
4550
require.NoError(t, err)
@@ -59,6 +64,73 @@ func TestDistributorQuerier(t *testing.T) {
5964
require.NoError(t, seriesSet.Err())
6065
}
6166

67+
func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T) {
68+
now := time.Now()
69+
70+
tests := map[string]struct {
71+
queryIngestersWithin time.Duration
72+
queryMinT int64
73+
queryMaxT int64
74+
expectedMinT int64
75+
expectedMaxT int64
76+
}{
77+
"should not manipulate query time range if queryIngestersWithin is disabled": {
78+
queryIngestersWithin: 0,
79+
queryMinT: util.TimeToMillis(now.Add(-100 * time.Minute)),
80+
queryMaxT: util.TimeToMillis(now.Add(-30 * time.Minute)),
81+
expectedMinT: util.TimeToMillis(now.Add(-100 * time.Minute)),
82+
expectedMaxT: util.TimeToMillis(now.Add(-30 * time.Minute)),
83+
},
84+
"should not manipulate query time range if queryIngestersWithin is enabled but query min time is newer": {
85+
queryIngestersWithin: time.Hour,
86+
queryMinT: util.TimeToMillis(now.Add(-50 * time.Minute)),
87+
queryMaxT: util.TimeToMillis(now.Add(-30 * time.Minute)),
88+
expectedMinT: util.TimeToMillis(now.Add(-50 * time.Minute)),
89+
expectedMaxT: util.TimeToMillis(now.Add(-30 * time.Minute)),
90+
},
91+
"should manipulate query time range if queryIngestersWithin is enabled and query min time is older": {
92+
queryIngestersWithin: time.Hour,
93+
queryMinT: util.TimeToMillis(now.Add(-100 * time.Minute)),
94+
queryMaxT: util.TimeToMillis(now.Add(-30 * time.Minute)),
95+
expectedMinT: util.TimeToMillis(now.Add(-60 * time.Minute)),
96+
expectedMaxT: util.TimeToMillis(now.Add(-30 * time.Minute)),
97+
},
98+
"should skip the query if the query max time is older than queryIngestersWithin": {
99+
queryIngestersWithin: time.Hour,
100+
queryMinT: util.TimeToMillis(now.Add(-100 * time.Minute)),
101+
queryMaxT: util.TimeToMillis(now.Add(-90 * time.Minute)),
102+
expectedMinT: 0,
103+
expectedMaxT: 0,
104+
},
105+
}
106+
107+
for _, streamingEnabled := range []bool{false, true} {
108+
for testName, testData := range tests {
109+
t.Run(fmt.Sprintf("%s (streaming enabled: %t)", testName, streamingEnabled), func(t *testing.T) {
110+
distributor := &mockDistributor{}
111+
distributor.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(model.Matrix{}, nil)
112+
distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil)
113+
114+
ctx := user.InjectOrgID(context.Background(), "test")
115+
queryable := newDistributorQueryable(distributor, streamingEnabled, nil, testData.queryIngestersWithin)
116+
querier, err := queryable.Querier(ctx, testData.queryMinT, testData.queryMaxT)
117+
require.NoError(t, err)
118+
119+
seriesSet := querier.Select(true, &storage.SelectHints{Start: testData.queryMinT, End: testData.queryMaxT})
120+
require.NoError(t, seriesSet.Err())
121+
122+
if testData.expectedMinT == 0 && testData.expectedMaxT == 0 {
123+
assert.Len(t, distributor.Calls, 0)
124+
} else {
125+
require.Len(t, distributor.Calls, 1)
126+
assert.InDelta(t, testData.expectedMinT, int64(distributor.Calls[0].Arguments.Get(1).(model.Time)), float64(5*time.Second.Milliseconds()))
127+
assert.Equal(t, testData.expectedMaxT, int64(distributor.Calls[0].Arguments.Get(2).(model.Time)))
128+
}
129+
})
130+
}
131+
}
132+
}
133+
62134
func TestDistributorQueryableFilter(t *testing.T) {
63135
d := &mockDistributor{}
64136
dq := newDistributorQueryable(d, false, nil, 1*time.Hour)
@@ -86,8 +158,9 @@ func TestIngesterStreaming(t *testing.T) {
86158
})
87159
require.NoError(t, err)
88160

89-
d := &mockDistributor{
90-
r: &client.QueryStreamResponse{
161+
d := &mockDistributor{}
162+
d.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
163+
&client.QueryStreamResponse{
91164
Chunkseries: []client.TimeSeriesChunk{
92165
{
93166
Labels: []client.LabelAdapter{
@@ -103,7 +176,8 @@ func TestIngesterStreaming(t *testing.T) {
103176
},
104177
},
105178
},
106-
}
179+
nil)
180+
107181
ctx := user.InjectOrgID(context.Background(), "0")
108182
queryable := newDistributorQueryable(d, true, mergeChunks, 0)
109183
querier, err := queryable.Querier(ctx, mint, maxt)
@@ -125,17 +199,16 @@ func TestIngesterStreaming(t *testing.T) {
125199
}
126200

127201
type mockDistributor struct {
128-
metadata []scrape.MetricMetadata
129-
metadataError error
130-
m model.Matrix
131-
r *client.QueryStreamResponse
202+
mock.Mock
132203
}
133204

134205
func (m *mockDistributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) {
135-
return m.m, nil
206+
args := m.Called(ctx, from, to, matchers)
207+
return args.Get(0).(model.Matrix), args.Error(1)
136208
}
137209
func (m *mockDistributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*client.QueryStreamResponse, error) {
138-
return m.r, nil
210+
args := m.Called(ctx, from, to, matchers)
211+
return args.Get(0).(*client.QueryStreamResponse), args.Error(1)
139212
}
140213
func (m *mockDistributor) LabelValuesForLabelName(context.Context, model.LabelName) ([]string, error) {
141214
return nil, nil
@@ -148,9 +221,6 @@ func (m *mockDistributor) MetricsForLabelMatchers(ctx context.Context, from, thr
148221
}
149222

150223
func (m *mockDistributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error) {
151-
if m.metadataError != nil {
152-
return nil, m.metadataError
153-
}
154-
155-
return m.metadata, nil
224+
args := m.Called(ctx)
225+
return args.Get(0).([]scrape.MetricMetadata), args.Error(1)
156226
}

pkg/querier/metadata_handler_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,17 @@ import (
88
"testing"
99

1010
"github.com/prometheus/prometheus/scrape"
11+
"github.com/stretchr/testify/mock"
1112
"github.com/stretchr/testify/require"
1213
)
1314

1415
func TestMetadataHandler_Success(t *testing.T) {
15-
d := &mockDistributor{
16-
metadata: []scrape.MetricMetadata{
16+
d := &mockDistributor{}
17+
d.On("MetricsMetadata", mock.Anything).Return(
18+
[]scrape.MetricMetadata{
1719
{Metric: "alertmanager_dispatcher_aggregation_groups", Help: "Number of active aggregation groups", Type: "gauge", Unit: ""},
1820
},
19-
}
21+
nil)
2022

2123
handler := MetadataHandler(d)
2224

@@ -49,9 +51,8 @@ func TestMetadataHandler_Success(t *testing.T) {
4951
}
5052

5153
func TestMetadataHandler_Error(t *testing.T) {
52-
d := &mockDistributor{
53-
metadataError: fmt.Errorf("no user id"),
54-
}
54+
d := &mockDistributor{}
55+
d.On("MetricsMetadata", mock.Anything).Return([]scrape.MetricMetadata{}, fmt.Errorf("no user id"))
5556

5657
handler := MetadataHandler(d)
5758

pkg/querier/querier_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/pkg/errors"
1414
"github.com/prometheus/prometheus/tsdb"
1515
"github.com/prometheus/prometheus/tsdb/chunkenc"
16+
"github.com/stretchr/testify/mock"
1617

1718
"github.com/cortexproject/cortex/pkg/chunk/purger"
1819
"github.com/cortexproject/cortex/pkg/util/validation"
@@ -486,10 +487,9 @@ func mockDistibutorFor(t *testing.T, cs mockChunkStore, through model.Time) *moc
486487
matrix, err := chunk.ChunksToMatrix(context.Background(), cs.chunks, 0, through)
487488
require.NoError(t, err)
488489

489-
result := &mockDistributor{
490-
m: matrix,
491-
r: &client.QueryStreamResponse{Chunkseries: []client.TimeSeriesChunk{tsc}},
492-
}
490+
result := &mockDistributor{}
491+
result.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(matrix, nil)
492+
result.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{Chunkseries: []client.TimeSeriesChunk{tsc}}, nil)
493493
return result
494494
}
495495

0 commit comments

Comments
 (0)