Alertmanager: Allow sharding of alertmanager tenants #3664

Merged · 4 commits · Jan 19, 2021
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -21,6 +21,12 @@
* `-cluster.peer` in favor of `-alertmanager.cluster.peers`
* `-cluster.peer-timeout` in favor of `-alertmanager.cluster.peer-timeout`
* [FEATURE] Querier: Queries can be federated across multiple tenants. The tenants IDs involved need to be specified separated by a `|` character in the `X-Scope-OrgID` request header. This is an experimental feature, which can be enabled by setting `-tenant-federation.enabled=true` on all Cortex services. #3250
* [FEATURE] Alertmanager: introduced the experimental option `-alertmanager.sharding-enabled` to shard tenants across multiple Alertmanager instances. This feature is still under heavy development and its usage is discouraged. The following new metrics are exported by the Alertmanager: #3664
* `cortex_alertmanager_ring_check_errors_total`
* `cortex_alertmanager_sync_configs_total`
* `cortex_alertmanager_sync_configs_failed_total`
* `cortex_alertmanager_tenants_discovered`
* `cortex_alertmanager_tenants_owned`
* [ENHANCEMENT] Blocks storage: introduced a per-tenant bucket index, periodically updated by the compactor, used to avoid full bucket scanning done by queriers, store-gateways and rulers. The bucket index is updated by the compactor during blocks cleanup, on every `-compactor.cleanup-interval`. #3553 #3555 #3561 #3583 #3625
* [ENHANCEMENT] Blocks storage: introduced an option `-blocks-storage.bucket-store.bucket-index.enabled` to enable the usage of the bucket index in the querier, store-gateway and ruler. When enabled, the querier, store-gateway and ruler will use the bucket index to find a tenant's blocks instead of running the periodic bucket scan. The following new metrics are exported by the querier and ruler: #3614 #3625
* `cortex_bucket_index_loads_total`
9 changes: 9 additions & 0 deletions docs/api/_index.md
@@ -50,6 +50,7 @@ For the sake of clarity, in this document we have grouped API endpoints by service
| [Delete rule group](#delete-rule-group) | Ruler | `DELETE /api/v1/rules/{namespace}/{groupName}` |
| [Delete namespace](#delete-namespace) | Ruler | `DELETE /api/v1/rules/{namespace}` |
| [Alertmanager status](#alertmanager-status) | Alertmanager | `GET /multitenant_alertmanager/status` |
| [Alertmanager ring status](#alertmanager-ring-status) | Alertmanager | `GET /multitenant_alertmanager/ring` |
| [Alertmanager UI](#alertmanager-ui) | Alertmanager | `GET /<alertmanager-http-prefix>` |
| [Get Alertmanager configuration](#get-alertmanager-configuration) | Alertmanager | `GET /api/v1/alerts` |
| [Set Alertmanager configuration](#set-alertmanager-configuration) | Alertmanager | `POST /api/v1/alerts` |
@@ -640,6 +641,14 @@ GET /status

Displays a web page with the current status of the Alertmanager, including the Alertmanager cluster members.

### Alertmanager ring status

```
GET /multitenant_alertmanager/ring
```

Displays a web page with the Alertmanager hash ring status, including the state, health, and last heartbeat time of each Alertmanager instance.
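
A hedged sketch of hitting this endpoint from Go; the listen address `localhost:9009` is an illustrative assumption:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// The path is the one documented above; the host and port are assumptions.
	resp, err := http.Get("http://localhost:9009/multitenant_alertmanager/ring")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status)  // expected: 200 OK
	fmt.Println(string(body)) // HTML page listing each instance's state and heartbeat
}
```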

### Alertmanager UI

```
60 changes: 60 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -1501,6 +1501,64 @@ The `alertmanager_config` configures the Cortex alertmanager.
# CLI flag: -cluster.peer-timeout
[peer_timeout: <duration> | default = 15s]

# Shard tenants across multiple alertmanager instances.
# CLI flag: -alertmanager.sharding-enabled
[sharding_enabled: <boolean> | default = false]

sharding_ring:
# The key-value store used to share the hash ring across multiple instances.
kvstore:
# Backend storage to use for the ring. Supported values are: consul, etcd,
# inmemory, memberlist, multi.
# CLI flag: -alertmanager.sharding-ring.store
[store: <string> | default = "consul"]

# The prefix for the keys in the store. Should end with a /.
# CLI flag: -alertmanager.sharding-ring.prefix
[prefix: <string> | default = "alertmanagers/"]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: alertmanager.sharding-ring
[consul: <consul_config>]

# The etcd_config configures the etcd client.
# The CLI flags prefix for this block config is: alertmanager.sharding-ring
[etcd: <etcd_config>]

multi:
# Primary backend storage used by multi-client.
# CLI flag: -alertmanager.sharding-ring.multi.primary
[primary: <string> | default = ""]

# Secondary backend storage used by multi-client.
# CLI flag: -alertmanager.sharding-ring.multi.secondary
[secondary: <string> | default = ""]

# Mirror writes to secondary store.
# CLI flag: -alertmanager.sharding-ring.multi.mirror-enabled
[mirror_enabled: <boolean> | default = false]

# Timeout for storing value to secondary store.
# CLI flag: -alertmanager.sharding-ring.multi.mirror-timeout
[mirror_timeout: <duration> | default = 2s]

# Period at which to heartbeat to the ring.
# CLI flag: -alertmanager.sharding-ring.heartbeat-period
[heartbeat_period: <duration> | default = 15s]

# The heartbeat timeout after which alertmanagers are considered unhealthy
# within the ring.
# CLI flag: -alertmanager.sharding-ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]

# The replication factor to use when sharding the alertmanager.
# CLI flag: -alertmanager.sharding-ring.replication-factor
[replication_factor: <int> | default = 3]

# Name of network interface to read address from.
# CLI flag: -alertmanager.sharding-ring.instance-interface-names
[instance_interface_names: <list of string> | default = [eth0 en0]]

# Filename of fallback config to use if none specified for instance.
# CLI flag: -alertmanager.configs.fallback
[fallback_config_file: <string> | default = ""]
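
Putting the options above together, a minimal sketch of a sharded Alertmanager configuration, assuming it sits under the top-level `alertmanager:` block of the Cortex config file (the Consul address and replication factor are illustrative):

```yaml
alertmanager:
  sharding_enabled: true
  sharding_ring:
    replication_factor: 2
    kvstore:
      store: consul
      consul:
        # Assumed Consul address; etcd, memberlist, etc. are also supported backends.
        host: consul:8500
```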
@@ -2886,6 +2944,7 @@ grpc_client_config:
The `etcd_config` configures the etcd client. The supported CLI flags `<prefix>` used to reference this config block are:

- _no prefix_
- `alertmanager.sharding-ring`
- `compactor.ring`
- `distributor.ha-tracker`
- `distributor.ring`
@@ -2933,6 +2992,7 @@ The `etcd_config` configures the etcd client. The supported CLI flags `<prefix>` used to reference this config block are:
The `consul_config` configures the consul client. The supported CLI flags `<prefix>` used to reference this config block are:

- _no prefix_
- `alertmanager.sharding-ring`
- `compactor.ring`
- `distributor.ha-tracker`
- `distributor.ring`
1 change: 1 addition & 0 deletions docs/configuration/v1-guarantees.md
@@ -66,3 +66,4 @@ Currently experimental features are:
- The bucket index support in the querier and store-gateway (enabled via `-blocks-storage.bucket-store.bucket-index.enabled=true`) is experimental
- The block deletion marks migration support in the compactor (`-compactor.block-deletion-marks-migration-enabled`) is temporary and will be removed in future versions
- Querier: tenant federation
- Alertmanager: Sharding of tenants across multiple instances
79 changes: 79 additions & 0 deletions integration/alertmanager_test.go
@@ -206,3 +206,82 @@ func TestAlertmanagerClustering(t *testing.T) {
require.NoError(t, am.WaitSumMetrics(e2e.Equals(float64(2)), "alertmanager_cluster_members"))
}
}

func TestAlertmanagerSharding(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

flags := mergeFlags(AlertmanagerFlags(), AlertmanagerS3Flags())

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, flags["-alertmanager.storage.s3.buckets"])
require.NoError(t, s.StartAndWaitReady(consul, minio))

client, err := s3.NewS3ObjectClient(s3.S3Config{
Endpoint: minio.HTTPEndpoint(),
S3ForcePathStyle: true,
Insecure: true,
BucketNames: flags["-alertmanager.storage.s3.buckets"],
AccessKeyID: e2edb.MinioAccessKey,
SecretAccessKey: e2edb.MinioSecretKey,
})
require.NoError(t, err)

// Create and upload Alertmanager configurations.
for i := 1; i <= 30; i++ {
user := fmt.Sprintf("user-%d", i)
desc := alerts.AlertConfigDesc{
RawConfig: simpleAlertmanagerConfig,
User: user,
Templates: []*alerts.TemplateDesc{},
}

d, err := desc.Marshal()
require.NoError(t, err)
err = client.PutObject(context.Background(), fmt.Sprintf("/alerts/%s", user), bytes.NewReader(d))
require.NoError(t, err)
}

// 3 instances, 30 configurations and a replication factor of 2.
flags = mergeFlags(flags, AlertmanagerShardingFlags(consul.NetworkHTTPEndpoint(), 2))

// Wait for the Alertmanagers to start.
alertmanager1 := e2ecortex.NewAlertmanager("alertmanager-1", flags, "")
alertmanager2 := e2ecortex.NewAlertmanager("alertmanager-2", flags, "")
alertmanager3 := e2ecortex.NewAlertmanager("alertmanager-3", flags, "")

alertmanagers := e2ecortex.NewCompositeCortexService(alertmanager1, alertmanager2, alertmanager3)

// Start Alertmanager instances.
for _, am := range alertmanagers.Instances() {
require.NoError(t, s.StartAndWaitReady(am))
}

for _, am := range alertmanagers.Instances() {
require.NoError(t, am.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "alertmanager"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"),
)))

// We expect every instance to discover every configuration but only own a subset of them.
require.NoError(t, am.WaitSumMetrics(e2e.Equals(float64(30)), "cortex_alertmanager_tenants_discovered"))
// We know that the ring has settled when every instance owns some tenants and all 384 tokens (3 instances × 128 each) have been assigned.
require.NoError(t, am.WaitSumMetrics(e2e.Greater(float64(0)), "cortex_alertmanager_tenants_owned"))
require.NoError(t, am.WaitSumMetrics(e2e.Equals(float64(384)), "cortex_ring_tokens_total"))
}

var totalTenants int
for _, am := range alertmanagers.Instances() {
values, err := am.SumMetrics([]string{"cortex_alertmanager_tenants_owned"})
require.NoError(t, err)

tenants := int(e2e.SumValues(values))
totalTenants += tenants
}

// The total number of tenants across all instances is: total alertmanager configs * replication factor.
// In this case: 30 * 2
require.Equal(t, 60, totalTenants)
}
10 changes: 10 additions & 0 deletions integration/configs.go
@@ -6,6 +6,7 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"text/template"

@@ -102,6 +103,15 @@
}
}

AlertmanagerShardingFlags = func(consulAddress string, replicationFactor int) map[string]string {
return map[string]string{
"-alertmanager.sharding-enabled": "true",
"-alertmanager.sharding-ring.store": "consul",
"-alertmanager.sharding-ring.consul.hostname": consulAddress,
"-alertmanager.sharding-ring.replication-factor": strconv.Itoa(replicationFactor),
}
}

AlertmanagerLocalFlags = func() map[string]string {
return map[string]string{
"-alertmanager.storage.type": "local",
53 changes: 53 additions & 0 deletions pkg/alertmanager/alertmanager_http.go
@@ -0,0 +1,53 @@
package alertmanager

import (
"net/http"
"text/template"

"github.com/go-kit/kit/log/level"

"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/services"
)

var (
statusPageTemplate = template.Must(template.New("main").Parse(`
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Cortex Alertmanager Ring</title>
</head>
<body>
<h1>Cortex Alertmanager Ring</h1>
<p>{{ .Message }}</p>
</body>
</html>`))
)

func writeMessage(w http.ResponseWriter, message string) {
w.WriteHeader(http.StatusOK)
err := statusPageTemplate.Execute(w, struct {
Message string
}{Message: message})

if err != nil {
level.Error(util.Logger).Log("msg", "unable to serve alertmanager ring page", "err", err)
}
}

func (am *MultitenantAlertmanager) RingHandler(w http.ResponseWriter, req *http.Request) {
if !am.cfg.ShardingEnabled {
writeMessage(w, "Alertmanager has no ring because sharding is disabled.")
return
}

if am.State() != services.Running {
// We cannot read the ring before the Alertmanager is in the Running state,
// because doing so would lead to a race condition.
writeMessage(w, "Alertmanager is not running yet.")
return
}

am.ring.ServeHTTP(w, req)
}
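
A hedged test sketch of the handler's sharding-disabled path. It assumes an already-constructed `MultitenantAlertmanager` value `am` with sharding disabled (construction elided) and the usual `net/http/httptest` imports:

```go
// Sketch only: `am` is assumed to be a *MultitenantAlertmanager with
// sharding disabled; setup and error handling are elided.
func checkRingPageShardingDisabled(am *MultitenantAlertmanager) {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/multitenant_alertmanager/ring", nil)

	am.RingHandler(rec, req)

	// With sharding disabled, RingHandler renders the status template with an
	// explanatory message and returns 200 OK instead of serving the ring page.
	fmt.Println(rec.Code)          // 200
	fmt.Println(rec.Body.String()) // contains "sharding is disabled"
}
```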
114 changes: 114 additions & 0 deletions pkg/alertmanager/alertmanager_ring.go
@@ -0,0 +1,114 @@
package alertmanager

import (
"flag"
"fmt"
"os"
"time"

"github.com/go-kit/kit/log/level"

"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
)

const (
// RingKey is the key under which we store the alertmanager ring in the KVStore.
RingKey = "alertmanager"

// RingNameForServer is the name of the ring used by the alertmanager server.
RingNameForServer = "alertmanager"

// RingNumTokens is a safe default used instead of exposing a config option,
// in order to keep the configuration simple for the user.
RingNumTokens = 128
)

// RingOp is the operation used for distributing tenants between alertmanagers.
var RingOp = ring.NewOp([]ring.IngesterState{ring.ACTIVE}, func(s ring.IngesterState) bool {
// Only ACTIVE Alertmanagers get requests. If an instance is not ACTIVE, we need to find another Alertmanager.
return s != ring.ACTIVE
})

// RingConfig masks the ring lifecycler config, which contains
// many options not actually required by the alertmanager ring. It
// strips the config down to the minimum, to avoid confusing the user.
type RingConfig struct {
KVStore kv.Config `yaml:"kvstore" doc:"description=The key-value store used to share the hash ring across multiple instances."`
HeartbeatPeriod time.Duration `yaml:"heartbeat_period"`
HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`
ReplicationFactor int `yaml:"replication_factor"`

// Instance details
InstanceID string `yaml:"instance_id" doc:"hidden"`
InstanceInterfaceNames []string `yaml:"instance_interface_names"`
InstancePort int `yaml:"instance_port" doc:"hidden"`
InstanceAddr string `yaml:"instance_addr" doc:"hidden"`

// Injected internally
ListenPort int `yaml:"-"`
RingCheckPeriod time.Duration `yaml:"-"`

// Used for testing
SkipUnregister bool `yaml:"-"`
}

// RegisterFlags adds the flags required to configure the ring to the given FlagSet.
func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
hostname, err := os.Hostname()
if err != nil {
level.Error(util.Logger).Log("msg", "failed to get hostname", "err", err)
os.Exit(1)
}

// Prefix used by all the ring flags
rfprefix := "alertmanager.sharding-ring."

// Ring flags
cfg.KVStore.RegisterFlagsWithPrefix(rfprefix, "alertmanagers/", f)
f.DurationVar(&cfg.HeartbeatPeriod, rfprefix+"heartbeat-period", 15*time.Second, "Period at which to heartbeat to the ring.")
f.DurationVar(&cfg.HeartbeatTimeout, rfprefix+"heartbeat-timeout", time.Minute, "The heartbeat timeout after which alertmanagers are considered unhealthy within the ring.")
f.IntVar(&cfg.ReplicationFactor, rfprefix+"replication-factor", 3, "The replication factor to use when sharding the alertmanager.")

// Instance flags
cfg.InstanceInterfaceNames = []string{"eth0", "en0"}
f.Var((*flagext.StringSlice)(&cfg.InstanceInterfaceNames), rfprefix+"instance-interface-names", "Name of network interface to read address from.")
f.StringVar(&cfg.InstanceAddr, rfprefix+"instance-addr", "", "IP address to advertise in the ring.")
f.IntVar(&cfg.InstancePort, rfprefix+"instance-port", 0, "Port to advertise in the ring (defaults to server.http-listen-port).")
f.StringVar(&cfg.InstanceID, rfprefix+"instance-id", hostname, "Instance ID to register in the ring.")

cfg.RingCheckPeriod = 5 * time.Second
}

// ToLifecyclerConfig returns a LifecyclerConfig based on the alertmanager
// ring config.
func (cfg *RingConfig) ToLifecyclerConfig() (ring.BasicLifecyclerConfig, error) {
instanceAddr, err := ring.GetInstanceAddr(cfg.InstanceAddr, cfg.InstanceInterfaceNames)
if err != nil {
return ring.BasicLifecyclerConfig{}, err
}

instancePort := ring.GetInstancePort(cfg.InstancePort, cfg.ListenPort)

return ring.BasicLifecyclerConfig{
ID: cfg.InstanceID,
Addr: fmt.Sprintf("%s:%d", instanceAddr, instancePort),
HeartbeatPeriod: cfg.HeartbeatPeriod,
TokensObservePeriod: 0,
NumTokens: RingNumTokens,
}, nil
}

func (cfg *RingConfig) ToRingConfig() ring.Config {
rc := ring.Config{}
flagext.DefaultValues(&rc)

rc.KVStore = cfg.KVStore
rc.HeartbeatTimeout = cfg.HeartbeatTimeout
rc.ReplicationFactor = cfg.ReplicationFactor

return rc
}
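
The actual distribution logic lives in `pkg/ring`, but the idea the Alertmanager builds on (hash the tenant ID, then walk a sorted token ring to find its owners) can be shown standalone. A simplified, self-contained sketch, not the real `pkg/ring` implementation:

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// token pairs a ring token with the instance that registered it.
type token struct {
	value uint32
	owner string
}

// owners walks the ring clockwise from the tenant's hash and returns the
// first rf distinct instances, a simplified view of replication.
func owners(ring []token, tenant string, rf int) []string {
	h := fnv.New32a()
	_, _ = h.Write([]byte(tenant))
	key := h.Sum32()

	// First token >= key; sort.Search returns len(ring) when there is none,
	// which the modulo below wraps back to the start of the ring.
	start := sort.Search(len(ring), func(i int) bool { return ring[i].value >= key })

	var result []string
	seen := map[string]bool{}
	for i := 0; i < len(ring) && len(result) < rf; i++ {
		t := ring[(start+i)%len(ring)]
		if !seen[t.owner] {
			seen[t.owner] = true
			result = append(result, t.owner)
		}
	}
	return result
}

func main() {
	// The real ring registers RingNumTokens (128) tokens per instance; three
	// per instance keep this example readable.
	ring := []token{
		{100, "am-1"}, {900, "am-2"}, {1700, "am-3"},
		{2500, "am-1"}, {3300, "am-2"}, {4100, "am-3"},
	}
	sort.Slice(ring, func(i, j int) bool { return ring[i].value < ring[j].value })

	// With replication factor 2, each tenant maps to two distinct instances.
	fmt.Println(owners(ring, "user-7", 2))
}
```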