
Implement periodic writing of alertmanager state to storage. #4031


Merged: 4 commits, Apr 9, 2021
8 changes: 8 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -2081,6 +2081,14 @@ alertmanager_client:
# Skip validating server certificate.
# CLI flag: -alertmanager.alertmanager-client.tls-insecure-skip-verify
[tls_insecure_skip_verify: <boolean> | default = false]

# The interval between persisting the current alertmanager state (notification
# log and silences) to object storage. This is only used when sharding is
# enabled. This state is read when all replicas for a shard cannot be
# contacted. In this scenario, having persisted the state more frequently
# results in potentially fewer lost silences and fewer duplicate notifications.
# CLI flag: -alertmanager.persist-interval
[persist_interval: <duration> | default = 15m]
```
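For example, an operator who wants a tighter persistence window could set the new option alongside sharding in the `alertmanager` block of the Cortex configuration. A minimal sketch (the `sharding_enabled` field name is assumed from the sharding support this option depends on):

```yaml
alertmanager:
  # Assumed field name; state persistence only takes effect when sharding is enabled.
  sharding_enabled: true
  # Write the notification log and silences to object storage every 5 minutes.
  persist_interval: 5m
```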

### `alertmanager_storage_config`
22 changes: 21 additions & 1 deletion pkg/alertmanager/alertmanager.go
@@ -73,6 +73,7 @@ type Config struct {
ReplicationFactor int
Replicator Replicator
Store alertstore.AlertStore
PersisterConfig PersisterConfig
}

// An Alertmanager manages the alerts for one user.
@@ -81,6 +82,7 @@ type Alertmanager struct {
api *api.API
logger log.Logger
state State
persister *statePersister
nflog *nflog.Log
silences *silence.Silences
marker types.Marker
@@ -163,7 +165,9 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
am.state = cfg.Peer
} else if cfg.ShardingEnabled {
level.Debug(am.logger).Log("msg", "starting tenant alertmanager with ring-based replication")
am.state = newReplicatedStates(cfg.UserID, cfg.ReplicationFactor, cfg.Replicator, cfg.Store, am.logger, am.registry)
state := newReplicatedStates(cfg.UserID, cfg.ReplicationFactor, cfg.Replicator, cfg.Store, am.logger, am.registry)
am.state = state
am.persister = newStatePersister(cfg.PersisterConfig, cfg.UserID, state, cfg.Store, am.logger)
} else {
level.Debug(am.logger).Log("msg", "starting tenant alertmanager without replication")
am.state = &NilPeer{}
@@ -208,6 +212,12 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
}
}

if am.persister != nil {
if err := am.persister.StartAsync(context.Background()); err != nil {
return nil, errors.Wrap(err, "failed to start state persister service")
}
}

am.pipelineBuilder = notify.NewPipelineBuilder(am.registry)

am.wg.Add(1)
@@ -351,6 +361,10 @@ func (am *Alertmanager) Stop() {
am.dispatcher.Stop()
}

if am.persister != nil {
am.persister.StopAsync()
}

if service, ok := am.state.(services.Service); ok {
service.StopAsync()
}
@@ -362,6 +376,12 @@ func (am *Alertmanager) Stop() {
func (am *Alertmanager) StopAndWait() {
am.Stop()

if am.persister != nil {
if err := am.persister.AwaitTerminated(context.Background()); err != nil {
level.Warn(am.logger).Log("msg", "error while stopping state persister service", "err", err)
}
}

if service, ok := am.state.(services.Service); ok {
if err := service.AwaitTerminated(context.Background()); err != nil {
level.Warn(am.logger).Log("msg", "error while stopping ring-based replication service", "err", err)
11 changes: 11 additions & 0 deletions pkg/alertmanager/multitenant.go
@@ -120,6 +120,9 @@ type MultitenantAlertmanagerConfig struct {

// For distributor.
AlertmanagerClient ClientConfig `yaml:"alertmanager_client"`

// For the state persister.
Persister PersisterConfig `yaml:",inline"`
}

type ClusterConfig struct {
@@ -154,6 +157,8 @@ func (cfg *MultitenantAlertmanagerConfig) RegisterFlags(f *flag.FlagSet) {

cfg.AlertmanagerClient.RegisterFlagsWithPrefix("alertmanager.alertmanager-client", f)

cfg.Persister.RegisterFlagsWithPrefix("alertmanager", f)

cfg.ShardingRing.RegisterFlags(f)
cfg.Store.RegisterFlags(f)
cfg.Cluster.RegisterFlags(f)
@@ -174,6 +179,11 @@ func (cfg *MultitenantAlertmanagerConfig) Validate() error {
if err := cfg.Store.Validate(); err != nil {
return errors.Wrap(err, "invalid storage config")
}

if err := cfg.Persister.Validate(); err != nil {
return err
}

return nil
}

@@ -856,6 +866,7 @@ func (am *MultitenantAlertmanager) newAlertmanager(userID string, amConfig *amco
Replicator: am,
ReplicationFactor: am.cfg.ShardingRing.ReplicationFactor,
Store: am.store,
PersisterConfig: am.cfg.Persister,
}, reg)
if err != nil {
return nil, fmt.Errorf("unable to start Alertmanager for user %v: %v", userID, err)
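The `yaml:",inline"` tag on the new `Persister` field is what makes `persist_interval` appear directly under the alertmanager configuration block (as shown in the documentation change above) rather than nested under its own key. A minimal, self-contained sketch of that behaviour with gopkg.in/yaml.v2, using hypothetical struct names and a plain string field for brevity:

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// Hypothetical stand-ins for PersisterConfig and the enclosing alertmanager
// config; only the YAML tags matter for this illustration.
type persisterConfig struct {
	Interval string `yaml:"persist_interval"`
}

type alertmanagerConfig struct {
	// Inlined, so persist_interval is read from the parent level.
	Persister persisterConfig `yaml:",inline"`
}

func main() {
	var cfg alertmanagerConfig
	if err := yaml.Unmarshal([]byte("persist_interval: 5m\n"), &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.Persister.Interval) // prints "5m"
}
```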
23 changes: 23 additions & 0 deletions pkg/alertmanager/multitenant_test.go
@@ -85,6 +85,29 @@ func mockAlertmanagerConfig(t *testing.T) *MultitenantAlertmanagerConfig {
return cfg
}

func TestMultitenantAlertmanagerConfig_Validate(t *testing.T) {
// Default values only.
{
cfg := &MultitenantAlertmanagerConfig{}
flagext.DefaultValues(cfg)
assert.NoError(t, cfg.Validate())
}
// Invalid persist interval (zero).
{
cfg := &MultitenantAlertmanagerConfig{}
flagext.DefaultValues(cfg)
cfg.Persister.Interval = 0
assert.Equal(t, errInvalidPersistInterval, cfg.Validate())
}
// Invalid persist interval (negative).
{
cfg := &MultitenantAlertmanagerConfig{}
flagext.DefaultValues(cfg)
cfg.Persister.Interval = -1
assert.Equal(t, errInvalidPersistInterval, cfg.Validate())
}
}

func TestMultitenantAlertmanager_loadAndSyncConfigs(t *testing.T) {
ctx := context.Background()

109 changes: 109 additions & 0 deletions pkg/alertmanager/state_persister.go
@@ -0,0 +1,109 @@
package alertmanager

import (
"context"
"flag"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/alertmanager/cluster/clusterpb"

"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
"github.com/cortexproject/cortex/pkg/alertmanager/alertstore"
"github.com/cortexproject/cortex/pkg/util/services"
)

const (
defaultPersistTimeout = 30 * time.Second
)

var (
errInvalidPersistInterval = errors.New("invalid alertmanager persist interval, must be greater than zero")
)

type PersisterConfig struct {
Interval time.Duration `yaml:"persist_interval"`
}

func (cfg *PersisterConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.Interval, prefix+".persist-interval", 15*time.Minute, "The interval between persisting the current alertmanager state (notification log and silences) to object storage. This is only used when sharding is enabled. This state is read when all replicas for a shard cannot be contacted. In this scenario, having persisted the state more frequently results in potentially fewer lost silences and fewer duplicate notifications.")
}

func (cfg *PersisterConfig) Validate() error {
if cfg.Interval <= 0 {
return errInvalidPersistInterval
}
return nil
}

type PersistableState interface {
State
GetFullState() (*clusterpb.FullState, error)
}

// statePersister periodically writes the alertmanager state to persistent storage.
type statePersister struct {
services.Service

state PersistableState
store alertstore.AlertStore
userID string
logger log.Logger

timeout time.Duration
}

// newStatePersister creates a new state persister.
func newStatePersister(cfg PersisterConfig, userID string, state PersistableState, store alertstore.AlertStore, l log.Logger) *statePersister {

s := &statePersister{
state: state,
store: store,
userID: userID,
logger: l,
timeout: defaultPersistTimeout,
}

s.Service = services.NewTimerService(cfg.Interval, s.starting, s.iteration, nil)

return s
}

func (s *statePersister) starting(ctx context.Context) error {
// Waits until the state replicator is settled, so that state is not
// persisted before obtaining some initial state.
return s.state.WaitReady(ctx)
}

func (s *statePersister) iteration(ctx context.Context) error {
if err := s.persist(ctx); err != nil {
level.Error(s.logger).Log("msg", "failed to persist state", "user", s.userID, "err", err)
}
return nil
}

func (s *statePersister) persist(ctx context.Context) error {
// Only the replica at position zero should write the state.
if s.state.Position() != 0 {
return nil
}

level.Debug(s.logger).Log("msg", "persisting state", "user", s.userID)

fs, err := s.state.GetFullState()
if err != nil {
return err
}

ctx, cancel := context.WithTimeout(ctx, s.timeout)
defer cancel()

desc := alertspb.FullStateDesc{State: fs}
if err := s.store.SetFullState(ctx, s.userID, desc); err != nil {
return err
}

return nil
}
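For readers unfamiliar with the `services` package, the sketch below shows roughly how a timer service drives the two hooks passed to `services.NewTimerService`; it is an illustrative approximation, not the actual code in `pkg/util/services`. Note also that even when the timer fires, `persist` returns early unless this replica is at position zero in the ring, so at most one replica per tenant writes state to object storage.

```go
// Package sketch approximates (as an assumption, not the real implementation)
// how services.NewTimerService drives the persister's hooks.
package sketch

import (
	"context"
	"time"
)

// runTimerLoop runs the start hook once, then the iteration hook on every
// tick until the context is cancelled.
func runTimerLoop(ctx context.Context, interval time.Duration,
	starting, iteration func(context.Context) error) error {
	// For the persister, starting() blocks on WaitReady so state is never
	// persisted before some initial state has been obtained.
	if err := starting(ctx); err != nil {
		return err
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// The persister's iteration() logs persist failures and returns
			// nil, so a single failed write does not stop later attempts.
			if err := iteration(ctx); err != nil {
				return err
			}
		case <-ctx.Done():
			return nil
		}
	}
}
```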