Skip to content

Commit

Permalink
Alerting: Send alerts from state tracker to notifier, logging, and cleanup task (grafana#32333)
Browse files Browse the repository at this point in the history

* Initial commit for state tracking

* basic state transition logic and tests

* constructor. test and interface fixup

* use new sig for sch.definitionRoutine()

* test fixup

* make the linter happy

* more minor linting cleanup

* Alerting: Send alerts from state tracker to notifier

* Add evaluation time and test

Add evaluation time and test

* Add cleanup routine and logging

* Pull in compact.go and reconcile differences

* pr feedback

* pr feedback

Pull in compact.go and reconcile differences

Co-authored-by: Josue Abreu <josue@grafana.com>
  • Loading branch information
davidmparrott and gotjosh authored Mar 30, 2021
1 parent b9800a2 commit b1cb74c
Show file tree
Hide file tree
Showing 8 changed files with 286 additions and 96 deletions.
14 changes: 5 additions & 9 deletions pkg/services/ngalert/eval/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,9 @@ type Results []Result
// Result contains the evaluated State of an alert instance
// identified by its labels.
type Result struct {
Instance data.Labels
State State // Enum
// StartAt is the time at which we first saw this state
StartAt time.Time
// FiredAt is the time at which we first transitioned to a firing state
FiredAt time.Time
Instance data.Labels
State State // Enum
EvaluatedAt time.Time
}

// State is an enum of the evaluation State for an alert instance.
Expand Down Expand Up @@ -211,8 +208,8 @@ func evaluateExecutionResult(results *ExecutionResults, ts time.Time) (Results,
}

r := Result{
Instance: f.Fields[0].Labels,
StartAt: ts,
Instance: f.Fields[0].Labels,
EvaluatedAt: ts,
}

switch {
Expand All @@ -223,7 +220,6 @@ func evaluateExecutionResult(results *ExecutionResults, ts time.Time) (Results,
case *val == 0:
r.State = Normal
default:
r.FiredAt = ts
r.State = Alerting
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/services/ngalert/ngalert.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func init() {
// Init initializes the AlertingService.
func (ng *AlertNG) Init() error {
ng.Log = log.New("ngalert")
ng.stateTracker = state.NewStateTracker()
ng.stateTracker = state.NewStateTracker(ng.Log)
baseInterval := baseIntervalSeconds * time.Second

store := store.DBstore{BaseInterval: baseInterval, DefaultIntervalSeconds: defaultIntervalSeconds, SQLStore: ng.SQLStore}
Expand All @@ -69,6 +69,7 @@ func (ng *AlertNG) Init() error {
MaxAttempts: maxAttempts,
Evaluator: eval.Evaluator{Cfg: ng.Cfg},
Store: store,
Notifier: ng.Alertmanager,
}
ng.schedule = schedule.NewScheduler(schedCfg, ng.DataService)

Expand Down
4 changes: 2 additions & 2 deletions pkg/services/ngalert/notifier/alertmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,8 @@ func (am *Alertmanager) buildReceiverIntegrations(receiver *api.PostableApiRecei
return integrations, nil
}

// CreateAlerts receives the alerts and then sends them through the corresponding route based on whenever the alert has a receiver embedded or not
func (am *Alertmanager) CreateAlerts(alerts ...*PostableAlert) error {
// PutAlerts receives the alerts and then sends them through the corresponding route based on whether the alert has a receiver embedded or not
func (am *Alertmanager) PutAlerts(alerts ...*PostableAlert) error {
return am.alerts.PutPostableAlert(alerts...)
}

Expand Down
24 changes: 24 additions & 0 deletions pkg/services/ngalert/schedule/compat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package schedule

import (
"github.com/grafana/grafana/pkg/services/ngalert/notifier"
"github.com/grafana/grafana/pkg/services/ngalert/state"
"github.com/prometheus/alertmanager/api/v2/models"
)

// FromAlertStateToPostableAlerts converts the alert states emitted by the
// state tracker into the postable-alert representation the notifier accepts.
// Labels are carried over as-is; annotations are not yet populated.
func FromAlertStateToPostableAlerts(firingStates []state.AlertState) []*notifier.PostableAlert {
	alerts := make([]*notifier.PostableAlert, 0, len(firingStates))
	// NOTE: the loop variable must not be named "state" — that would shadow
	// the imported state package used in the signature above.
	for _, alertState := range firingStates {
		alerts = append(alerts, &notifier.PostableAlert{
			PostableAlert: models.PostableAlert{
				// TODO: add annotations to evaluation results, add them to the
				// state struct, and then set them before sending to the notifier.
				Annotations: models.LabelSet{},
				StartsAt:    alertState.StartsAt,
				EndsAt:      alertState.EndsAt,
				Alert: models.Alert{
					Labels: models.LabelSet(alertState.Labels),
				},
			},
		})
	}
	return alerts
}
29 changes: 24 additions & 5 deletions pkg/services/ngalert/schedule/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,19 @@ import (
"sync"
"time"

"github.com/grafana/grafana/pkg/services/ngalert/state"
"golang.org/x/sync/errgroup"

"github.com/grafana/grafana/pkg/services/ngalert/store"
"github.com/benbjohnson/clock"

"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/notifier"
"github.com/grafana/grafana/pkg/services/ngalert/state"
"github.com/grafana/grafana/pkg/services/ngalert/store"

"github.com/benbjohnson/clock"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/alerting"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/grafana/grafana/pkg/tsdb"
"golang.org/x/sync/errgroup"
)

// timeNow makes it possible to test usage of time
Expand Down Expand Up @@ -85,7 +86,12 @@ func (sch *schedule) definitionRoutine(grafanaCtx context.Context, key models.Al
sch.log.Error("failed saving alert instance", "title", alertDefinition.Title, "key", key, "attempt", attempt, "now", ctx.now, "instance", r.Instance, "state", r.State.String(), "error", err)
}
}
_ = stateTracker.ProcessEvalResults(key.DefinitionUID, results, condition)
transitionedStates := stateTracker.ProcessEvalResults(key.DefinitionUID, results, condition)
alerts := FromAlertStateToPostableAlerts(transitionedStates)
err = sch.SendAlerts(alerts)
if err != nil {
sch.log.Error("failed to put alerts in the notifier", "count", len(alerts), "err", err)
}
return nil
}

Expand Down Expand Up @@ -114,6 +120,11 @@ func (sch *schedule) definitionRoutine(grafanaCtx context.Context, key models.Al
}
}

// Notifier handles the delivery of alert notifications to the end user.
// In production it is satisfied by the embedded Alertmanager
// (wired up via SchedulerCfg.Notifier in ngalert.go).
type Notifier interface {
	// PutAlerts hands the given alerts to the notification pipeline.
	PutAlerts(alerts ...*notifier.PostableAlert) error
}

type schedule struct {
// base tick rate (fastest possible configured check)
baseInterval time.Duration
Expand Down Expand Up @@ -144,6 +155,8 @@ type schedule struct {
store store.Store

dataService *tsdb.Service

notifier Notifier
}

// SchedulerCfg is the scheduler configuration.
Expand All @@ -156,6 +169,7 @@ type SchedulerCfg struct {
StopAppliedFunc func(models.AlertDefinitionKey)
Evaluator eval.Evaluator
Store store.Store
Notifier Notifier
}

// NewScheduler returns a new schedule.
Expand All @@ -173,6 +187,7 @@ func NewScheduler(cfg SchedulerCfg, dataService *tsdb.Service) *schedule {
evaluator: cfg.Evaluator,
store: cfg.Store,
dataService: dataService,
notifier: cfg.Notifier,
}
return &sch
}
Expand Down Expand Up @@ -302,6 +317,10 @@ func (sch *schedule) Ticker(grafanaCtx context.Context, stateTracker *state.Stat
}
}

// SendAlerts forwards the given postable alerts to the configured notifier.
// It is a thin delegation; any delivery error is returned to the caller.
func (sch *schedule) SendAlerts(alerts []*notifier.PostableAlert) error {
	return sch.notifier.PutAlerts(alerts...)
}

type alertDefinitionRegistry struct {
mu sync.Mutex
alertDefinitionInfo map[models.AlertDefinitionKey]alertDefinitionInfo
Expand Down
153 changes: 109 additions & 44 deletions pkg/services/ngalert/state/state_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,25 @@ package state
import (
"fmt"
"sync"
"time"

"github.com/grafana/grafana/pkg/infra/log"

"github.com/go-openapi/strfmt"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/grafana/grafana/pkg/services/ngalert/models"
ngModels "github.com/grafana/grafana/pkg/services/ngalert/models"
)

type AlertState struct {
UID string
CacheId string
Labels data.Labels
State eval.State
Results []eval.State
UID string
CacheId string
Labels data.Labels
State eval.State
Results []eval.State
StartsAt strfmt.DateTime
EndsAt strfmt.DateTime
EvaluatedAt strfmt.DateTime
}

type cache struct {
Expand All @@ -24,77 +31,135 @@ type cache struct {

type StateTracker struct {
stateCache cache
quit chan struct{}
Log log.Logger
}

func NewStateTracker() *StateTracker {
return &StateTracker{
func NewStateTracker(logger log.Logger) *StateTracker {
tracker := &StateTracker{
stateCache: cache{
cacheMap: make(map[string]AlertState),
mu: sync.Mutex{},
},
quit: make(chan struct{}),
Log: logger,
}
go tracker.cleanUp()
return tracker
}

func (c *cache) getOrCreate(uid string, result eval.Result) AlertState {
c.mu.Lock()
defer c.mu.Unlock()
func (st *StateTracker) getOrCreate(uid string, result eval.Result) AlertState {
st.stateCache.mu.Lock()
defer st.stateCache.mu.Unlock()

idString := fmt.Sprintf("%s %s", uid, result.Instance.String())
if state, ok := c.cacheMap[idString]; ok {
if state, ok := st.stateCache.cacheMap[idString]; ok {
return state
}
st.Log.Debug("adding new alert state cache entry", "cacheId", idString, "state", result.State.String(), "evaluatedAt", result.EvaluatedAt.String())
newState := AlertState{
UID: uid,
CacheId: idString,
Labels: result.Instance,
State: result.State,
Results: []eval.State{result.State},
UID: uid,
CacheId: idString,
Labels: result.Instance,
State: result.State,
Results: []eval.State{},
EvaluatedAt: strfmt.DateTime(result.EvaluatedAt),
}
c.cacheMap[idString] = newState
st.stateCache.cacheMap[idString] = newState
return newState
}

func (c *cache) update(stateEntry AlertState) {
c.mu.Lock()
defer c.mu.Unlock()
c.cacheMap[stateEntry.CacheId] = stateEntry
func (st *StateTracker) set(stateEntry AlertState) {
st.stateCache.mu.Lock()
defer st.stateCache.mu.Unlock()
st.stateCache.cacheMap[stateEntry.CacheId] = stateEntry
}

func (c *cache) getStateForEntry(stateId string) eval.State {
c.mu.Lock()
defer c.mu.Unlock()
return c.cacheMap[stateId].State
func (st *StateTracker) get(stateId string) AlertState {
st.stateCache.mu.Lock()
defer st.stateCache.mu.Unlock()
return st.stateCache.cacheMap[stateId]
}

func (st *StateTracker) ProcessEvalResults(uid string, results eval.Results, condition models.Condition) []AlertState {
func (st *StateTracker) ProcessEvalResults(uid string, results eval.Results, condition ngModels.Condition) []AlertState {
st.Log.Info("state tracker processing evaluation results", "uid", uid, "resultCount", len(results))
var changedStates []AlertState
for _, result := range results {
currentState := st.stateCache.getOrCreate(uid, result)
currentState.Results = append(currentState.Results, result.State)
newState := st.getNextState(uid, result)
if newState != currentState.State {
currentState.State = newState
changedStates = append(changedStates, currentState)
if s, ok := st.setNextState(uid, result); ok {
changedStates = append(changedStates, s)
}
st.stateCache.update(currentState)
}
st.Log.Debug("returning changed states to scheduler", "count", len(changedStates))
return changedStates
}

func (st *StateTracker) getNextState(uid string, result eval.Result) eval.State {
currentState := st.stateCache.getOrCreate(uid, result)
if currentState.State == result.State {
return currentState.State
}

//TODO: When calculating if an alert should not be firing anymore, we should take three things into account:
// 1. The re-send delay, if any; we don't want to send every firing alert every time, we should have a fixed delay across all alerts to avoid saturating the notification system
// 2. The evaluation interval defined for this particular alert - we don't support that yet but will eventually allow you to define how often you want this alert to be evaluated
// 3. The base interval defined by the scheduler - in the case where #2 is not yet an option we can use the base interval at which every alert runs.
//Set the current state based on evaluation results
//return the state and a bool indicating whether a state transition occurred
func (st *StateTracker) setNextState(uid string, result eval.Result) (AlertState, bool) {
currentState := st.getOrCreate(uid, result)
st.Log.Debug("setting alert state", "uid", uid)
switch {
case currentState.State == result.State:
return currentState.State
st.Log.Debug("no state transition", "cacheId", currentState.CacheId, "state", currentState.State.String())
currentState.EvaluatedAt = strfmt.DateTime(result.EvaluatedAt)
currentState.Results = append(currentState.Results, result.State)
if currentState.State == eval.Alerting {
currentState.EndsAt = strfmt.DateTime(result.EvaluatedAt.Add(40 * time.Second))
}
st.set(currentState)
return currentState, false
case currentState.State == eval.Normal && result.State == eval.Alerting:
return eval.Alerting
st.Log.Debug("state transition from normal to alerting", "cacheId", currentState.CacheId)
currentState.State = eval.Alerting
currentState.EvaluatedAt = strfmt.DateTime(result.EvaluatedAt)
currentState.StartsAt = strfmt.DateTime(result.EvaluatedAt)
currentState.EndsAt = strfmt.DateTime(result.EvaluatedAt.Add(40 * time.Second))
currentState.Results = append(currentState.Results, result.State)
st.set(currentState)
return currentState, true
case currentState.State == eval.Alerting && result.State == eval.Normal:
return eval.Normal
st.Log.Debug("state transition from alerting to normal", "cacheId", currentState.CacheId)
currentState.State = eval.Normal
currentState.EvaluatedAt = strfmt.DateTime(result.EvaluatedAt)
currentState.EndsAt = strfmt.DateTime(result.EvaluatedAt)
currentState.Results = append(currentState.Results, result.State)
st.set(currentState)
return currentState, true
default:
return eval.Alerting
return currentState, false
}
}

// cleanUp periodically trims the cached result history (see trim) until the
// tracker is told to stop via the quit channel. It is started as a goroutine
// by NewStateTracker.
func (st *StateTracker) cleanUp() {
	// Keep the constant and the logged value in sync.
	const cleanupIntervalMinutes = 60
	ticker := time.NewTicker(cleanupIntervalMinutes * time.Minute)
	defer ticker.Stop()
	st.Log.Debug("starting cleanup process", "intervalMinutes", cleanupIntervalMinutes)
	for {
		select {
		case <-ticker.C:
			st.trim()
		case <-st.quit:
			st.Log.Debug("stopping cleanup process", "now", time.Now())
			return
		}
	}
}

// trim caps each cached alert state's result history at the 100 most recent
// entries so the cache does not grow without bound.
func (st *StateTracker) trim() {
	st.Log.Info("trimming alert state cache")
	st.stateCache.mu.Lock()
	defer st.stateCache.mu.Unlock()
	for _, v := range st.stateCache.cacheMap {
		if len(v.Results) > 100 {
			st.Log.Debug("trimming result set", "cacheId", v.CacheId, "count", len(v.Results)-100)
			// Keep the most recent 100 results. The previous code copied
			// v.Results[100:] into a fixed-length slice, which zero-padded the
			// tail whenever fewer than 100 entries remained and discarded the
			// newest entries when more than 200 had accumulated.
			newResults := make([]eval.State, 100)
			copy(newResults, v.Results[len(v.Results)-100:])
			v.Results = newResults
			// Write back directly: calling st.set here would re-acquire
			// stateCache.mu, which this function already holds — sync.Mutex is
			// not reentrant, so that was a guaranteed deadlock.
			st.stateCache.cacheMap[v.CacheId] = v
		}
	}
}
Loading

0 comments on commit b1cb74c

Please sign in to comment.