Skip to content

Commit

Permalink
Add support for range vector aggregators (#2601)
Browse files Browse the repository at this point in the history
* Add support for range vector aggregators

* Remove resets from list of supported vector aggregators

* Remove unused const value

* Remove irate from list of supported range vector aggregators

* Add comment to present_over_time
  • Loading branch information
ssncferreira authored Aug 1, 2022
1 parent c4ea443 commit e92aee1
Show file tree
Hide file tree
Showing 3 changed files with 315 additions and 31 deletions.
21 changes: 14 additions & 7 deletions pkg/frontend/querymiddleware/astmapper/instant_splitting.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@ var splittableVectorAggregators = map[parser.ItemType]bool{
// Supported range vector aggregators

const (
avgOverTime = "avg_over_time"
countOverTime = "count_over_time"
increase = "increase"
maxOverTime = "max_over_time"
minOverTime = "min_over_time"
rate = "rate"
sumOverTime = "sum_over_time"
avgOverTime = "avg_over_time"
countOverTime = "count_over_time"
increase = "increase"
maxOverTime = "max_over_time"
minOverTime = "min_over_time"
presentOverTime = "present_over_time"
rate = "rate"
sumOverTime = "sum_over_time"
)

// cannotDoubleCountBoundaries is the list of functions that cannot double count the boundary points when being split by range.
Expand Down Expand Up @@ -194,10 +195,16 @@ func (i *instantSplitter) mapCall(expr *parser.Call) (mapped parser.Expr, finish
return i.mapCallAvgOverTime(expr)
case countOverTime:
return i.mapCallVectorAggregation(expr, parser.SUM)
case increase:
return i.mapCallVectorAggregation(expr, parser.SUM)
case maxOverTime:
return i.mapCallVectorAggregation(expr, parser.MAX)
case minOverTime:
return i.mapCallVectorAggregation(expr, parser.MIN)
case presentOverTime:
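// present_over_time returns the value 1 for any series that has samples in the specified interval.
// Each split range therefore yields either 1 or nothing for a series, so aggregating with MAX
// returns 1 for every series present in at least one split range (SUM would instead add up the 1s).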
return i.mapCallVectorAggregation(expr, parser.MAX)
case rate:
return i.mapCallRate(expr)
case sumOverTime:
Expand Down
150 changes: 147 additions & 3 deletions pkg/frontend/querymiddleware/astmapper/instant_splitting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestInstantSplitter(t *testing.T) {
out string
expectedSplitQueries int
}{
// Range vector aggregators
// Splittable range vector aggregators
{
in: `avg_over_time({app="foo"}[3m])`,
out: `(sum without() (` + concatOffsets(splitInterval, 3, false, `sum_over_time({app="foo"}[x]y)`) + `)) / (sum without() (` + concatOffsets(splitInterval, 3, false, `count_over_time({app="foo"}[x]y)`) + `))`,
Expand All @@ -34,6 +34,11 @@ func TestInstantSplitter(t *testing.T) {
out: `sum without() (` + concatOffsets(splitInterval, 3, false, `count_over_time({app="foo"}[x]y)`) + `)`,
expectedSplitQueries: 3,
},
{
in: `increase({app="foo"}[3m])`,
out: `sum without() (` + concatOffsets(splitInterval, 3, true, `increase({app="foo"}[x]y)`) + `)`,
expectedSplitQueries: 3,
},
{
in: `max_over_time({app="foo"}[3m])`,
out: `max without() (` + concatOffsets(splitInterval, 3, true, `max_over_time({app="foo"}[x]y)`) + `)`,
Expand All @@ -44,6 +49,11 @@ func TestInstantSplitter(t *testing.T) {
out: `min without() (` + concatOffsets(splitInterval, 3, true, `min_over_time({app="foo"}[x]y)`) + `)`,
expectedSplitQueries: 3,
},
{
in: `present_over_time({app="foo"}[3m])`,
out: `max without() (` + concatOffsets(splitInterval, 3, true, `present_over_time({app="foo"}[x]y)`) + `)`,
expectedSplitQueries: 3,
},
{
in: `rate({app="foo"}[3m])`,
out: `sum without() (` + concatOffsets(splitInterval, 3, true, `increase({app="foo"}[x]y)`) + `) / 180`,
Expand All @@ -54,6 +64,97 @@ func TestInstantSplitter(t *testing.T) {
out: `sum without() (` + concatOffsets(splitInterval, 3, false, `sum_over_time({app="foo"}[x]y)`) + `)`,
expectedSplitQueries: 3,
},
// Splittable aggregations wrapped by non-aggregative functions.
{
in: `absent(sum_over_time({app="foo"}[3m]))`,
out: `absent(sum without() (` + concatOffsets(splitInterval, 3, false, `sum_over_time({app="foo"}[x]y)`) + `))`,
expectedSplitQueries: 3,
},
{
in: `ceil(sum_over_time({app="foo"}[3m]))`,
out: `ceil(sum without() (` + concatOffsets(splitInterval, 3, false, `sum_over_time({app="foo"}[x]y)`) + `))`,
expectedSplitQueries: 3,
},
{
in: `clamp(sum_over_time({app="foo"}[3m]), 1, 10)`,
out: `clamp(sum without() (` + concatOffsets(splitInterval, 3, false, `sum_over_time({app="foo"}[x]y)`) + `), 1, 10)`,
expectedSplitQueries: 3,
},
{
in: `clamp_max(sum_over_time({app="foo"}[3m]), 10)`,
out: `clamp_max(sum without() (` + concatOffsets(splitInterval, 3, false, `sum_over_time({app="foo"}[x]y)`) + `), 10)`,
expectedSplitQueries: 3,
},
{
in: `clamp_min(sum_over_time({app="foo"}[3m]), 1)`,
out: `clamp_min(sum without() (` + concatOffsets(splitInterval, 3, false, `sum_over_time({app="foo"}[x]y)`) + `), 1)`,
expectedSplitQueries: 3,
},
{
in: `exp(sum_over_time({app="foo"}[3m]))`,
out: `exp(sum without() (` + concatOffsets(splitInterval, 3, false, `sum_over_time({app="foo"}[x]y)`) + `))`,
expectedSplitQueries: 3,
},
{
in: `floor(sum_over_time({app="foo"}[3m]))`,
out: `floor(sum without() (` + concatOffsets(splitInterval, 3, false, `sum_over_time({app="foo"}[x]y)`) + `))`,
expectedSplitQueries: 3,
},
{
in: `histogram_quantile(0.9, sum_over_time({app="foo"}[3m]))`,
out: `histogram_quantile(0.9, sum without() (` + concatOffsets(splitInterval, 3, false, `sum_over_time({app="foo"}[x]y)`) + `) )`,
expectedSplitQueries: 3,
},
{
in: `label_join(sum_over_time({app="foo"}[3m]), "foo", ",", "group_1", "group_2", "const")`,
out: `label_join(sum without() (` + concatOffsets(splitInterval, 3, false, `sum_over_time({app="foo"}[x]y)`) + `), "foo", ",", "group_1", "group_2", "const")`,
expectedSplitQueries: 3,
},
{
in: `label_replace(sum_over_time({app="foo"}[3m]), "foo", "bar$1", "group_2", "(.*)")`,
out: `label_replace(sum without() (` + concatOffsets(splitInterval, 3, false, `sum_over_time({app="foo"}[x]y)`) + `), "foo", "bar$1", "group_2", "(.*)")`,
expectedSplitQueries: 3,
},
{
in: `ln(sum_over_time({app="foo"}[3m]))`,
out: `ln(sum without() (` + concatOffsets(splitInterval, 3, false, `sum_over_time({app="foo"}[x]y)`) + `))`,
expectedSplitQueries: 3,
},
{
in: `log2(sum_over_time({app="foo"}[3m]))`,
out: `log2(sum without() (` + concatOffsets(splitInterval, 3, false, `sum_over_time({app="foo"}[x]y)`) + `))`,
expectedSplitQueries: 3,
},
{
in: `round(sum_over_time({app="foo"}[3m]))`,
out: `round(sum without() (` + concatOffsets(splitInterval, 3, false, `sum_over_time({app="foo"}[x]y)`) + `))`,
expectedSplitQueries: 3,
},
{
in: `scalar(sum_over_time({app="foo"}[3m]))`,
out: `scalar(sum without() (` + concatOffsets(splitInterval, 3, false, `sum_over_time({app="foo"}[x]y)`) + `))`,
expectedSplitQueries: 3,
},
{
in: `sgn(sum_over_time({app="foo"}[3m]))`,
out: `sgn(sum without() (` + concatOffsets(splitInterval, 3, false, `sum_over_time({app="foo"}[x]y)`) + `))`,
expectedSplitQueries: 3,
},
{
in: `sort(sum_over_time({app="foo"}[3m]))`,
out: `sort(sum without() (` + concatOffsets(splitInterval, 3, false, `sum_over_time({app="foo"}[x]y)`) + `))`,
expectedSplitQueries: 3,
},
{
in: `sort_desc(sum_over_time({app="foo"}[3m]))`,
out: `sort_desc(sum without() (` + concatOffsets(splitInterval, 3, false, `sum_over_time({app="foo"}[x]y)`) + `))`,
expectedSplitQueries: 3,
},
{
in: `sqrt(sum_over_time({app="foo"}[3m]))`,
out: `sqrt(sum without() (` + concatOffsets(splitInterval, 3, false, `sum_over_time({app="foo"}[x]y)`) + `))`,
expectedSplitQueries: 3,
},
// Vector aggregators
{
in: `avg(rate({app="foo"}[3m]))`,
Expand Down Expand Up @@ -344,12 +445,55 @@ func TestInstantSplitterNoOp(t *testing.T) {
for _, tt := range []struct {
query string
}{
// should be noop if expression is not splittable
// should be noop if range vector aggregator is not splittable
{
query: `absent_over_time({app="foo"}[3m])`,
},
{
query: `changes({app="foo"}[3m])`,
},
{
query: `delta({app="foo"}[3m])`,
},
{
query: `deriv({app="foo"}[3m])`,
},
{
query: `holt_winters({app="foo"}[3m], 1, 10)`,
},
{
query: `idelta({app="foo"}[3m])`,
},
{
query: `irate({app="foo"}[3m])`,
},
{
query: `last_over_time({app="foo"}[3m])`,
},
{
query: `predict_linear({app="foo"}[3m], 1)`,
},
{
query: `quantile_over_time(0.95, foo[3m])`,
},
{
query: `topk(10, histogram_quantile(0.9, irate({app="foo"}[3m])))`,
query: `resets(foo[3m])`,
},
{
query: `stddev_over_time(foo[3m])`,
},
{
query: `stdvar_over_time(foo[3m])`,
},
{
query: `time()`,
},
{
query: `vector(10)`,
},
// should be noop if expression is not splittable
{
query: `topk(10, histogram_quantile(0.9, delta({app="foo"}[3m])))`,
},
// should be noop if range interval is lower or equal to split interval (1m)
{
Expand Down
Loading

0 comments on commit e92aee1

Please sign in to comment.