Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
276 changes: 159 additions & 117 deletions dispatch/dispatch.go

Large diffs are not rendered by default.

47 changes: 37 additions & 10 deletions dispatch/dispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,8 @@ func TestDispatcherRace(t *testing.T) {
defer alerts.Close()

timeout := func(d time.Duration) time.Duration { return time.Duration(0) }
dispatcher := NewDispatcher(alerts, nil, nil, marker, timeout, testMaintenanceInterval, nil, logger, NewDispatcherMetrics(false, reg))
route := &Route{}
dispatcher := NewDispatcher(alerts, route, nil, marker, timeout, testMaintenanceInterval, nil, logger, NewDispatcherMetrics(false, reg))
go dispatcher.Run(time.Now())
dispatcher.Stop()
}
Expand Down Expand Up @@ -743,20 +744,46 @@ func TestDispatcher_DoMaintenance(t *testing.T) {
GroupWait: 0,
GroupInterval: 5 * time.Minute, // Should never hit in this test.
},
Idx: 0,
}
timeout := func(d time.Duration) time.Duration { return d }
recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}

ctx := context.Background()
dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, testMaintenanceInterval, nil, promslog.NewNopLogger(), NewDispatcherMetrics(false, r))
aggrGroups := make(map[*Route]map[model.Fingerprint]*aggrGroup)
aggrGroups[route] = make(map[model.Fingerprint]*aggrGroup)
// Manually create the routeAggrGroups structure since we are not calling Run().
dispatcher.routeGroupsSlice = make([]routeAggrGroups, route.Idx+1)
dispatcher.routeGroupsSlice[route.Idx] = routeAggrGroups{
route: route,
}

// Insert an aggregation group with no alerts.
// Insert an aggregation group with one resolved alert.
labels := model.LabelSet{"alertname": "1"}
aggrGroup1 := newAggrGroup(ctx, labels, route, timeout, types.NewMarker(prometheus.NewRegistry()), promslog.NewNopLogger())
aggrGroups[route][aggrGroup1.fingerprint()] = aggrGroup1
dispatcher.aggrGroupsPerRoute = aggrGroups
dispatcher.routeGroupsSlice[route.Idx].groups.Store(aggrGroup1.fingerprint(), aggrGroup1)

// Add a resolved alert
resolvedAlert := &types.Alert{
Alert: model.Alert{
Labels: labels,
StartsAt: time.Now().Add(-2 * time.Hour),
EndsAt: time.Now().Add(-1 * time.Hour), // Already resolved
},
UpdatedAt: time.Now().Add(-1 * time.Hour),
}
aggrGroup1.alerts.Set(resolvedAlert)

// Flush will detect the resolved alert and delete it via DeleteIfNotModified
// This is the actual production code path
notified := false
aggrGroup1.flush(func(alerts ...*types.Alert) bool {
require.Len(t, alerts, 1)
require.Equal(t, labels, alerts[0].Labels)
notified = true
return true // Simulate successful notification
})
require.True(t, notified, "flush should have called notify function")

// Must run otherwise doMaintenance blocks on aggrGroup1.stop().
go aggrGroup1.run(func(context.Context, ...*types.Alert) bool { return true })

Expand Down Expand Up @@ -823,8 +850,8 @@ func TestDispatcher_DeleteResolvedAlertsFromMarker(t *testing.T) {
ag.insert(ctx, resolvedAlert)

// Set markers for both alerts
marker.SetActiveOrSilenced(activeAlert.Fingerprint(), 0, nil, nil)
marker.SetActiveOrSilenced(resolvedAlert.Fingerprint(), 0, nil, nil)
marker.SetActiveOrSilenced(activeAlert.Fingerprint(), nil)
marker.SetActiveOrSilenced(resolvedAlert.Fingerprint(), nil)

// Verify markers exist before flush
require.True(t, marker.Active(activeAlert.Fingerprint()))
Expand Down Expand Up @@ -880,7 +907,7 @@ func TestDispatcher_DeleteResolvedAlertsFromMarker(t *testing.T) {
ag.insert(ctx, resolvedAlert)

// Set marker for the alert
marker.SetActiveOrSilenced(resolvedAlert.Fingerprint(), 0, nil, nil)
marker.SetActiveOrSilenced(resolvedAlert.Fingerprint(), nil)

// Verify marker exists before flush
require.True(t, marker.Active(resolvedAlert.Fingerprint()))
Expand Down Expand Up @@ -934,7 +961,7 @@ func TestDispatcher_DeleteResolvedAlertsFromMarker(t *testing.T) {
ag.insert(ctx, resolvedAlert)

// Set marker for the alert
marker.SetActiveOrSilenced(resolvedAlert.Fingerprint(), 0, nil, nil)
marker.SetActiveOrSilenced(resolvedAlert.Fingerprint(), nil)

// Verify marker exists before flush
require.True(t, marker.Active(resolvedAlert.Fingerprint()))
Expand Down
21 changes: 17 additions & 4 deletions dispatch/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,18 @@ type Route struct {

// Children routes of this route.
Routes []*Route

// Idx contains the index of this route in the config
Idx int
}

// NewRoute returns a new route.
func NewRoute(cr *config.Route, parent *Route) *Route {
counter := 0
return newRoute(cr, parent, &counter)
}

func newRoute(cr *config.Route, parent *Route, counter *int) *Route {
// Create default and overwrite with configured settings.
opts := DefaultRouteOpts
if parent != nil {
Expand Down Expand Up @@ -128,16 +136,21 @@ func NewRoute(cr *config.Route, parent *Route) *Route {
Continue: cr.Continue,
}

route.Routes = NewRoutes(cr.Routes, route)
// Create child routes first (they get lower indices)
route.Routes = newRoutes(cr.Routes, route, counter)

// Assign index to this route after all children have been indexed
route.Idx = *counter
*counter++

return route
}

// NewRoutes returns a slice of routes.
func NewRoutes(croutes []*config.Route, parent *Route) []*Route {
// newRoutes returns a slice of routes.
func newRoutes(croutes []*config.Route, parent *Route, counter *int) []*Route {
res := []*Route{}
for _, cr := range croutes {
res = append(res, NewRoute(cr, parent))
res = append(res, newRoute(cr, parent, counter))
}
return res
}
Expand Down
87 changes: 87 additions & 0 deletions dispatch/route_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -920,3 +920,90 @@ routes:
})
require.ElementsMatch(t, actual, expected)
}

func TestRouteIndices(t *testing.T) {
in := `
receiver: 'notify-def'

routes:
- match:
owner: 'team-A'

receiver: 'notify-A'

routes:
- match:
env: 'testing'

receiver: 'notify-testing'
group_by: [...]

- match:
env: "production"

receiver: 'notify-productionA'
group_wait: 1m

continue: true

- match_re:
env: "produ.*"
job: ".*"

receiver: 'notify-productionB'
group_wait: 30s
group_interval: 5m
repeat_interval: 1h
group_by: ['job']

- match_re:
owner: 'team-(B|C)'

group_by: ['foo', 'bar']
group_wait: 2m
receiver: 'notify-BC'

- match:
group_by: 'role'
group_by: ['role']

routes:
- match:
env: 'testing'
receiver: 'notify-testing'
routes:
- match:
wait: 'long'
group_wait: 2m
`

var ctree config.Route
if err := yaml.UnmarshalStrict([]byte(in), &ctree); err != nil {
t.Fatal(err)
}
tree := NewRoute(&ctree, nil)

// Collect all indices
var indices []int
var totalNodes int
tree.Walk(func(r *Route) {
indices = append(indices, r.Idx)
totalNodes++
})

// All indices are unique
seenIndices := make(map[int]bool)
for _, idx := range indices {
require.False(t, seenIndices[idx], "Index %d appears more than once", idx)
seenIndices[idx] = true
}

// Root index equals total nodes - 1
require.Equal(t, totalNodes-1, tree.Idx, "Root index should equal total nodes - 1")

// All indices are in range [0, totalNodes)
for _, idx := range indices {
require.GreaterOrEqual(t, idx, 0, "Index should be >= 0")
require.Less(t, idx, totalNodes, "Index should be < total nodes")
}
}
2 changes: 1 addition & 1 deletion notify/notify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ func TestMuteStageWithSilences(t *testing.T) {

// Set the second alert as previously silenced with an old version
// number. This is expected to get unsilenced by the stage.
marker.SetActiveOrSilenced(inAlerts[1].Fingerprint(), 0, []string{"123"}, nil)
marker.SetActiveOrSilenced(inAlerts[1].Fingerprint(), []string{"123"})

_, alerts, err := stage.Exec(context.Background(), promslog.NewNopLogger(), inAlerts...)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion provider/mem/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func NewAlerts(
ctx, cancel := context.WithCancel(ctx)
a := &Alerts{
marker: m,
alerts: store.NewAlerts().WithPerAlertLimit(perAlertNameLimit),
alerts: store.NewAlerts().WithDontDestroy().WithPerAlertLimit(perAlertNameLimit),
cancel: cancel,
listeners: map[int]listeningAlerts{},
next: 0,
Expand Down
4 changes: 2 additions & 2 deletions provider/mem/mem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ func TestAlertsGC(t *testing.T) {
}

for _, a := range insert {
marker.SetActiveOrSilenced(a.Fingerprint(), 0, nil, nil)
marker.SetActiveOrSilenced(a.Fingerprint(), nil)
marker.SetInhibited(a.Fingerprint())
if !marker.Active(a.Fingerprint()) {
t.Errorf("error setting status: %v", a)
Expand Down Expand Up @@ -477,7 +477,7 @@ func TestAlerts_CountByState(t *testing.T) {

// When insert an alert, and then silence it. It shows up with the correct filter.
alerts.Put(ctx, a2)
marker.SetActiveOrSilenced(a2.Fingerprint(), 1, []string{"1"}, nil)
marker.SetActiveOrSilenced(a2.Fingerprint(), []string{"1"})
_, suppressed, _ := alerts.countByState()
require.Equal(t, 1, suppressed)
require.Equal(t, 1, countTotal())
Expand Down
65 changes: 65 additions & 0 deletions silence/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package silence

import (
"sync"

"github.com/prometheus/common/model"
)

type cacheEntry struct {
activeIDs []string
pendingIDs []string
version int
}

func newCacheEntry(activeIDs, pendingIDs []string, version int) *cacheEntry {
return &cacheEntry{
activeIDs: activeIDs,
pendingIDs: pendingIDs,
version: version,
}
}

func (e *cacheEntry) count() int {
return len(e.activeIDs) + len(e.pendingIDs)
}

type cache struct {
entries map[model.Fingerprint]*cacheEntry
mu sync.RWMutex
}

func (c *cache) delete(fp model.Fingerprint) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.entries, fp)
}

func (c *cache) get(fp model.Fingerprint) cacheEntry {
c.mu.RLock()
defer c.mu.RUnlock()
entry := cacheEntry{}
if e, found := c.entries[fp]; found {
entry = *e
}
return entry
}

func (c *cache) set(fp model.Fingerprint, entry *cacheEntry) {
c.mu.Lock()
defer c.mu.Unlock()
c.entries[fp] = entry
}
Loading