Skip to content

Commit

Permalink
Alerting: optimize rules with multiple loki range queries (grafana#73103
Browse files Browse the repository at this point in the history
)
  • Loading branch information
JohnnyQQQQ authored Aug 9, 2023
1 parent 72da44d commit 2266e09
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 108 deletions.
12 changes: 7 additions & 5 deletions pkg/services/ngalert/store/alert_rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,11 +553,13 @@ func (st DBstore) GetAlertRulesForScheduling(ctx context.Context, query *ngmodel
// In previous versions of Grafana, Loki datasources would default to range queries
// instead of instant queries, sometimes creating unnecessary load. This is only
// done for Grafana Cloud.
if lokiRangeToInstantEnabled && canBeInstant(rule) {
if err := migrateToInstant(rule); err != nil {
st.Logger.Error("Could not migrate rule from range to instant query", "rule", rule.UID, "err", err)
} else {
st.Logger.Info("Migrated rule from range to instant query", "rule", rule.UID)
if lokiRangeToInstantEnabled {
if indices, migratable := canBeInstant(rule); migratable {
if err := migrateToInstant(rule, indices); err != nil {
st.Logger.Error("Could not migrate rule from range to instant query", "rule", rule.UID, "err", err)
} else {
st.Logger.Info("Migrated rule from range to instant query", "rule", rule.UID, "migrated_queries", len(indices))
}
}
}
rules = append(rules, rule)
Expand Down
105 changes: 61 additions & 44 deletions pkg/services/ngalert/store/loki_range_to_instant.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,57 +19,74 @@ func (t dsType) isLoki() bool {
return t.DS.Type == datasources.DS_LOKI
}

func canBeInstant(r *models.AlertRule) bool {
// canBeInstant checks if any of the query nodes that are loki range queries can be migrated to instant queries.
// If any are migratable, those indices are returned.
func canBeInstant(r *models.AlertRule) ([]int, bool) {
if len(r.Data) < 2 {
return false
}
// First query part should be range query.
if r.Data[0].QueryType != "range" {
return false
}

var t dsType
// We can ignore the error here, the query just won't be optimized.
_ = json.Unmarshal(r.Data[0].Model, &t)

if !t.isLoki() {
return false
}

exprRaw := make(map[string]interface{})
if err := json.Unmarshal(r.Data[1].Model, &exprRaw); err != nil {
return false
}

// Second query part should be and expression.
if !expr.IsDataSource(r.Data[1].DatasourceUID) {
return false
return nil, false
}
var (
optimizableIndices []int
canBeOptimized = false
)
// Loop over query nodes to find all Loki range queries.
for i := range r.Data {
if r.Data[i].QueryType != "range" {
continue
}
var t dsType
// We can ignore the error here, the query just won't be optimized.
_ = json.Unmarshal(r.Data[i].Model, &t)

// Second query part should be "last()"
if val, ok := exprRaw["reducer"].(string); !ok || val != "last" {
return false
if !t.isLoki() {
continue
}
var validReducers bool
// Loop over all query nodes to find the reduce node.
for ii := range r.Data {
// Second query part should be and expression.
if !expr.IsDataSource(r.Data[ii].DatasourceUID) {
continue
}
exprRaw := make(map[string]interface{})
if err := json.Unmarshal(r.Data[ii].Model, &exprRaw); err != nil {
continue
}
// Second query part should use first query part as expression.
if ref, ok := exprRaw["expression"].(string); !ok || ref != r.Data[i].RefID {
continue
}
// Second query part should be "last()"
if val, ok := exprRaw["reducer"].(string); !ok || val != "last" {
validReducers = false
break
}
validReducers = true
}
// If we found a reduce node that uses last, we can add the loki query to the optimizations.
if validReducers {
canBeOptimized = true
optimizableIndices = append(optimizableIndices, i)
}
}
// Second query part should use first query part as expression.
if ref, ok := exprRaw["expression"].(string); !ok || ref != r.Data[0].RefID {
return false
}
return true
return optimizableIndices, canBeOptimized
}

// migrateToInstant will move a range-query to an instant query. This should only
// migrateToInstant will move the provided indices from a range-query to an instant query. This should only
// be used for loki.
func migrateToInstant(r *models.AlertRule) error {
modelRaw := make(map[string]interface{})
if err := json.Unmarshal(r.Data[0].Model, &modelRaw); err != nil {
return err
}
modelRaw["queryType"] = "instant"
model, err := json.Marshal(modelRaw)
if err != nil {
return err
func migrateToInstant(r *models.AlertRule, optimizableIndices []int) error {
for _, lokiQueryIndex := range optimizableIndices {
modelRaw := make(map[string]interface{})
if err := json.Unmarshal(r.Data[lokiQueryIndex].Model, &modelRaw); err != nil {
return err
}
modelRaw["queryType"] = "instant"
model, err := json.Marshal(modelRaw)
if err != nil {
return err
}
r.Data[lokiQueryIndex].Model = model
r.Data[lokiQueryIndex].QueryType = "instant"
}
r.Data[0].Model = model
r.Data[0].QueryType = "instant"
return nil
}
177 changes: 118 additions & 59 deletions pkg/services/ngalert/store/loki_range_to_instant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package store

import (
"encoding/json"
"fmt"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -11,22 +12,31 @@ import (

func TestCanBeInstant(t *testing.T) {
tcs := []struct {
name string
expected bool
rule *models.AlertRule
name string
expected bool
expectedIndices []int
rule *models.AlertRule
}{
{
name: "valid rule that can be migrated from range to instant",
expected: true,
rule: createMigrateableLokiRule(t),
name: "valid rule that can be migrated from range to instant",
expected: true,
expectedIndices: []int{0},
rule: createMigrateableLokiRule(t),
},
{
name: "valid rule with external loki datasource",
expected: true,
name: "valid rule with external loki datasource",
expected: true,
expectedIndices: []int{0},
rule: createMigrateableLokiRule(t, func(r *models.AlertRule) {
r.Data[0].DatasourceUID = "something-external"
}),
},
{
name: "valid multi query rule with loki datasources",
expected: true,
expectedIndices: []int{0, 1},
rule: createMultiQueryMigratableLokiRule(t),
},
{
name: "invalid rule where the data array is too short to be migrateable",
expected: false,
Expand All @@ -45,16 +55,18 @@ func TestCanBeInstant(t *testing.T) {
name: "invalid rule that has not last() as aggregation",
expected: false,
rule: createMigrateableLokiRule(t, func(r *models.AlertRule) {
raw := make(map[string]interface{})
err := json.Unmarshal(r.Data[1].Model, &raw)
require.NoError(t, err)
raw["reducer"] = "avg"
r.Data[1].Model, err = json.Marshal(raw)
require.NoError(t, err)
r.Data[1] = reducer(t, "B", "A", "avg")
}),
},
{
name: "invalid rule that has no aggregation as second item",
name: "invalid rule that has not all reducers last()",
expected: false,
rule: createMigrateableLokiRule(t, func(r *models.AlertRule) {
r.Data = append(r.Data, reducer(t, "invalid-reducer", "A", "min"))
}),
},
{
name: "invalid rule that has no aggregation",
expected: false,
rule: createMigrateableLokiRule(t, func(r *models.AlertRule) {
r.Data[1].DatasourceUID = "something-else"
Expand All @@ -75,32 +87,22 @@ func TestCanBeInstant(t *testing.T) {
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
require.Equal(t, tc.expected, canBeInstant(tc.rule))
indicies, canBe := canBeInstant(tc.rule)
require.Equal(t, tc.expected, canBe)
require.Equal(t, tc.expectedIndices, indicies)
})
}
}

func TestMigrateLokiQueryToInstant(t *testing.T) {
original := createMigrateableLokiRule(t)
mirgrated := createMigrateableLokiRule(t, func(r *models.AlertRule) {
r.Data[0].QueryType = "instant"
r.Data[0].Model = []byte(`{
"datasource": {
"type": "loki",
"uid": "grafanacloud-logs"
},
"editorMode": "code",
"expr": "1",
"hide": false,
"intervalMs": 1000,
"maxDataPoints": 43200,
"queryType": "instant",
"refId": "A"
}`)
r.Data[0] = lokiQuery(t, "A", "instant", "grafanacloud-logs")
})

require.True(t, canBeInstant(original))
require.NoError(t, migrateToInstant(original))
optimizableIndices, canBeOptimized := canBeInstant(original)
require.True(t, canBeOptimized)
require.NoError(t, migrateToInstant(original, optimizableIndices))

require.Equal(t, mirgrated.Data[0].QueryType, original.Data[0].QueryType)

Expand All @@ -111,35 +113,98 @@ func TestMigrateLokiQueryToInstant(t *testing.T) {

require.Equal(t, migratedModel, originalModel)

require.False(t, canBeInstant(original))
_, canBeOptimized = canBeInstant(original)
require.False(t, canBeOptimized)
}

func TestMigrateMultiLokiQueryToInstant(t *testing.T) {
original := createMultiQueryMigratableLokiRule(t)
mirgrated := createMultiQueryMigratableLokiRule(t, func(r *models.AlertRule) {
r.Data[0] = lokiQuery(t, "TotalRequests", "instant", "grafanacloud-logs")
r.Data[1] = lokiQuery(t, "TotalErrors", "instant", "grafanacloud-logs")
})

optimizableIndices, canBeOptimized := canBeInstant(original)
require.True(t, canBeOptimized)
require.NoError(t, migrateToInstant(original, optimizableIndices))

require.Equal(t, mirgrated.Data[0].QueryType, original.Data[0].QueryType)
require.Equal(t, mirgrated.Data[1].QueryType, original.Data[1].QueryType)

originalModel := make(map[string]interface{})
require.NoError(t, json.Unmarshal(original.Data[0].Model, &originalModel))
migratedModel := make(map[string]interface{})
require.NoError(t, json.Unmarshal(mirgrated.Data[0].Model, &migratedModel))

require.Equal(t, migratedModel, originalModel)

originalModel = make(map[string]interface{})
require.NoError(t, json.Unmarshal(original.Data[1].Model, &originalModel))
migratedModel = make(map[string]interface{})
require.NoError(t, json.Unmarshal(mirgrated.Data[1].Model, &migratedModel))

require.Equal(t, migratedModel, originalModel)

_, canBeOptimized = canBeInstant(original)
require.False(t, canBeOptimized)
}

func createMigrateableLokiRule(t *testing.T, muts ...func(*models.AlertRule)) *models.AlertRule {
t.Helper()
r := &models.AlertRule{
Data: []models.AlertQuery{
{
RefID: "A",
QueryType: "range",
DatasourceUID: "grafanacloud-logs",
Model: []byte(`{
lokiQuery(t, "A", "range", "grafanacloud-logs"),
reducer(t, "B", "A", "last"),
},
}
for _, m := range muts {
m(r)
}
return r
}

func createMultiQueryMigratableLokiRule(t *testing.T, muts ...func(*models.AlertRule)) *models.AlertRule {
t.Helper()
r := &models.AlertRule{
Data: []models.AlertQuery{
lokiQuery(t, "TotalRequests", "range", "grafanacloud-logs"),
lokiQuery(t, "TotalErrors", "range", "grafanacloud-logs"),
reducer(t, "TotalRequests_Last", "TotalRequests", "last"),
reducer(t, "TotalErrors_Last", "TotalErrors", "last"),
},
}
for _, m := range muts {
m(r)
}
return r
}
func lokiQuery(t *testing.T, refID, queryType, datasourceUID string) models.AlertQuery {
t.Helper()
return models.AlertQuery{
RefID: refID,
QueryType: queryType,
DatasourceUID: datasourceUID,
Model: []byte(fmt.Sprintf(`{
"datasource": {
"type": "loki",
"uid": "grafanacloud-logs"
"uid": "%s"
},
"editorMode": "code",
"expr": "1",
"hide": false,
"intervalMs": 1000,
"maxDataPoints": 43200,
"queryType": "range",
"refId": "A"
}`),
},
{
RefID: "B",
DatasourceUID: "__expr__",
Model: []byte(`{
"queryType": "%s",
"refId": "%s"
}`, datasourceUID, queryType, refID)),
}
}

func reducer(t *testing.T, refID, exp, op string) models.AlertQuery {
t.Helper()
return models.AlertQuery{
RefID: refID,
DatasourceUID: "__expr__",
Model: []byte(fmt.Sprintf(`{
"conditions": [
{
"evaluator": {
Expand All @@ -156,7 +221,7 @@ func createMigrateableLokiRule(t *testing.T, muts ...func(*models.AlertRule)) *m
},
"reducer": {
"params": [],
"type": "last"
"type": "%s"
},
"type": "query"
}
Expand All @@ -165,19 +230,13 @@ func createMigrateableLokiRule(t *testing.T, muts ...func(*models.AlertRule)) *m
"type": "__expr__",
"uid": "__expr__"
},
"expression": "A",
"expression": "%s",
"hide": false,
"intervalMs": 1000,
"maxDataPoints": 43200,
"reducer": "last",
"refId": "B",
"reducer": "%s",
"refId": "%s",
"type": "reduce"
}`),
},
},
}`, op, exp, op, refID)),
}
for _, m := range muts {
m(r)
}
return r
}

0 comments on commit 2266e09

Please sign in to comment.