Implement gRPC based initial state settling in alertmanager. #3925


Merged · 8 commits · Mar 25, 2021
3 changes: 0 additions & 3 deletions integration/alertmanager_test.go
@@ -233,9 +233,6 @@ func TestAlertmanagerClustering(t *testing.T) {
 }

 func TestAlertmanagerSharding(t *testing.T) {
-	// TODO See: https://github.com/cortexproject/cortex/issues/3927
-	t.Skip("this test is skipped because of a bug in the alertmanager sharding logic, which is currently under development")
-
 	tests := map[string]struct {
 		legacyAlertStore bool
 	}{
24 changes: 17 additions & 7 deletions pkg/alertmanager/alertmanager.go
@@ -129,6 +129,8 @@ type Replicator interface {
 	// The alertmanager replication protocol relies on a position related to other replicas.
 	// This position is then used to identify who should notify about the alert first.
 	GetPositionForUser(userID string) int
+	// ReadFullStateForUser obtains the full state from other replicas in the cluster.
+	ReadFullStateForUser(context.Context, string) ([]*clusterpb.FullState, error)
 }

@@ -159,13 +161,7 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
 		am.state = cfg.Peer
 	} else if cfg.ShardingEnabled {
 		level.Debug(am.logger).Log("msg", "starting tenant alertmanager with ring-based replication")
-		state := newReplicatedStates(cfg.UserID, cfg.ReplicationFactor, cfg.Replicator, am.logger, am.registry)
-
-		if err := state.Service.StartAsync(context.Background()); err != nil {
-			return nil, errors.Wrap(err, "failed to start ring-based replication service")
-		}
-
-		am.state = state
+		am.state = newReplicatedStates(cfg.UserID, cfg.ReplicationFactor, cfg.Replicator, am.logger, am.registry)
 	} else {
 		level.Debug(am.logger).Log("msg", "starting tenant alertmanager without replication")
 		am.state = &NilPeer{}

@@ -203,6 +199,13 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
 	c = am.state.AddState("sil:"+cfg.UserID, am.silences, am.registry)
 	am.silences.SetBroadcast(c.Broadcast)

+	// State replication needs to be started after the state keys are defined.
+	if service, ok := am.state.(services.Service); ok {
+		if err := service.StartAsync(context.Background()); err != nil {
[inline review thread on the line above]

Contributor: We don't wait until started here (and that's correct). However, this means that we may start using this Alertmanager instance before settlement has completed (and I believe that is not correct). Am I missing anything?

@stevesg (author, Mar 18, 2021): I have been assuming we want to settle in the background, because an Alertmanager is spun up for every tenant. If they all had to hit the timeout, that might take too long if done serially. That said, it is harder to reason about correctness in this case. Perhaps it is safer to change it to wait for now, and explore doing it in the background as a separate piece of work?

Contributor: It's worth noting that "start using" is not what one would think. Yes, we'll accept alerts, silences, etc. However, we'll wait for the state to be replicated before we send a notification.

@stevesg (author): Will leave as-is. My (current) understanding is as Josh said - there is no requirement to block (except for notifications, which are blocked via the call into WaitReady).
+			return nil, errors.Wrap(err, "failed to start ring-based replication service")
+		}
+	}
+
 	am.pipelineBuilder = notify.NewPipelineBuilder(am.registry)

 	am.wg.Add(1)
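The start-after-registration pattern above can be sketched in isolation. Everything below (`Service`, `State`, `replicatedState`, `nilPeer`, `startIfService`) is a simplified stand-in for the real cortex and `services` package types, intended only to show why the type assertion lets the non-replicated `NilPeer` case skip startup entirely:

```go
package main

import (
	"context"
	"fmt"
)

// Service is a stand-in for the services.Service lifecycle interface
// (only the one method used here).
type Service interface {
	StartAsync(ctx context.Context) error
}

// State is a simplified version of the alertmanager peer-state interface.
// The real AddState also takes the state object and a registerer.
type State interface {
	AddState(key string)
}

// replicatedState is both a State and a Service, like the value
// returned by newReplicatedStates in the diff above.
type replicatedState struct {
	keys []string
}

func (s *replicatedState) AddState(key string) {
	s.keys = append(s.keys, key)
}

func (s *replicatedState) StartAsync(ctx context.Context) error {
	// Started only after all keys are registered, so settling sees them all.
	fmt.Println("settling started for", s.keys)
	return nil
}

// nilPeer is a State that is NOT a Service (the no-replication case).
type nilPeer struct{}

func (nilPeer) AddState(key string) {}

func startIfService(st State) error {
	// Only the ring-based replicated state implements Service; the type
	// assertion cleanly skips starting anything for nilPeer.
	if svc, ok := st.(Service); ok {
		return svc.StartAsync(context.Background())
	}
	return nil
}

func main() {
	rs := &replicatedState{}
	rs.AddState("nfl:tenant-1")
	rs.AddState("sil:tenant-1")
	_ = startIfService(rs)        // prints: settling started for [nfl:tenant-1 sil:tenant-1]
	_ = startIfService(nilPeer{}) // no-op, nothing to start
}
```

This is also why the `StartAsync` call moved out of the constructor branch: the state keys ("nfl:", "sil:") are only registered later in `New`, and settling must not begin before they exist.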
@@ -373,6 +376,13 @@ func (am *Alertmanager) mergePartialExternalState(part *clusterpb.Part) error {
 	return errors.New("ring-based sharding not enabled")
 }

+func (am *Alertmanager) getFullState() (*clusterpb.FullState, error) {
+	if state, ok := am.state.(*state); ok {
+		return state.GetFullState()
+	}
+	return nil, errors.New("ring-based sharding not enabled")
+}
+
 // buildIntegrationsMap builds a map of name to the list of integration notifiers off of a
 // list of receiver config.
 func buildIntegrationsMap(nc []*config.Receiver, tmpl *template.Template, logger log.Logger) (map[string][]notify.Integration, error) {