
Add documentation for HA tracker flags. #1465


Merged: 8 commits, Jul 31, 2019
4 changes: 3 additions & 1 deletion docs/architecture.md
@@ -24,6 +24,8 @@ The **distributor** service is responsible for handling samples written by Prometheus

Distributors communicate with ingesters via [gRPC](https://grpc.io). They are stateless and can be scaled up and down as needed.

If the HA Tracker is enabled, the Distributor deduplicates incoming samples that contain both a cluster and a replica label. It talks to a KVStore to track, per user ID and per cluster, which replica it is currently accepting samples from. Samples with only one or neither of these labels are accepted by default.
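
The acceptance rule described above can be sketched as follows. This is a minimal illustration, not Cortex's actual implementation: `shouldDeduplicate`, the plain label maps, and the elected-replica map are all hypothetical names standing in for the real tracker state.

```go
package main

import "fmt"

// shouldDeduplicate sketches the distributor's per-sample decision,
// assuming the default label names "cluster" and "__replica__" and an
// elected replica recorded per cluster (for one user).
func shouldDeduplicate(labels map[string]string, electedReplica map[string]string) bool {
	cluster, hasCluster := labels["cluster"]
	replica, hasReplica := labels["__replica__"]
	if !hasCluster || !hasReplica {
		// Samples with only one or neither label are accepted by default.
		return false
	}
	// Only samples from the currently elected replica are kept.
	return electedReplica[cluster] != replica
}

func main() {
	elected := map[string]string{"prod": "replica-1"}
	// Sample from a non-elected replica: dropped.
	fmt.Println(shouldDeduplicate(map[string]string{"cluster": "prod", "__replica__": "replica-2"}, elected))
	// Sample without the HA labels: accepted as-is.
	fmt.Println(shouldDeduplicate(map[string]string{"job": "node"}, elected))
}
```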

#### Hashing

Distributors use consistent hashing, in conjunction with the (configurable) replication factor, to determine *which* instances of the ingester service receive each sample.
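
As a rough illustration of that lookup, the sketch below walks a sorted token ring clockwise from the key's hash and collects distinct instances up to the replication factor. Token assignment, hashing, and zone awareness are all simplified relative to Cortex's actual ring; the names here are illustrative.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

type tokenEntry struct {
	token    uint32
	ingester string
}

// lookup returns replicationFactor distinct ingesters owning the key,
// starting at the first token >= the key's hash and wrapping around.
func lookup(ring []tokenEntry, key string, replicationFactor int) []string {
	h := fnv.New32a()
	h.Write([]byte(key))
	hash := h.Sum32()

	i := sort.Search(len(ring), func(i int) bool { return ring[i].token >= hash })
	seen := map[string]bool{}
	var out []string
	for n := 0; len(out) < replicationFactor && n < len(ring); n++ {
		e := ring[(i+n)%len(ring)] // wrap past the last token
		if !seen[e.ingester] {
			seen[e.ingester] = true
			out = append(out, e.ingester)
		}
	}
	return out
}

func main() {
	ring := []tokenEntry{{100, "ingester-1"}, {200, "ingester-2"}, {300, "ingester-3"}}
	fmt.Println(lookup(ring, `{__name__="up"}`, 2))
}
```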
@@ -128,4 +130,4 @@ The interface works somewhat differently across the supported databases:

A set of schemas are used to map the matchers and label sets used on reads and writes to the chunk store into appropriate operations on the index. Schemas have been added as Cortex has evolved, mainly in an attempt to better load balance writes and improve query performance.

> The current schema recommendation is the **v10 schema**.
63 changes: 62 additions & 1 deletion docs/arguments.md
@@ -92,6 +92,67 @@ The ingester query API was improved over time, but defaults to the old behaviour
- `-distributor.extra-query-delay`
This is used by components with an embedded distributor (Querier and Ruler) to control how long to wait before sending more than the minimum number of queries needed for a successful response.

- `distributor.ha-tracker.enable-for-all-users`
Flag to enable, for all users, handling of samples with external labels identifying replicas in an HA Prometheus setup. This defaults to false, and is technically defined as part of the Distributor limits.

- `distributor.ha-tracker.enable`
Enable the distributor's HA tracker so that it can accept samples from Prometheus HA replicas gracefully (requires labels). This flag is global to the distributors: it ensures the internal data structures needed for HA handling are created. The `enable-for-all-users` option is still needed to actually enable ingestion of HA samples for all users.
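
Putting the two flags together, enabling HA handling for every user would look something like the following illustrative invocation (the binary name and all other required flags are elided; flag spellings are taken from this PR):

```
cortex \
  -distributor.ha-tracker.enable=true \
  -distributor.ha-tracker.enable-for-all-users=true
```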

### Ring/HA Tracker Store

The KVStore client is used by both the Ring and HA Tracker.
- `{ring,distributor.ha-tracker}.prefix`
The prefix for the keys in the store. It should end with a `/`. For example, with a prefix of `foo/`, the key `bar` would be stored under `foo/bar`.
- `{ring,distributor.ha-tracker}.store`
Backend storage to use for the ring (consul, etcd, inmemory).

#### Consul

By default these flags are used to configure Consul used for the ring. To configure Consul for the HA tracker,
prefix these flags with `distributor.ha-tracker.`

- `consul.hostname`
Hostname and port of Consul.
- `consul.acltoken`
ACL token used to interact with Consul.
- `consul.client-timeout`
HTTP timeout when talking to Consul.
- `consul.consistent-reads`
Enable consistent reads to Consul.
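
For example, pointing the HA tracker at its own Consul would use the prefixed form of the hostname flag (the value here is a hypothetical address, shown only to illustrate the prefixing):

```
-distributor.ha-tracker.consul.hostname=consul.example.svc.cluster.local:8500
```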

#### etcd

By default these flags are used to configure etcd used for the ring. To configure etcd for the HA tracker,
prefix these flags with `distributor.ha-tracker.`

- `etcd.endpoints`
The etcd endpoints to connect to.
- `etcd.dial-timeout`
The timeout for the etcd connection.
- `etcd.max-retries`
The maximum number of retries to do for failed ops.

### HA Tracker

HA tracking has two of its own flags:
- `distributor.ha-tracker.cluster`
Prometheus label to look for in samples to identify a Prometheus HA cluster. (default "cluster")
- `distributor.ha-tracker.replica`
Prometheus label to look for in samples to identify a Prometheus HA replica. (default "__replica__")

It's reasonable to assume people probably already have a `cluster` label, or something similar. If not, they should add one along with `__replica__` via external labels in their Prometheus config.

HA tracking looks for the two labels (which can be overridden per user).

> **Reviewer:** We don't use external labels in our Prometheus config, as each Prometheus gets a different UserID / TenantID. Does Cortex still require the "cluster" external label then if I only have one Prometheus cluster per tenant, or does it use the default "cluster" if no label is specified?
>
> **Author:** Both labels have to be present for Cortex to dedup samples from HA Prometheus instances. Whether the labels are already in your series or added via external labels doesn't matter. Cortex just needs a way of identifying each HA cluster, and each replica within that cluster.
>
> **Reviewer:** Assuming I am just about to enable the HA tracker feature, and I have a single Prometheus instance for each tenant/user ID which does not set either of these two labels: does this cause any problems? Will it reject samples without these labels?

The HA tracker also talks to a KVStore and has its own copies of the same flags the Distributor uses to connect to the ring.
- `distributor.ha-tracker.failover-timeout`
If we don't receive any samples from the accepted replica for a cluster within this amount of time, we fail over to the next replica we receive a sample from. This value must be greater than the update timeout. (default 30s)
- `distributor.ha-tracker.store`
Backend storage to use for the HA tracker (consul, etcd, inmemory). (default "consul")
- `distributor.ha-tracker.update-timeout`
Update the timestamp in the KV store for a given cluster/replica only after this amount of time has passed since the current stored timestamp. (default 15s)

## Ingester

- `-ingester.normalise-tokens`
@@ -185,4 +246,4 @@ Valid fields are (with their corresponding flags for default values):

- `s3.force-path-style`

Set this to `true` to force the request to use path-style addressing (`http://s3.amazonaws.com/BUCKET/KEY`). By default, the S3 client will use virtual hosted bucket addressing when possible (`http://BUCKET.s3.amazonaws.com/KEY`).
12 changes: 6 additions & 6 deletions pkg/distributor/distributor.go
@@ -103,8 +103,8 @@ type Config struct {
BillingConfig billing.Config `yaml:"billing,omitempty"`
PoolConfig ingester_client.PoolConfig `yaml:"pool,omitempty"`

EnableHAReplicas bool `yaml:"enable_ha_pairs,omitempty"`
HATrackerConfig HATrackerConfig `yaml:"ha_tracker,omitempty"`
EnableHATracker bool `yaml:"enable_ha_tracker,omitempty"`
HATrackerConfig HATrackerConfig `yaml:"ha_tracker,omitempty"`

RemoteTimeout time.Duration `yaml:"remote_timeout,omitempty"`
ExtraQueryDelay time.Duration `yaml:"extra_queue_delay,omitempty"`
@@ -123,7 +123,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.HATrackerConfig.RegisterFlags(f)

f.BoolVar(&cfg.EnableBilling, "distributor.enable-billing", false, "Report number of ingested samples to billing system.")
f.BoolVar(&cfg.EnableHAReplicas, "distributor.accept-ha-labels", false, "Accept samples from Prometheus HA replicas gracefully (requires labels).")
f.BoolVar(&cfg.EnableHATracker, "distributor.ha-tracker.enable", false, "Enable the distributors HA tracker so that it can accept samples from Prometheus HA replicas gracefully (requires labels).")
f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
f.DurationVar(&cfg.ExtraQueryDelay, "distributor.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.")
f.DurationVar(&cfg.LimiterReloadPeriod, "distributor.limiter-reload-period", 5*time.Minute, "Period at which to reload user ingestion limits.")
@@ -160,7 +160,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
quit: make(chan struct{}),
}

if cfg.EnableHAReplicas {
if cfg.EnableHATracker {
replicas, err := newClusterTracker(cfg.HATrackerConfig)
if err != nil {
return nil, err
@@ -198,7 +198,7 @@ func (d *Distributor) loop() {
func (d *Distributor) Stop() {
close(d.quit)
d.ingesterPool.Stop()
if d.cfg.EnableHAReplicas {
if d.cfg.EnableHATracker {
d.replicas.stop()
}
}
@@ -284,7 +284,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
var lastPartialErr error
removeReplica := false

if d.cfg.EnableHAReplicas && d.limits.AcceptHASamples(userID) && len(req.Timeseries) > 0 {
if d.cfg.EnableHATracker && d.limits.AcceptHASamples(userID) && len(req.Timeseries) > 0 {
removeReplica, err = d.checkSample(ctx, userID, req.Timeseries[0])
if err != nil {
return nil, err
2 changes: 1 addition & 1 deletion pkg/distributor/distributor_test.go
@@ -139,7 +139,7 @@ func TestDistributorPushHAInstances(t *testing.T) {
for _, shardByAllLabels := range []bool{true, false} {
t.Run(fmt.Sprintf("[%d](shardByAllLabels=%v)", i, shardByAllLabels), func(t *testing.T) {
d := prepare(t, 1, 1, 0, shardByAllLabels)
d.cfg.EnableHAReplicas = true
d.cfg.EnableHATracker = true
d.limits.Defaults.AcceptHASamples = true
codec := codec.Proto{Factory: ProtoReplicaDescFactory}
mock := kv.PrefixClient(consul.NewInMemoryClient(codec), "prefix")
6 changes: 3 additions & 3 deletions pkg/distributor/ha_tracker.go
@@ -68,15 +68,15 @@ type HATrackerConfig struct {
// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *HATrackerConfig) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.UpdateTimeout,
"ha-tracker.update-timeout",
"distributor.ha-tracker.update-timeout",
15*time.Second,
"Update the timestamp in the KV store for a given cluster/replica only after this amount of time has passed since the current stored timestamp.")
f.DurationVar(&cfg.FailoverTimeout,
"ha-tracker.failover-timeout",
"distributor.ha-tracker.failover-timeout",
30*time.Second,
"If we don't receive any samples from the accepted replica for a cluster in this amount of time we will failover to the next replica we receive a sample from. This value must be greater than the update timeout")
// We want the ability to use different Consul instances for the ring and for HA cluster tracking.
cfg.KVStore.RegisterFlagsWithPrefix("ha-tracker.", f)
cfg.KVStore.RegisterFlagsWithPrefix("distributor.ha-tracker.", f)
}

// NewClusterTracker returns a new HA cluster tracker using either Consul
4 changes: 2 additions & 2 deletions pkg/ring/kv/consul/client.go
@@ -61,8 +61,8 @@ type Client struct {
func (cfg *Config) RegisterFlags(f *flag.FlagSet, prefix string) {
f.StringVar(&cfg.Host, prefix+"consul.hostname", "localhost:8500", "Hostname and port of Consul.")
f.StringVar(&cfg.ACLToken, prefix+"consul.acltoken", "", "ACL Token used to interact with Consul.")
f.DurationVar(&cfg.HTTPClientTimeout, prefix+"consul.client-timeout", 2*longPollDuration, "HTTP timeout when talking to consul")
f.BoolVar(&cfg.ConsistentReads, prefix+"consul.consistent-reads", true, "Enable consistent reads to consul.")
f.DurationVar(&cfg.HTTPClientTimeout, prefix+"consul.client-timeout", 2*longPollDuration, "HTTP timeout when talking to Consul")
f.BoolVar(&cfg.ConsistentReads, prefix+"consul.consistent-reads", true, "Enable consistent reads to Consul.")
}

// NewClient returns a new Client.
6 changes: 3 additions & 3 deletions pkg/util/validation/limits.go
@@ -43,9 +43,9 @@ type Limits struct {
func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.Float64Var(&l.IngestionRate, "distributor.ingestion-rate-limit", 25000, "Per-user ingestion rate limit in samples per second.")
f.IntVar(&l.IngestionBurstSize, "distributor.ingestion-burst-size", 50000, "Per-user allowed ingestion burst size (in number of samples). Warning, very high limits will be reset every -distributor.limiter-reload-period.")
f.BoolVar(&l.AcceptHASamples, "distributor.accept-ha-samples", false, "Per-user flag to enable handling of samples with external labels for identifying replicas in an HA Prometheus setup.")
f.StringVar(&l.HAReplicaLabel, "ha-tracker.replica", "__replica__", "Prometheus label to look for in samples to identify a Proemtheus HA replica.")
f.StringVar(&l.HAClusterLabel, "ha-tracker.cluster", "cluster", "Prometheus label to look for in samples to identify a Poemtheus HA cluster.")
f.BoolVar(&l.AcceptHASamples, "distributor.ha-tracker.enable-for-all-users", false, "Flag to enable, for all users, handling of samples with external labels identifying replicas in an HA Prometheus setup.")
f.StringVar(&l.HAReplicaLabel, "distributor.ha-tracker.replica", "__replica__", "Prometheus label to look for in samples to identify a Prometheus HA replica.")
f.StringVar(&l.HAClusterLabel, "distributor.ha-tracker.cluster", "cluster", "Prometheus label to look for in samples to identify a Prometheus HA cluster.")
f.IntVar(&l.MaxLabelNameLength, "validation.max-length-label-name", 1024, "Maximum length accepted for label names")
f.IntVar(&l.MaxLabelValueLength, "validation.max-length-label-value", 2048, "Maximum length accepted for label value. This setting also applies to the metric name")
f.IntVar(&l.MaxLabelNamesPerSeries, "validation.max-label-names-per-series", 30, "Maximum number of label names per series.")