Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/alertmanager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func run() int {
notificationLogOpts := []nflog.Option{
nflog.WithRetention(*retention),
nflog.WithSnapshot(filepath.Join(*dataDir, "nflog")),
nflog.WithMaintenance(15*time.Minute, stopc, wg.Done),
nflog.WithMaintenance(15*time.Minute, stopc, wg.Done, nil),
nflog.WithMetrics(prometheus.DefaultRegisterer),
nflog.WithLogger(log.With(logger, "component", "nflog")),
}
Expand Down Expand Up @@ -304,7 +304,7 @@ func run() int {
// Start providers before router potentially sends updates.
wg.Add(1)
go func() {
silences.Maintenance(15*time.Minute, filepath.Join(*dataDir, "silences"), stopc)
silences.Maintenance(15*time.Minute, filepath.Join(*dataDir, "silences"), stopc, nil)
wg.Done()
}()

Expand Down
61 changes: 37 additions & 24 deletions nflog/nflog.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,16 @@ type Log struct {

// For now we only store the most recently added log entry.
// The key is a serialized concatenation of group key and receiver.
mtx sync.RWMutex
st state
broadcast func([]byte)
mtx sync.RWMutex
st state
broadcast func([]byte)
maintenanceOverride MaintenanceFunc
}

// MaintenanceFunc represents the function to run as part of the periodic maintenance for the nflog.
// It returns the size of the snapshot taken or an error if it failed.
// The maintenance loop logs the returned size and records it in the snapshot
// size metric after every invocation, including invocations that return an error.
type MaintenanceFunc func() (int64, error)

type metrics struct {
gcDuration prometheus.Summary
snapshotDuration prometheus.Summary
Expand Down Expand Up @@ -190,14 +195,16 @@ func WithMetrics(r prometheus.Registerer) Option {
//
// The maintenance terminates on receiving from the provided channel.
// The done function is called after the final snapshot was completed.
func WithMaintenance(d time.Duration, stopc chan struct{}, done func()) Option {
// If not nil, the last argument is an override for what to do as part of the maintenance - for advanced usage.
func WithMaintenance(d time.Duration, stopc chan struct{}, done func(), maintenanceOverride MaintenanceFunc) Option {
return func(l *Log) error {
if d == 0 {
return errors.New("maintenance interval must not be 0")
}
l.runInterval = d
l.stopc = stopc
l.done = done
l.maintenanceOverride = maintenanceOverride
return nil
}
}
Expand Down Expand Up @@ -325,34 +332,40 @@ func (l *Log) run() {
t := time.NewTicker(l.runInterval)
defer t.Stop()

if l.done != nil {
defer l.done()
}

f := func() error {
start := l.now()
var doMaintenance MaintenanceFunc
doMaintenance = func() (int64, error) {
var size int64

level.Debug(l.logger).Log("msg", "Running maintenance")
defer func() {
level.Debug(l.logger).Log("msg", "Maintenance done", "duration", l.now().Sub(start), "size", size)
l.metrics.snapshotSize.Set(float64(size))
}()

if _, err := l.GC(); err != nil {
return err
return size, err
}
if l.snapf == "" {
return nil
return size, nil
}
f, err := openReplace(l.snapf)
if err != nil {
return err
return size, err
}
if size, err = l.Snapshot(f); err != nil {
return err
return size, err
}
return f.Close()
return size, f.Close()
}

if l.maintenanceOverride != nil {
doMaintenance = l.maintenanceOverride
}

if l.done != nil {
defer l.done()
}

runMaintenance := func(do func() (int64, error)) error {
start := l.now()
level.Debug(l.logger).Log("msg", "Running maintenance")
size, err := do()
level.Debug(l.logger).Log("msg", "Maintenance done", "duration", l.now().Sub(start), "size", size)
l.metrics.snapshotSize.Set(float64(size))
return err
}

Loop:
Expand All @@ -361,7 +374,7 @@ Loop:
case <-l.stopc:
break Loop
case <-t.C:
if err := f(); err != nil {
if err := runMaintenance(doMaintenance); err != nil {
level.Error(l.logger).Log("msg", "Running maintenance failed", "err", err)
}
}
Expand All @@ -370,7 +383,7 @@ Loop:
if l.snapf == "" {
return
}
if err := f(); err != nil {
if err := runMaintenance(doMaintenance); err != nil {
level.Error(l.logger).Log("msg", "Creating shutdown snapshot failed", "err", err)
}
}
Expand Down
30 changes: 30 additions & 0 deletions nflog/nflog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ import (
"io/ioutil"
"os"
"path/filepath"
"sync"
"testing"
"time"

pb "github.com/prometheus/alertmanager/nflog/nflogpb"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -123,6 +126,33 @@ func TestLogSnapshot(t *testing.T) {
}
}

// TestWithMaintenance_SupportsCustomCallback checks that a maintenance
// override passed to WithMaintenance is invoked by the log's run loop.
func TestWithMaintenance_SupportsCustomCallback(t *testing.T) {
	f, err := ioutil.TempFile("", "snapshot")
	require.NoError(t, err, "creating temp file failed")
	// Only the path is needed below: release the handle right away and make
	// sure the file does not outlive the test.
	defer os.Remove(f.Name())
	require.NoError(t, f.Close(), "closing temp file failed")

	stopc := make(chan struct{})
	var mtx sync.Mutex
	var mc int // Number of times the override was invoked; guarded by mtx.
	l, err := New(WithMetrics(prometheus.NewPedanticRegistry()), WithSnapshot(f.Name()), WithMaintenance(100*time.Millisecond, stopc, nil, func() (int64, error) {
		mtx.Lock()
		mc++
		mtx.Unlock()

		return 0, nil
	}))
	require.NoError(t, err)

	go l.run()
	// Give the ticker time to fire at least once before requesting shutdown.
	time.Sleep(200 * time.Millisecond)
	close(stopc)

	require.Eventually(t, func() bool {
		mtx.Lock()
		defer mtx.Unlock()
		return mc >= 2 // At least, one for the regular schedule and one at shutdown.
	}, 500*time.Millisecond, 100*time.Millisecond)
}

func TestReplaceFile(t *testing.T) {
dir, err := ioutil.TempDir("", "replace_file")
require.NoError(t, err, "creating temp dir failed")
Expand Down
44 changes: 28 additions & 16 deletions silence/silence.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ type Silences struct {
mc matcherCache
}

// MaintenanceFunc represents the function to run as part of the periodic maintenance for silences.
// It returns the size of the snapshot taken or an error if it failed.
// The maintenance loop logs the returned size and records it in the snapshot
// size metric after every invocation, including invocations that return an error.
type MaintenanceFunc func() (int64, error)

type metrics struct {
gcDuration prometheus.Summary
snapshotDuration prometheus.Summary
Expand Down Expand Up @@ -349,34 +353,42 @@ func New(o Options) (*Silences, error) {
// Maintenance garbage collects the silence state at the given interval. If the snapshot
// file is set, a snapshot is written to it afterwards.
// Terminates on receiving from stopc.
func (s *Silences) Maintenance(interval time.Duration, snapf string, stopc <-chan struct{}) {
// If not nil, the last argument is an override for what to do as part of the maintenance - for advanced usage.
func (s *Silences) Maintenance(interval time.Duration, snapf string, stopc <-chan struct{}, override MaintenanceFunc) {
t := time.NewTicker(interval)
defer t.Stop()

f := func() error {
start := s.now()
var doMaintenance MaintenanceFunc
doMaintenance = func() (int64, error) {
var size int64

level.Debug(s.logger).Log("msg", "Running maintenance")
defer func() {
level.Debug(s.logger).Log("msg", "Maintenance done", "duration", s.now().Sub(start), "size", size)
s.metrics.snapshotSize.Set(float64(size))
}()

if _, err := s.GC(); err != nil {
return err
return size, err
}
if snapf == "" {
return nil
return size, nil
}
f, err := openReplace(snapf)
if err != nil {
return err
return size, err
}
if size, err = s.Snapshot(f); err != nil {
return err
return size, err
}
return f.Close()
return size, f.Close()
}

// Replace the default maintenance routine only when the caller supplied an
// override. Testing doMaintenance here would always succeed (it was just
// assigned above) and would install a nil function whenever no override is
// passed, causing a nil-function call on the first maintenance run.
if override != nil {
	doMaintenance = override
}

runMaintenance := func(do MaintenanceFunc) error {
start := s.now()
level.Debug(s.logger).Log("msg", "Running maintenance")
size, err := do()
level.Debug(s.logger).Log("msg", "Maintenance done", "duration", s.now().Sub(start), "size", size)
s.metrics.snapshotSize.Set(float64(size))
return err
}

Loop:
Expand All @@ -385,7 +397,7 @@ Loop:
case <-stopc:
break Loop
case <-t.C:
if err := f(); err != nil {
if err := runMaintenance(doMaintenance); err != nil {
level.Info(s.logger).Log("msg", "Running maintenance failed", "err", err)
}
}
Expand All @@ -394,7 +406,7 @@ Loop:
if snapf == "" {
return
}
if err := f(); err != nil {
if err := runMaintenance(doMaintenance); err != nil {
level.Info(s.logger).Log("msg", "Creating shutdown snapshot failed", "err", err)
}
}
Expand Down
27 changes: 27 additions & 0 deletions silence/silence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"io/ioutil"
"os"
"sort"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -179,6 +180,32 @@ func TestSilencesSnapshot(t *testing.T) {
}
}

// TestSilences_Maintenance_SupportsCustomCallback checks that a maintenance
// override passed to Silences.Maintenance is invoked by the maintenance loop.
func TestSilences_Maintenance_SupportsCustomCallback(t *testing.T) {
	f, err := ioutil.TempFile("", "snapshot")
	require.NoError(t, err, "creating temp file failed")
	// Only the path is needed below: release the handle right away and make
	// sure the file does not outlive the test.
	defer os.Remove(f.Name())
	require.NoError(t, f.Close(), "closing temp file failed")

	s := &Silences{st: state{}, logger: log.NewNopLogger(), now: utcNow, metrics: newMetrics(nil, nil)}
	stopc := make(chan struct{})
	var mtx sync.Mutex
	var mc int // Number of times the override was invoked; guarded by mtx.

	go s.Maintenance(100*time.Millisecond, f.Name(), stopc, func() (int64, error) {
		mtx.Lock()
		mc++
		mtx.Unlock()

		return 0, nil
	})

	// Give the ticker time to fire at least once before requesting shutdown.
	time.Sleep(200 * time.Millisecond)
	close(stopc)

	require.Eventually(t, func() bool {
		mtx.Lock()
		defer mtx.Unlock()
		return mc >= 2 // At least, one for the regular schedule and one at shutdown.
	}, 500*time.Millisecond, 100*time.Millisecond)
}

func TestSilencesSetSilence(t *testing.T) {
s, err := New(Options{
Retention: time.Minute,
Expand Down