376 changes: 66 additions & 310 deletions dispatch/dispatch.go

Large diffs are not rendered by default.

414 changes: 414 additions & 0 deletions dispatch/dispatch_bench_test.go

Large diffs are not rendered by default.

341 changes: 11 additions & 330 deletions dispatch/dispatch_test.go
@@ -1,4 +1,4 @@
// Copyright 2018 Prometheus Team
// Copyright Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -17,12 +17,12 @@ import (
"context"
"fmt"
"log/slog"
"reflect"
"sort"
"sync"
"testing"
"time"

"go.uber.org/atomic"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
@@ -35,329 +35,6 @@ import (
"github.com/prometheus/alertmanager/types"
)

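// TestAggrGroup exercises an aggregation group's flush timing (group_wait,
// group_interval) and its cleanup of resolved alerts.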
func TestAggrGroup(t *testing.T) {
    lset := model.LabelSet{
        "a": "v1",
        "b": "v2",
    }
    opts := &RouteOpts{
        Receiver: "n1",
        GroupBy: map[model.LabelName]struct{}{
            "a": {},
            "b": {},
        },
        GroupWait:      1 * time.Second,
        GroupInterval:  300 * time.Millisecond,
        RepeatInterval: 1 * time.Hour,
    }
    route := &Route{
        RouteOpts: *opts,
    }

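    // a1 starts in the future, a2 started far enough in the past to skip the
    // initial group_wait, and a3 is short-lived.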
    var (
        a1 = &types.Alert{
            Alert: model.Alert{
                Labels: model.LabelSet{
                    "a": "v1",
                    "b": "v2",
                    "c": "v3",
                },
                StartsAt: time.Now().Add(time.Minute),
                EndsAt:   time.Now().Add(time.Hour),
            },
            UpdatedAt: time.Now(),
        }
        a2 = &types.Alert{
            Alert: model.Alert{
                Labels: model.LabelSet{
                    "a": "v1",
                    "b": "v2",
                    "c": "v4",
                },
                StartsAt: time.Now().Add(-time.Hour),
                EndsAt:   time.Now().Add(2 * time.Hour),
            },
            UpdatedAt: time.Now(),
        }
        a3 = &types.Alert{
            Alert: model.Alert{
                Labels: model.LabelSet{
                    "a": "v1",
                    "b": "v2",
                    "c": "v5",
                },
                StartsAt: time.Now().Add(time.Minute),
                EndsAt:   time.Now().Add(5 * time.Minute),
            },
            UpdatedAt: time.Now(),
        }
    )

    var (
        last       = time.Now()
        current    = time.Now()
        lastCurMtx = &sync.Mutex{}
        alertsCh   = make(chan types.AlertSlice)
    )

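    // ntfy is the notification callback under test: it verifies the context is
    // fully populated, records the flush time, and forwards each batch for
    // assertions.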
    ntfy := func(ctx context.Context, alerts ...*types.Alert) bool {
        // Validate that the context is properly populated.
        if _, ok := notify.Now(ctx); !ok {
            t.Errorf("now missing")
        }
        if _, ok := notify.GroupKey(ctx); !ok {
            t.Errorf("group key missing")
        }
        if lbls, ok := notify.GroupLabels(ctx); !ok || !reflect.DeepEqual(lbls, lset) {
            t.Errorf("wrong group labels: %q", lbls)
        }
        if rcv, ok := notify.ReceiverName(ctx); !ok || rcv != opts.Receiver {
            t.Errorf("wrong receiver: %q", rcv)
        }
        if ri, ok := notify.RepeatInterval(ctx); !ok || ri != opts.RepeatInterval {
            t.Errorf("wrong repeat interval: %q", ri)
        }

        lastCurMtx.Lock()
        last = current
        // Subtract a millisecond to allow for races.
        current = time.Now().Add(-time.Millisecond)
        lastCurMtx.Unlock()

        alertsCh <- types.AlertSlice(alerts)

        return true
    }

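    // removeEndsAt replaces each alert with a copy whose EndsAt is cleared,
    // used to build the expected batches below.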
    removeEndsAt := func(as types.AlertSlice) types.AlertSlice {
        for i, a := range as {
            ac := *a
            ac.EndsAt = time.Time{}
            as[i] = &ac
        }
        return as
    }

    // Test regular situation where we wait for group_wait to send out alerts.
    ag := newAggrGroup(context.Background(), lset, route, nil, promslog.NewNopLogger())
    go ag.run(ntfy)

    ag.insert(a1)

    select {
    case <-time.After(2 * opts.GroupWait):
        t.Fatalf("expected initial batch after group_wait")

    case batch := <-alertsCh:
        lastCurMtx.Lock()
        s := time.Since(last)
        lastCurMtx.Unlock()
        if s < opts.GroupWait {
            t.Fatalf("received batch too early after %v", s)
        }
        exp := removeEndsAt(types.AlertSlice{a1})
        sort.Sort(batch)

        if !reflect.DeepEqual(batch, exp) {
            t.Fatalf("expected alerts %v but got %v", exp, batch)
        }
    }

    for i := 0; i < 3; i++ {
        // New alert should come in after group interval.
        ag.insert(a3)

        select {
        case <-time.After(2 * opts.GroupInterval):
            t.Fatalf("expected new batch after group interval but received none")

        case batch := <-alertsCh:
            lastCurMtx.Lock()
            s := time.Since(last)
            lastCurMtx.Unlock()
            if s < opts.GroupInterval {
                t.Fatalf("received batch too early after %v", s)
            }
            exp := removeEndsAt(types.AlertSlice{a1, a3})
            sort.Sort(batch)

            if !reflect.DeepEqual(batch, exp) {
                t.Fatalf("expected alerts %v but got %v", exp, batch)
            }
        }
    }

    ag.stop()

    // Add an alert that started more than group_interval in the past. We expect
    // immediate flushing.
    // Finally, set all alerts to be resolved. After successful notify the
    // aggregation group should empty itself.
    ag = newAggrGroup(context.Background(), lset, route, nil, promslog.NewNopLogger())
    go ag.run(ntfy)

    ag.insert(a1)
    ag.insert(a2)

    // a2 lies way in the past so the initial group_wait should be skipped.
    select {
    case <-time.After(opts.GroupWait / 2):
        t.Fatalf("expected immediate alert but received none")

    case batch := <-alertsCh:
        exp := removeEndsAt(types.AlertSlice{a1, a2})
        sort.Sort(batch)

        if !reflect.DeepEqual(batch, exp) {
            t.Fatalf("expected alerts %v but got %v", exp, batch)
        }
    }

    for i := 0; i < 3; i++ {
        // New alert should come in after group interval.
        ag.insert(a3)

        select {
        case <-time.After(2 * opts.GroupInterval):
            t.Fatalf("expected new batch after group interval but received none")

        case batch := <-alertsCh:
            lastCurMtx.Lock()
            s := time.Since(last)
            lastCurMtx.Unlock()
            if s < opts.GroupInterval {
                t.Fatalf("received batch too early after %v", s)
            }
            exp := removeEndsAt(types.AlertSlice{a1, a2, a3})
            sort.Sort(batch)

            if !reflect.DeepEqual(batch, exp) {
                t.Fatalf("expected alerts %v but got %v", exp, batch)
            }
        }
    }

    // Resolve an alert; it should be removed after the next batch is sent.
    a1r := *a1
    a1r.EndsAt = time.Now()
    ag.insert(&a1r)
    exp := append(types.AlertSlice{&a1r}, removeEndsAt(types.AlertSlice{a2, a3})...)

    select {
    case <-time.After(2 * opts.GroupInterval):
        t.Fatalf("expected new batch after group interval but received none")
    case batch := <-alertsCh:
        lastCurMtx.Lock()
        s := time.Since(last)
        lastCurMtx.Unlock()
        if s < opts.GroupInterval {
            t.Fatalf("received batch too early after %v", s)
        }
        sort.Sort(batch)

        if !reflect.DeepEqual(batch, exp) {
            t.Fatalf("expected alerts %v but got %v", exp, batch)
        }
    }

    // Resolve all remaining alerts; they should be removed after the next batch
    // is sent. Do not add a1r, as it should have been deleted following the
    // previous batch.
    a2r, a3r := *a2, *a3
    resolved := types.AlertSlice{&a2r, &a3r}
    for _, a := range resolved {
        a.EndsAt = time.Now()
        ag.insert(a)
    }

    select {
    case <-time.After(2 * opts.GroupInterval):
        t.Fatalf("expected new batch after group interval but received none")

    case batch := <-alertsCh:
        lastCurMtx.Lock()
        s := time.Since(last)
        lastCurMtx.Unlock()
        if s < opts.GroupInterval {
            t.Fatalf("received batch too early after %v", s)
        }
        sort.Sort(batch)

        if !reflect.DeepEqual(batch, resolved) {
            t.Fatalf("expected alerts %v but got %v", resolved, batch)
        }

        if !ag.empty() {
            t.Fatalf("Expected aggregation group to be empty after resolving alerts: %v", ag)
        }
    }

    ag.stop()
}

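// TestGroupLabels verifies that getGroupLabels keeps only the labels listed in
// the route's group_by.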
func TestGroupLabels(t *testing.T) {
    a := &types.Alert{
        Alert: model.Alert{
            Labels: model.LabelSet{
                "a": "v1",
                "b": "v2",
                "c": "v3",
            },
        },
    }

    route := &Route{
        RouteOpts: RouteOpts{
            GroupBy: map[model.LabelName]struct{}{
                "a": {},
                "b": {},
            },
            GroupByAll: false,
        },
    }

    expLs := model.LabelSet{
        "a": "v1",
        "b": "v2",
    }

    ls := getGroupLabels(a, route)

    if !reflect.DeepEqual(ls, expLs) {
        t.Fatalf("expected labels are %v, but got %v", expLs, ls)
    }
}

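// TestGroupByAllLabels verifies that getGroupLabels keeps every label when
// GroupByAll is set.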
func TestGroupByAllLabels(t *testing.T) {
    a := &types.Alert{
        Alert: model.Alert{
            Labels: model.LabelSet{
                "a": "v1",
                "b": "v2",
                "c": "v3",
            },
        },
    }

    route := &Route{
        RouteOpts: RouteOpts{
            GroupBy:    map[model.LabelName]struct{}{},
            GroupByAll: true,
        },
    }

    expLs := model.LabelSet{
        "a": "v1",
        "b": "v2",
        "c": "v3",
    }

    ls := getGroupLabels(a, route)

    if !reflect.DeepEqual(ls, expLs) {
        t.Fatalf("expected labels are %v, but got %v", expLs, ls)
    }
}

func TestGroups(t *testing.T) {
    confData := `receivers:
- name: 'kafka'
@@ -750,14 +427,18 @@ func TestDispatcher_DoMaintenance(t *testing.T) {

    ctx := context.Background()
    dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, promslog.NewNopLogger(), NewDispatcherMetrics(false, r))
    aggrGroups := make(map[*Route]map[model.Fingerprint]*aggrGroup)
    aggrGroups[route] = make(map[model.Fingerprint]*aggrGroup)

    // Initialize the dispatcher's aggrGroupsPerRoute directly (avoid copying the struct)
    dispatcher.aggrGroupsPerRoute = routeGroups{
        groupsNum: &atomic.Int64{},
        limits:    nilLimits{},
    }
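    // AddRoute returns the fingerprint-keyed group map for this route
    // (semantics inferred from its usage here; the dispatch.go diff is
    // collapsed above).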
    groupsMap := dispatcher.aggrGroupsPerRoute.AddRoute(route)

    // Insert an aggregation group with no alerts.
    labels := model.LabelSet{"alertname": "1"}
    aggrGroup1 := newAggrGroup(ctx, labels, route, timeout, promslog.NewNopLogger())
    aggrGroups[route][aggrGroup1.fingerprint()] = aggrGroup1
    dispatcher.aggrGroupsPerRoute = aggrGroups
    groupsMap.AddGroup(aggrGroup1.fingerprint(), aggrGroup1)
    // Must run otherwise doMaintenance blocks on aggrGroup1.stop().
    go aggrGroup1.run(func(context.Context, ...*types.Alert) bool { return true })

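The dispatch.go diff above is collapsed, so the shape of the new routeGroups container is only visible through this test. The following is a minimal sketch reconstructed from that usage alone: the groupsForRoute type, the perRoute field, and the locking scheme are illustrative assumptions, not the actual implementation. It assumes it lives in package dispatch, alongside the existing Route, aggrGroup, Limits, and nilLimits types, with "sync", "go.uber.org/atomic", and "github.com/prometheus/common/model" imported.

// Sketch only: inferred from the test above, not copied from dispatch.go.

// groupsForRoute is a hypothetical name for whatever AddRoute returns.
type groupsForRoute struct {
    mtx    sync.Mutex
    groups map[model.Fingerprint]*aggrGroup
}

// AddGroup registers an aggregation group under its fingerprint. The real
// implementation presumably also bumps the shared group counter and enforces
// limits; both are omitted here.
func (g *groupsForRoute) AddGroup(fp model.Fingerprint, ag *aggrGroup) {
    g.mtx.Lock()
    defer g.mtx.Unlock()
    g.groups[fp] = ag
}

// routeGroups stands in for the old map[*Route]map[model.Fingerprint]*aggrGroup.
// groupsNum and limits appear in the test's struct literal; perRoute is assumed.
type routeGroups struct {
    mtx       sync.Mutex
    perRoute  map[*Route]*groupsForRoute
    groupsNum *atomic.Int64 // shared counter of live aggregation groups
    limits    Limits        // nilLimits{} disables the group-count limit
}

// AddRoute returns the group map for route, creating it on first use. Pointer
// receiver: calling it on the dispatcher's field mutates that field in place,
// which is why the test avoids copying the struct.
func (r *routeGroups) AddRoute(route *Route) *groupsForRoute {
    r.mtx.Lock()
    defer r.mtx.Unlock()
    if r.perRoute == nil {
        r.perRoute = map[*Route]*groupsForRoute{}
    }
    g, ok := r.perRoute[route]
    if !ok {
        g = &groupsForRoute{groups: map[model.Fingerprint]*aggrGroup{}}
        r.perRoute[route] = g
    }
    return g
}

Under this assumed shape, the test's routeGroups{groupsNum: &atomic.Int64{}, limits: nilLimits{}} literal, the AddRoute call, and AddGroup all type-check.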