Alertmanager: Deprecate cluster flags (but keep compatibility) #3677

Merged
2 changes: 1 addition & 1 deletion .github/workflows/scripts/install-docker.sh
@@ -1,7 +1,7 @@
#!/bin/bash

set -x
-VER="17.03.0-ce"
+VER="18.06.3-ce"
curl -L -o /tmp/docker-$VER.tgz https://download.docker.com/linux/static/stable/x86_64/docker-$VER.tgz
tar -xz -C /tmp -f /tmp/docker-$VER.tgz
mv /tmp/docker/* /usr/bin
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -15,6 +15,11 @@
* [CHANGE] Blocks storage: block deletion marks are now also stored in a per-tenant global markers/ location, in addition to the block location. The compactor, at startup, will copy deletion marks from the block location to the global location. This migration is required only once, so you can safely disable it via `-compactor.block-deletion-marks-migration-enabled=false` once the new compactor has successfully started at least once in your cluster. #3583
* [CHANGE] OpenStack Swift: the default value for the `-ruler.storage.swift.container-name` and `-swift.container-name` config options has changed from `cortex` to empty string. If you were relying on the default value, you should set it back to `cortex`. #3660
* [CHANGE] HA Tracker: configured replica label is now verified against label value length limit (`-validation.max-length-label-value`). #3668
* [CHANGE] Alertmanager: Deprecated `-cluster.` CLI flags in favor of their `-alertmanager.cluster.` equivalent. The deprecated flags (and their respective YAML config options) are: #3677
* `-cluster.listen-address` in favor of `-alertmanager.cluster.listen-address`
* `-cluster.advertise-address` in favor of `-alertmanager.cluster.advertise-address`
* `-cluster.peer` in favor of `-alertmanager.cluster.peers`
* `-cluster.peer-timeout` in favor of `-alertmanager.cluster.peer-timeout`
* [FEATURE] Querier: Queries can be federated across multiple tenants. The tenant IDs involved must be specified, separated by a `|` character, in the `X-Scope-OrgID` request header. This is an experimental feature, which can be enabled by setting `-tenant-federation.enabled=true` on all Cortex services. #3250
* [ENHANCEMENT] Blocks storage: introduced a per-tenant bucket index, periodically updated by the compactor, used to avoid full bucket scanning done by queriers, store-gateways and rulers. The bucket index is updated by the compactor during blocks cleanup, on every `-compactor.cleanup-interval`. #3553 #3555 #3561 #3583 #3625
* [ENHANCEMENT] Blocks storage: introduced an option `-blocks-storage.bucket-store.bucket-index.enabled` to enable the usage of the bucket index in the querier, store-gateway and ruler. When enabled, the querier, store-gateway and ruler will use the bucket index to find a tenant's blocks instead of running the periodic bucket scan. The following new metrics are exported by the querier and ruler: #3614 #3625
26 changes: 22 additions & 4 deletions docs/configuration/config-file-reference.md
@@ -1485,19 +1485,19 @@ The `alertmanager_config` configures the Cortex alertmanager.
# CLI flag: -alertmanager.configs.poll-interval
[poll_interval: <duration> | default = 15s]

-# Listen address for cluster.
+# Deprecated. Use -alertmanager.cluster.listen-address instead.
# CLI flag: -cluster.listen-address
[cluster_bind_address: <string> | default = "0.0.0.0:9094"]

-# Explicit address to advertise in cluster.
+# Deprecated. Use -alertmanager.cluster.advertise-address instead.
# CLI flag: -cluster.advertise-address
[cluster_advertise_address: <string> | default = ""]

-# Initial peers (may be repeated).
+# Deprecated. Use -alertmanager.cluster.peers instead.
# CLI flag: -cluster.peer
[peers: <list of string> | default = []]

-# Time to wait between peers to send notifications.
+# Deprecated. Use -alertmanager.cluster.peer-timeout instead.
# CLI flag: -cluster.peer-timeout
[peer_timeout: <duration> | default = 15s]

@@ -1647,6 +1647,24 @@ storage:
# CLI flag: -alertmanager.storage.local.path
[path: <string> | default = ""]

cluster:
# Listen address and port for the cluster. Not specifying this flag disables
# high-availability mode.
# CLI flag: -alertmanager.cluster.listen-address
[listen_address: <string> | default = "0.0.0.0:9094"]

# Explicit address or hostname to advertise in cluster.
# CLI flag: -alertmanager.cluster.advertise-address
[advertise_address: <string> | default = ""]

# Comma-separated list of initial peers.
# CLI flag: -alertmanager.cluster.peers
[peers: <string> | default = ""]

# Time to wait between peers to send notifications.
# CLI flag: -alertmanager.cluster.peer-timeout
[peer_timeout: <duration> | default = 15s]

# Enable the experimental alertmanager config api.
# CLI flag: -experimental.alertmanager.enable-api
[enable_api: <boolean> | default = false]
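The reference above changes how peers are passed: the deprecated `-cluster.peer` flag may be repeated, whereas `-alertmanager.cluster.peers` takes a single comma-separated value. The sketch below shows one way such a comma-separated flag can be built on the standard library `flag.Value` interface. It is an illustration only: `csvList` and `parsePeers` are hypothetical names, and the PR itself uses `flagext.StringSliceCSV`, whose implementation is not shown in this diff.

```go
package main

import (
	"flag"
	"fmt"
	"strings"
)

// csvList is a hypothetical stand-in for a comma-separated list flag.
type csvList []string

// String renders the list back into its comma-separated form.
func (l *csvList) String() string { return strings.Join(*l, ",") }

// Set splits the raw flag value on commas; an empty value yields no peers.
func (l *csvList) Set(s string) error {
	if s == "" {
		*l = nil
		return nil
	}
	*l = strings.Split(s, ",")
	return nil
}

// parsePeers parses args with a throwaway FlagSet and returns the peers.
func parsePeers(args []string) ([]string, error) {
	var peers csvList
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	fs.Var(&peers, "alertmanager.cluster.peers", "Comma-separated list of initial peers.")
	if err := fs.Parse(args); err != nil {
		return nil, err
	}
	return peers, nil
}

func main() {
	peers, err := parsePeers([]string{"-alertmanager.cluster.peers=am-1:9094,am-2:9094"})
	fmt.Println(peers, err)
}
```

With this shape, passing the flag twice replaces the earlier value instead of appending to it, which is the main behavioral difference from a repeatable flag.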
63 changes: 63 additions & 0 deletions integration/alertmanager_test.go
@@ -3,9 +3,11 @@
package integration

import (
"bytes"
"context"
"fmt"
"net/http"
"strings"
"testing"
"time"

@@ -16,8 +18,15 @@ import (
"github.com/cortexproject/cortex/integration/e2e"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/integration/e2ecortex"
"github.com/cortexproject/cortex/pkg/alertmanager/alerts"
s3 "github.com/cortexproject/cortex/pkg/chunk/aws"
)

const simpleAlertmanagerConfig = `route:
receiver: dummy
receivers:
- name: dummy`

func TestAlertmanager(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
@@ -143,3 +152,57 @@ func TestAlertmanagerStoreAPI(t *testing.T) {
require.Nil(t, cfg)
require.EqualError(t, err, "not found")
}

func TestAlertmanagerClustering(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

flags := mergeFlags(AlertmanagerFlags(), AlertmanagerS3Flags())

// Start dependencies.
minio := e2edb.NewMinio(9000, flags["-alertmanager.storage.s3.buckets"])
require.NoError(t, s.StartAndWaitReady(minio))

client, err := s3.NewS3ObjectClient(s3.S3Config{
Endpoint: minio.HTTPEndpoint(),
S3ForcePathStyle: true,
Insecure: true,
BucketNames: flags["-alertmanager.storage.s3.buckets"],
AccessKeyID: e2edb.MinioAccessKey,
SecretAccessKey: e2edb.MinioSecretKey,
})
require.NoError(t, err)

// Create and upload an Alertmanager configuration.
user := "user-1"
desc := alerts.AlertConfigDesc{RawConfig: simpleAlertmanagerConfig, User: user, Templates: []*alerts.TemplateDesc{}}

d, err := desc.Marshal()
require.NoError(t, err)
err = client.PutObject(context.Background(), fmt.Sprintf("/alerts/%s", user), bytes.NewReader(d))
require.NoError(t, err)

peers := strings.Join([]string{
e2e.NetworkContainerHostPort(networkName, "alertmanager-1", e2ecortex.GossipPort),
e2e.NetworkContainerHostPort(networkName, "alertmanager-2", e2ecortex.GossipPort),
}, ",")
flags = mergeFlags(flags, AlertmanagerClusterFlags(peers))

// Create the Alertmanager instances.
alertmanager1 := e2ecortex.NewAlertmanager("alertmanager-1", flags, "")
alertmanager2 := e2ecortex.NewAlertmanager("alertmanager-2", flags, "")

alertmanagers := e2ecortex.NewCompositeCortexService(alertmanager1, alertmanager2)

// Start Alertmanager instances.
for _, am := range alertmanagers.Instances() {
require.NoError(t, s.StartAndWaitReady(am))
}

for _, am := range alertmanagers.Instances() {
require.NoError(t, am.WaitSumMetrics(e2e.Equals(float64(0)), "alertmanager_cluster_health_score")) // Lower means healthier, 0 being totally healthy.
require.NoError(t, am.WaitSumMetrics(e2e.Equals(float64(0)), "alertmanager_cluster_failed_peers"))
require.NoError(t, am.WaitSumMetrics(e2e.Equals(float64(2)), "alertmanager_cluster_members"))
}
}
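The clustering test above asserts on summed metric values such as `alertmanager_cluster_members`. The implementation of `WaitSumMetrics` is not part of this diff, so the following is only a sketch of the underlying idea under stated assumptions: it sums every sample of one metric name in Prometheus text exposition output. `sumMetric` is a hypothetical helper, the sample text in `main` is made up for illustration, and the parser assumes label values contain no spaces.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// sumMetric adds up all samples whose metric name equals name.
// Assumption: label values contain no whitespace, so the sample value is
// always the second whitespace-separated field on the line.
func sumMetric(exposition, name string) (float64, error) {
	var sum float64
	for _, line := range strings.Split(exposition, "\n") {
		line = strings.TrimSpace(line)
		if line == "" || strings.HasPrefix(line, "#") {
			continue // Skip blank lines and HELP/TYPE comments.
		}
		fields := strings.Fields(line)
		if len(fields) < 2 {
			continue
		}
		series := fields[0]
		if i := strings.IndexByte(series, '{'); i >= 0 {
			series = series[:i] // Drop the label set, keep the metric name.
		}
		if series != name {
			continue
		}
		v, err := strconv.ParseFloat(fields[1], 64)
		if err != nil {
			return 0, err
		}
		sum += v
	}
	return sum, nil
}

func main() {
	sample := "# TYPE alertmanager_cluster_members gauge\n" +
		"alertmanager_cluster_members 2\n" +
		"alertmanager_cluster_failed_peers 0\n"
	fmt.Println(sumMetric(sample, "alertmanager_cluster_members"))
}
```

A test helper would typically call something like this in a retry loop until the sum matches the expected value or a timeout expires.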
8 changes: 8 additions & 0 deletions integration/configs.go
@@ -94,6 +94,14 @@ var (
}
}

AlertmanagerClusterFlags = func(peers string) map[string]string {
return map[string]string{
"-alertmanager.cluster.listen-address": "0.0.0.0:9094", // This is the default, but let's be explicit.
"-alertmanager.cluster.peers": peers,
"-alertmanager.cluster.peer-timeout": "2s",
}
}

AlertmanagerLocalFlags = func() map[string]string {
return map[string]string{
"-alertmanager.storage.type": "local",
19 changes: 13 additions & 6 deletions integration/e2e/service.go
@@ -220,7 +220,7 @@ func (s *ConcreteService) NetworkEndpoint(port int) string {
//
// This method returns the correct endpoint for the service in any state.
func (s *ConcreteService) NetworkEndpointFor(networkName string, port int) string {
-return fmt.Sprintf("%s:%d", containerName(networkName, s.name), port)
+return fmt.Sprintf("%s:%d", NetworkContainerHost(networkName, s.name), port)
}

func (s *ConcreteService) SetReadinessProbe(probe ReadinessProbe) {
@@ -240,12 +240,8 @@ func (s *ConcreteService) Ready() error {
return s.readiness.Ready(s)
}

-func containerName(netName string, name string) string {
-return fmt.Sprintf("%s-%s", netName, name)
-}
-
func (s *ConcreteService) containerName() string {
-return containerName(s.usedNetworkName, s.name)
+return NetworkContainerHost(s.usedNetworkName, s.name)
}

func (s *ConcreteService) WaitForRunning() (err error) {
@@ -355,6 +351,17 @@ func (s *ConcreteService) Exec(command *Command) (string, string, error) {
return stdout.String(), stderr.String(), err
}

// NetworkContainerHost returns the hostname of the container within the network. This is
// the address a container should use to connect to other containers.
func NetworkContainerHost(networkName, containerName string) string {
return fmt.Sprintf("%s-%s", networkName, containerName)
}

// NetworkContainerHostPort returns the host:port address of a container within the network.
func NetworkContainerHostPort(networkName, containerName string, port int) string {
return fmt.Sprintf("%s-%s:%d", networkName, containerName, port)
}

type Command struct {
cmd string
args []string
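The helpers added to integration/e2e/service.go above build in-network addresses as `<network>-<container>:<port>`, and the clustering test joins two of them into a peers list. The sketch below reproduces that composition in a standalone program. `networkContainerHostPort` mirrors the format string from the diff, while `peerList` and the network name used in `main` are hypothetical and exist only for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// networkContainerHostPort mirrors the format used by
// e2e.NetworkContainerHostPort in the diff: "<network>-<container>:<port>".
func networkContainerHostPort(networkName, containerName string, port int) string {
	return fmt.Sprintf("%s-%s:%d", networkName, containerName, port)
}

// peerList is a hypothetical helper that builds the comma-separated value
// expected by -alertmanager.cluster.peers for a set of container names.
func peerList(networkName string, containerNames []string, port int) string {
	addrs := make([]string, 0, len(containerNames))
	for _, name := range containerNames {
		addrs = append(addrs, networkContainerHostPort(networkName, name, port))
	}
	return strings.Join(addrs, ",")
}

func main() {
	// "example-net" is a made-up network name; 9094 matches GossipPort in the diff.
	fmt.Println(peerList("example-net", []string{"alertmanager-1", "alertmanager-2"}, 9094))
}
```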
6 changes: 4 additions & 2 deletions integration/e2ecortex/services.go
@@ -8,8 +8,9 @@ import (
)

const (
-httpPort = 80
-grpcPort = 9095
+httpPort = 80
+grpcPort = 9095
+GossipPort = 9094
)

// GetDefaultImage returns the Docker image to use to run Cortex.
@@ -345,6 +346,7 @@ func NewAlertmanager(name string, flags map[string]string, image string) *Cortex
e2e.NewHTTPReadinessProbe(httpPort, "/ready", 200, 299),
httpPort,
grpcPort,
GossipPort,
)
}

89 changes: 70 additions & 19 deletions pkg/alertmanager/multitenant.go
@@ -89,20 +89,31 @@ type MultitenantAlertmanagerConfig struct {
ExternalURL flagext.URLValue `yaml:"external_url"`
PollInterval time.Duration `yaml:"poll_interval"`

-ClusterBindAddr string `yaml:"cluster_bind_address"`
-ClusterAdvertiseAddr string `yaml:"cluster_advertise_address"`
-Peers flagext.StringSlice `yaml:"peers"`
-PeerTimeout time.Duration `yaml:"peer_timeout"`
+DeprecatedClusterBindAddr string `yaml:"cluster_bind_address"`
+DeprecatedClusterAdvertiseAddr string `yaml:"cluster_advertise_address"`
+DeprecatedPeers flagext.StringSlice `yaml:"peers"`
+DeprecatedPeerTimeout time.Duration `yaml:"peer_timeout"`

FallbackConfigFile string `yaml:"fallback_config_file"`
AutoWebhookRoot string `yaml:"auto_webhook_root"`

-Store AlertStoreConfig `yaml:"storage"`
+Store AlertStoreConfig `yaml:"storage"`
+Cluster ClusterConfig `yaml:"cluster"`

EnableAPI bool `yaml:"enable_api"`
}

-const defaultClusterAddr = "0.0.0.0:9094"
+type ClusterConfig struct {
+ListenAddr string `yaml:"listen_address"`
+AdvertiseAddr string `yaml:"advertise_address"`
+Peers flagext.StringSliceCSV `yaml:"peers"`
+PeerTimeout time.Duration `yaml:"peer_timeout"`
+}
+
+const (
+defaultClusterAddr = "0.0.0.0:9094"
+defaultPeerTimeout = 15 * time.Second
+)

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *MultitenantAlertmanagerConfig) RegisterFlags(f *flag.FlagSet) {
@@ -115,14 +126,52 @@ func (cfg *MultitenantAlertmanagerConfig) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.AutoWebhookRoot, "alertmanager.configs.auto-webhook-root", "", "Root of URL to generate if config is "+autoWebhookURL)
f.DurationVar(&cfg.PollInterval, "alertmanager.configs.poll-interval", 15*time.Second, "How frequently to poll Cortex configs")

-f.StringVar(&cfg.ClusterBindAddr, "cluster.listen-address", defaultClusterAddr, "Listen address for cluster.")
-f.StringVar(&cfg.ClusterAdvertiseAddr, "cluster.advertise-address", "", "Explicit address to advertise in cluster.")
-f.Var(&cfg.Peers, "cluster.peer", "Initial peers (may be repeated).")
-f.DurationVar(&cfg.PeerTimeout, "cluster.peer-timeout", time.Second*15, "Time to wait between peers to send notifications.")
+// Flags prefixed with `cluster` are deprecated in favor of their `alertmanager` prefix equivalent.
+// TODO: New flags introduced in Cortex 1.7, remove old ones in Cortex 1.9
+f.StringVar(&cfg.DeprecatedClusterBindAddr, "cluster.listen-address", defaultClusterAddr, "Deprecated. Use -alertmanager.cluster.listen-address instead.")
+f.StringVar(&cfg.DeprecatedClusterAdvertiseAddr, "cluster.advertise-address", "", "Deprecated. Use -alertmanager.cluster.advertise-address instead.")
+f.Var(&cfg.DeprecatedPeers, "cluster.peer", "Deprecated. Use -alertmanager.cluster.peers instead.")
+f.DurationVar(&cfg.DeprecatedPeerTimeout, "cluster.peer-timeout", time.Second*15, "Deprecated. Use -alertmanager.cluster.peer-timeout instead.")

f.BoolVar(&cfg.EnableAPI, "experimental.alertmanager.enable-api", false, "Enable the experimental alertmanager config api.")

cfg.Store.RegisterFlags(f)
cfg.Cluster.RegisterFlags(f)
}

func (cfg *ClusterConfig) RegisterFlags(f *flag.FlagSet) {
prefix := "alertmanager.cluster."
f.StringVar(&cfg.ListenAddr, prefix+"listen-address", defaultClusterAddr, "Listen address and port for the cluster. Not specifying this flag disables high-availability mode.")
f.StringVar(&cfg.AdvertiseAddr, prefix+"advertise-address", "", "Explicit address or hostname to advertise in cluster.")
f.Var(&cfg.Peers, prefix+"peers", "Comma-separated list of initial peers.")
f.DurationVar(&cfg.PeerTimeout, prefix+"peer-timeout", defaultPeerTimeout, "Time to wait between peers to send notifications.")
}

// SupportDeprecatedFlagset ensures we support the previous set of cluster flags that are now deprecated.
func (cfg *ClusterConfig) SupportDeprecatedFlagset(amCfg *MultitenantAlertmanagerConfig, logger log.Logger) {
if amCfg.DeprecatedClusterBindAddr != defaultClusterAddr {
flagext.DeprecatedFlagsUsed.Inc()
level.Warn(logger).Log("msg", "running with DEPRECATED flag -cluster.listen-address, use -alertmanager.cluster.listen-address instead.")
cfg.ListenAddr = amCfg.DeprecatedClusterBindAddr
}

if amCfg.DeprecatedClusterAdvertiseAddr != "" {
flagext.DeprecatedFlagsUsed.Inc()
level.Warn(logger).Log("msg", "running with DEPRECATED flag -cluster.advertise-address, use -alertmanager.cluster.advertise-address instead.")
cfg.AdvertiseAddr = amCfg.DeprecatedClusterAdvertiseAddr
}

if len(amCfg.DeprecatedPeers) > 0 {
flagext.DeprecatedFlagsUsed.Inc()
level.Warn(logger).Log("msg", "running with DEPRECATED flag -cluster.peer, use -alertmanager.cluster.peers instead.")
cfg.Peers = []string(amCfg.DeprecatedPeers)
}

if amCfg.DeprecatedPeerTimeout != defaultPeerTimeout {
flagext.DeprecatedFlagsUsed.Inc()
level.Warn(logger).Log("msg", "running with DEPRECATED flag -cluster.peer-timeout, use -alertmanager.cluster.peer-timeout instead.")
cfg.PeerTimeout = amCfg.DeprecatedPeerTimeout
}
}

// Validate config and returns error on failure
@@ -206,14 +255,16 @@ func NewMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig, logger log.L
}
}

cfg.Cluster.SupportDeprecatedFlagset(cfg, logger)

var peer *cluster.Peer
-if cfg.ClusterBindAddr != "" {
+if cfg.Cluster.ListenAddr != "" {
peer, err = cluster.Create(
log.With(logger, "component", "cluster"),
registerer,
-cfg.ClusterBindAddr,
-cfg.ClusterAdvertiseAddr,
-cfg.Peers,
+cfg.Cluster.ListenAddr,
+cfg.Cluster.AdvertiseAddr,
+cfg.Cluster.Peers,
true,
cluster.DefaultPushPullInterval,
cluster.DefaultGossipInterval,
@@ -226,7 +277,7 @@ func NewMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig, logger log.L
}
err = peer.Join(cluster.DefaultReconnectInterval, cluster.DefaultReconnectTimeout)
if err != nil {
-level.Warn(logger).Log("msg", "unable to join gossip mesh", "err", err)
+level.Warn(logger).Log("msg", "unable to join gossip mesh while initializing cluster for high availability mode", "err", err)
}
go peer.Settle(context.Background(), cluster.DefaultGossipInterval)
}
@@ -260,13 +311,13 @@ func createMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig, fallbackC
return am
}

-func (am *MultitenantAlertmanager) starting(ctx context.Context) error {
+func (am *MultitenantAlertmanager) starting(_ context.Context) error {
// Load initial set of all configurations before polling for new ones.
am.syncConfigs(am.loadAllConfigs())
return nil
}

-func (am *MultitenantAlertmanager) iteration(ctx context.Context) error {
+func (am *MultitenantAlertmanager) iteration(_ context.Context) error {
err := am.updateConfigs()
if err != nil {
level.Warn(am.logger).Log("msg", "error updating configs", "err", err)
@@ -284,7 +335,7 @@ func (am *MultitenantAlertmanager) stopping(_ error) error {
}
am.alertmanagersMtx.Unlock()
if am.peer != nil { // Tests don't setup any peer.
-err := am.peer.Leave(am.cfg.PeerTimeout)
+err := am.peer.Leave(am.cfg.Cluster.PeerTimeout)
if err != nil {
level.Warn(am.logger).Log("msg", "failed to leave the cluster", "err", err)
}
@@ -454,7 +505,7 @@ func (am *MultitenantAlertmanager) newAlertmanager(userID string, amConfig *amco
DataDir: am.cfg.DataDir,
Logger: util.Logger,
Peer: am.peer,
-PeerTimeout: am.cfg.PeerTimeout,
+PeerTimeout: am.cfg.Cluster.PeerTimeout,
Retention: am.cfg.Retention,
ExternalURL: am.cfg.ExternalURL.URL,
}, reg)