Skip to content

Commit 11bc349

Browse files
committed
wiring for multiple sharding configs
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
1 parent fab4a60 commit 11bc349

File tree

2 files changed

+174
-9
lines changed

2 files changed

+174
-9
lines changed

pkg/querier/querysharding/middleware.go

Lines changed: 64 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,42 +2,98 @@ package querysharding
22

33
import (
44
"context"
5+
"github.com/cortexproject/cortex/pkg/chunk"
56
"github.com/cortexproject/cortex/pkg/querier/astmapper"
67
"github.com/cortexproject/cortex/pkg/querier/queryrange"
8+
"github.com/pkg/errors"
79
"github.com/prometheus/prometheus/promql"
810
"time"
911
)
1012

1113
const (
1214
downStreamErrType = "downstream error"
13-
parseErrType = "parse error"
1415
)
1516

16-
func QueryShardMiddleware(engine *promql.Engine) queryrange.Middleware {
17+
var (
18+
invalidShardingRange = errors.New("Query does not fit in a single sharding configuration")
19+
)
20+
21+
// Sharding configuration. Will eventually support ranges like chunk.PeriodConfig for meshing togethe multiple queryShard(s).
22+
// This will enable compatibility for deployments which change their shard factors over time.
23+
type ShardingConfig struct {
24+
From chunk.DayTime `yaml:"from"`
25+
Shards int `yaml:"shards"`
26+
}
27+
28+
type ShardingConfigs []ShardingConfig
29+
30+
func (confs ShardingConfigs) ValidRange(start, end int64) (ShardingConfig, error) {
31+
for i, conf := range confs {
32+
if start < int64(conf.From.Time) {
33+
// the query starts before this config's range
34+
return ShardingConfig{}, invalidShardingRange
35+
} else if i == len(confs)-1 {
36+
// the last configuration has no upper bound
37+
return conf, nil
38+
} else if end < int64(confs[i+1].From.Time) {
39+
// The request is entirely scoped into this shard config
40+
return conf, nil
41+
} else {
42+
continue
43+
}
44+
}
45+
46+
return ShardingConfig{}, invalidShardingRange
47+
48+
}
49+
50+
func mapQuery(mapper astmapper.ASTMapper, query string) (promql.Node, error) {
51+
expr, err := promql.ParseExpr(query)
52+
if err != nil {
53+
return nil, err
54+
}
55+
return mapper.Map(expr)
56+
}
57+
58+
func QueryShardMiddleware(engine *promql.Engine, confs ShardingConfigs) queryrange.Middleware {
1759
return queryrange.MiddlewareFunc(func(next queryrange.Handler) queryrange.Handler {
1860
return &queryShard{
61+
confs: confs,
1962
next: next,
2063
engine: engine,
21-
mapper: astmapper.NewMultiMapper(
22-
astmapper.NewShardSummer(astmapper.DEFAULT_SHARDS, astmapper.VectorSquasher),
23-
astmapper.MapperFunc(astmapper.ShallowEmbedSelectors),
24-
),
2564
}
2665
})
2766
}
2867

2968
type queryShard struct {
69+
confs ShardingConfigs
3070
next queryrange.Handler
3171
engine *promql.Engine
32-
mapper astmapper.ASTMapper
3372
}
3473

3574
func (qs *queryShard) Do(ctx context.Context, r *queryrange.Request) (*queryrange.APIResponse, error) {
3675
queryable := &DownstreamQueryable{r, qs.next}
3776

77+
conf, err := qs.confs.ValidRange(r.Start, r.End)
78+
if err != nil {
79+
return nil, err
80+
}
81+
82+
mappedQuery, err := mapQuery(
83+
astmapper.NewMultiMapper(
84+
astmapper.NewShardSummer(conf.Shards, astmapper.VectorSquasher),
85+
astmapper.MapperFunc(astmapper.ShallowEmbedSelectors),
86+
),
87+
r.Query,
88+
)
89+
90+
if err != nil {
91+
return nil, err
92+
}
93+
3894
qry, err := qs.engine.NewRangeQuery(
3995
queryable,
40-
r.Query,
96+
mappedQuery.String(),
4197
time.Unix(r.Start, 0), time.Unix(r.End, 0),
4298
time.Duration(r.Step)*time.Second,
4399
)

pkg/querier/querysharding/middleware_test.go

Lines changed: 110 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ package querysharding
22

33
import (
44
"context"
5+
"github.com/cortexproject/cortex/pkg/chunk"
56
"github.com/cortexproject/cortex/pkg/ingester/client"
67
"github.com/cortexproject/cortex/pkg/querier/queryrange"
78
"github.com/cortexproject/cortex/pkg/util"
89
"github.com/pkg/errors"
10+
"github.com/prometheus/common/model"
911
"github.com/prometheus/prometheus/promql"
1012
"github.com/stretchr/testify/require"
1113
"testing"
@@ -83,7 +85,14 @@ func TestMiddleware(t *testing.T) {
8385
Timeout: time.Minute,
8486
})
8587

86-
handler := QueryShardMiddleware(engine).Wrap(c.next)
88+
handler := QueryShardMiddleware(
89+
engine,
90+
ShardingConfigs{
91+
{
92+
Shards: 3,
93+
},
94+
},
95+
).Wrap(c.next)
8796

8897
// escape hatch for custom tests
8998
if c.override != nil {
@@ -168,3 +177,103 @@ func defaultReq() *queryrange.Request {
168177
Query: `__embedded_query__{__cortex_query__="687474705f72657175657374735f746f74616c7b636c75737465723d2270726f64227d"}`,
169178
}
170179
}
180+
181+
func TestShardingConfigs_ValidRange(t *testing.T) {
182+
reqWith := func(start, end string) *queryrange.Request {
183+
r := defaultReq()
184+
185+
if start != "" {
186+
r.Start = int64(parseDate(start))
187+
}
188+
189+
if end != "" {
190+
r.End = int64(parseDate(end))
191+
}
192+
193+
return r
194+
}
195+
196+
var testExpr = []struct {
197+
name string
198+
confs ShardingConfigs
199+
req *queryrange.Request
200+
expected ShardingConfig
201+
err error
202+
}{
203+
{
204+
name: "0 ln configs fail",
205+
confs: ShardingConfigs{},
206+
req: defaultReq(),
207+
err: invalidShardingRange,
208+
},
209+
{
210+
name: "request starts before beginning config",
211+
confs: ShardingConfigs{
212+
{
213+
From: chunk.DayTime{parseDate("2019-10-16")},
214+
Shards: 1,
215+
},
216+
},
217+
req: reqWith("2019-10-15", ""),
218+
err: invalidShardingRange,
219+
},
220+
{
221+
name: "request spans multiple configs",
222+
confs: ShardingConfigs{
223+
{
224+
From: chunk.DayTime{parseDate("2019-10-16")},
225+
Shards: 1,
226+
},
227+
{
228+
From: chunk.DayTime{parseDate("2019-11-16")},
229+
Shards: 2,
230+
},
231+
},
232+
req: reqWith("2019-10-15", "2019-11-17"),
233+
err: invalidShardingRange,
234+
},
235+
{
236+
name: "selects correct config ",
237+
confs: ShardingConfigs{
238+
{
239+
From: chunk.DayTime{parseDate("2019-10-16")},
240+
Shards: 1,
241+
},
242+
{
243+
From: chunk.DayTime{parseDate("2019-11-16")},
244+
Shards: 2,
245+
},
246+
{
247+
From: chunk.DayTime{parseDate("2019-12-16")},
248+
Shards: 3,
249+
},
250+
},
251+
req: reqWith("2019-11-20", "2019-11-25"),
252+
expected: ShardingConfig{
253+
From: chunk.DayTime{parseDate("2019-11-16")},
254+
Shards: 2,
255+
},
256+
},
257+
}
258+
259+
for _, c := range testExpr {
260+
t.Run(c.name, func(t *testing.T) {
261+
out, err := c.confs.ValidRange(c.req.Start, c.req.End)
262+
263+
if c.err != nil {
264+
require.EqualError(t, err, c.err.Error())
265+
} else {
266+
require.Nil(t, err)
267+
require.Equal(t, c.expected, out)
268+
}
269+
})
270+
}
271+
}
272+
273+
func parseDate(in string) model.Time {
274+
t, err := time.Parse("2006-01-02", in)
275+
if err != nil {
276+
panic(err)
277+
}
278+
return model.Time(t.UnixNano())
279+
}

0 commit comments

Comments
 (0)