Enabling availability zone awareness in metric R/W with ingesters. #2317


Merged
10 commits merged on Mar 29, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -13,6 +13,7 @@
* `-flusher.wal-dir` for the WAL directory to recover from.
* `-flusher.concurrent-flushes` for number of concurrent flushes.
* `-flusher.flush-op-timeout` is duration after which a flush should timeout.
* [FEATURE] Ingesters can now have an optional availability zone set, to ensure metric replication is distributed across zones. This is set via the `-ingester.availability-zone` flag or the `availability_zone` field in the config file. #2317
* [ENHANCEMENT] Better re-use of connections to DynamoDB and S3. #2268
* [ENHANCEMENT] Experimental TSDB: Add support for local `filesystem` backend. #2245
* [ENHANCEMENT] Experimental TSDB: Added memcached support for the TSDB index cache. #2290
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -485,6 +485,11 @@ lifecycler:
# CLI flag: -ingester.tokens-file-path
[tokens_file_path: <string> | default = ""]

# The availability zone of the host this instance is running on. Default is
# the lifecycler ID.
# CLI flag: -ingester.availability-zone
[availability_zone: <string> | default = ""]

# Number of times to try and transfer chunks before falling back to flushing.
# Negative value or zero disables hand-over.
# CLI flag: -ingester.max-transfer-retries
4 changes: 3 additions & 1 deletion docs/guides/running.md
@@ -51,7 +51,9 @@ Memcached is not essential but highly recommended.
The standard replication factor is three, so that we can drop one
replica and be unconcerned, as we still have two copies of the data
left for redundancy. This is configurable: you can run with more
redundancy or less, depending on your risk appetite.
redundancy or less, depending on your risk appetite. By default,
ingesters are not aware of availability zones. See [zone-aware replication](zone-replication.md)
to change this.

### Schema

30 changes: 30 additions & 0 deletions docs/guides/zone-replication.md
@@ -0,0 +1,30 @@
---
title: "Ingester Hand-over"
linkTitle: "Ingester Hand-over"
weight: 5
slug: ingester-handover
Comment on lines +2 to +5
Contributor:

@khaines This comes from a bad copy-paste. May you submit a PR to fix it, please?

Contributor Author:

yes it did... looked at it a few times and this didn't catch my eye.

---

In a default configuration, time-series written to ingesters are replicated based on the container/pod name of the ingester instances. It is entirely possible that all the replicas for a given time-series are held within the same availability zone, even if the cortex infrastructure spans multiple zones within the region. Storing multiple replicas of a given time-series within a single zone poses a risk of data loss if there is an outage affecting several nodes within that zone, or a full zone outage.

## Configuration

Cortex can be configured to consider an availability zone value in its replication system. Doing so mitigates risks associated with losing multiple nodes with in the same availability zone. The availability zone for an ingester can be defined on the command line of the ingester using the `ingester.availability-zone` flag or using the yaml configuration:
Contributor:

@khaines with in > within


```yaml
ingester:
lifecycler:
availability_zone: "zone-3"
```
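
For anyone embedding the ring package directly rather than configuring a stock Cortex binary, the same value lands in the `Zone` field of `ring.LifecyclerConfig` (the field added by this PR). A minimal sketch, assuming the `github.com/cortexproject/cortex/pkg/ring` import path and the usual `ingester.` flag prefix:

```go
package main

import (
	"flag"
	"fmt"

	"github.com/cortexproject/cortex/pkg/ring"
)

func main() {
	var cfg ring.LifecyclerConfig

	// RegisterFlagsWithPrefix wires the lifecycler flags, including
	// -ingester.availability-zone, onto the default flag set.
	cfg.RegisterFlagsWithPrefix("ingester.", flag.CommandLine)
	flag.Parse()

	// With `-ingester.availability-zone=zone-3` on the command line,
	// cfg.Zone now holds "zone-3"; an empty value leaves zone-awareness off.
	fmt.Println("availability zone:", cfg.Zone)
}
```

Inside Cortex itself the value is passed through `NewLifecycler` into the ring entries, as shown in the `lifecycler.go` diff below.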

## Zone Replication Considerations

Enabling availability zone awareness helps mitigate the risk of data loss within a single zone, but there are some items an operator should consider before enabling this feature.

### Minimum number of Zones

For cortex to function correctly, there must be at least as many availability zones as the replica count. By default, a cortex cluster should therefore be spread over 3 zones, since the default replica count is 3. It is safe to have more zones than the replica count, but never fewer: having fewer availability zones than the replica count causes replica writes to be missed, and in some cases the write fails outright if the availability zone count is too low.
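
As a rough illustration of this constraint, the hypothetical helper below (not part of Cortex) counts the distinct zones reported by a set of ingesters and warns when there are fewer of them than the replication factor:

```go
package main

import "fmt"

// distinctZones returns the number of distinct availability zones in the
// given list; each entry mirrors an ingester's `availability_zone` setting.
func distinctZones(ingesterZones []string) int {
	seen := map[string]struct{}{}
	for _, z := range ingesterZones {
		seen[z] = struct{}{}
	}
	return len(seen)
}

func main() {
	const replicationFactor = 3

	// Three ingesters, but only two distinct zones.
	zones := []string{"zone-1", "zone-2", "zone-2"}

	if distinctZones(zones) < replicationFactor {
		fmt.Println("warning: fewer distinct zones than the replication factor; some replica writes will be skipped")
	}
}
```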

### Cost

Depending on the existing cortex infrastructure being used, this may increase running costs, as most cloud providers charge for cross-availability-zone traffic. The most significant change would be for a cortex cluster currently running in a single zone.
11 changes: 7 additions & 4 deletions pkg/ring/http.go
@@ -30,6 +30,7 @@ const pageContent = `
<thead>
<tr>
<th>Instance ID</th>
<th>Availability Zone</th>
<th>State</th>
<th>Address</th>
<th>Last Heartbeat</th>
@@ -46,6 +47,7 @@ const pageContent = `
<tr bgcolor="#BEBEBE">
{{ end }}
<td>{{ .ID }}</td>
<td>{{ .Zone }}</td>
<td>{{ .State }}</td>
<td>{{ .Address }}</td>
<td>{{ .Timestamp }}</td>
@@ -138,16 +140,17 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}

ingesters = append(ingesters, struct {
ID, State, Address, Timestamp string
Tokens []uint32
NumTokens int
Ownership float64
ID, State, Address, Timestamp, Zone string
Tokens []uint32
NumTokens int
Ownership float64
}{
ID: id,
State: state,
Address: ing.Addr,
Timestamp: timestamp.String(),
Tokens: ing.Tokens,
Zone: ing.Zone,
NumTokens: len(ing.Tokens),
Ownership: (float64(owned[id]) / float64(math.MaxUint32)) * 100,
})
19 changes: 14 additions & 5 deletions pkg/ring/lifecycler.go
@@ -55,6 +55,7 @@ type LifecyclerConfig struct {
InfNames []string `yaml:"interface_names"`
FinalSleep time.Duration `yaml:"final_sleep"`
TokensFilePath string `yaml:"tokens_file_path"`
Zone string `yaml:"availability_zone"`

// For testing, you can override the address and ID of this ingester
Addr string `yaml:"address" doc:"hidden"`
@@ -103,6 +104,7 @@ func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.Flag
f.StringVar(&cfg.Addr, prefix+"lifecycler.addr", "", "IP address to advertise in consul.")
f.IntVar(&cfg.Port, prefix+"lifecycler.port", 0, "port to advertise in consul (defaults to server.grpc-listen-port).")
f.StringVar(&cfg.ID, prefix+"lifecycler.ID", hostname, "ID to register into consul.")
f.StringVar(&cfg.Zone, prefix+"availability-zone", "", "The availability zone of the host this instance is running on. Default is the lifecycler ID.")
Contributor:

@khaines Default is the lifecycler ID. This looks more like an internal implementation detail than something we should expose to the end user. For the end user the default is an empty string, which means zone-awareness is disabled.

Contributor Author:

Made an edit to this with #2384

}

// Lifecycler is responsible for managing the lifecycle of entries in the ring.
@@ -120,6 +122,7 @@ type Lifecycler struct {
Addr string
RingName string
RingKey string
Zone string

// Whether to flush if transfer fails on shutdown.
flushOnShutdown bool
Expand Down Expand Up @@ -160,6 +163,11 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa
return nil, err
}

zone := cfg.Zone
if zone == "" {
zone = cfg.ID
}

// We do allow a nil FlushTransferer, but to keep the ring logic easier we assume
// it's always set, so we use a noop FlushTransferer
if flushTransferer == nil {
@@ -176,6 +184,7 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa
RingName: ringName,
RingKey: ringKey,
flushOnShutdown: flushOnShutdown,
Zone: zone,

actorChan: make(chan func()),

@@ -502,14 +511,14 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
if len(tokensFromFile) >= i.cfg.NumTokens {
i.setState(ACTIVE)
}
ringDesc.AddIngester(i.ID, i.Addr, tokensFromFile, i.GetState())
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokensFromFile, i.GetState())
i.setTokens(tokensFromFile)
return ringDesc, true, nil
}

// Either we are a new ingester, or consul must have restarted
level.Info(util.Logger).Log("msg", "instance not found in ring, adding with no tokens", "ring", i.RingName)
ringDesc.AddIngester(i.ID, i.Addr, []uint32{}, i.GetState())
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, []uint32{}, i.GetState())
return ringDesc, true, nil
}

@@ -564,7 +573,7 @@ func (i *Lifecycler) verifyTokens(ctx context.Context) bool {
ringTokens = append(ringTokens, newTokens...)
sort.Sort(ringTokens)

ringDesc.AddIngester(i.ID, i.Addr, ringTokens, i.GetState())
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, ringTokens, i.GetState())

i.setTokens(ringTokens)

@@ -626,7 +635,7 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState IngesterState) er
sort.Sort(myTokens)
i.setTokens(myTokens)

ringDesc.AddIngester(i.ID, i.Addr, i.getTokens(), i.GetState())
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState())

return ringDesc, true, nil
})
@@ -655,7 +664,7 @@ func (i *Lifecycler) updateConsul(ctx context.Context) error {
if !ok {
// consul must have restarted
level.Info(util.Logger).Log("msg", "found empty ring, inserting tokens", "ring", i.RingName)
ringDesc.AddIngester(i.ID, i.Addr, i.getTokens(), i.GetState())
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState())
} else {
ingesterDesc.Timestamp = time.Now().Unix()
ingesterDesc.State = i.GetState()
6 changes: 4 additions & 2 deletions pkg/ring/model.go
@@ -37,7 +37,7 @@ func NewDesc() *Desc {

// AddIngester adds the given ingester to the ring. Ingester will only use supplied tokens,
// any other tokens are removed.
func (d *Desc) AddIngester(id, addr string, tokens []uint32, state IngesterState) {
func (d *Desc) AddIngester(id, addr, zone string, tokens []uint32, state IngesterState) {
if d.Ingesters == nil {
d.Ingesters = map[string]IngesterDesc{}
}
@@ -47,6 +47,7 @@ func (d *Desc) AddIngester(id, addr, zone string, tokens []uint32, state IngesterState
Timestamp: time.Now().Unix(),
State: state,
Tokens: tokens,
Zone: zone,
}

d.Ingesters[id] = ingester
@@ -377,6 +378,7 @@ func (d *Desc) RemoveTombstones(limit time.Time) {
type TokenDesc struct {
Token uint32
Ingester string
Zone string
}

// Returns sorted list of tokens with ingester names.
@@ -388,7 +390,7 @@ func (d *Desc) getTokens() []TokenDesc {
tokens := make([]TokenDesc, 0, numTokens)
for key, ing := range d.Ingesters {
for _, token := range ing.Tokens {
tokens = append(tokens, TokenDesc{Token: token, Ingester: key})
tokens = append(tokens, TokenDesc{Token: token, Ingester: key, Zone: ing.GetZone()})
}
}

7 changes: 6 additions & 1 deletion pkg/ring/ring.go
@@ -182,6 +182,7 @@ func (r *Ring) Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet
n = r.cfg.ReplicationFactor
ingesters = buf[:0]
distinctHosts = map[string]struct{}{}
distinctZones = map[string]struct{}{}
start = r.search(key)
iterations = 0
)
@@ -190,12 +191,16 @@
// Wrap i around in the ring.
i %= len(r.ringTokens)

// We want n *distinct* ingesters.
// We want n *distinct* ingesters && distinct zones.
token := r.ringTokens[i]
if _, ok := distinctHosts[token.Ingester]; ok {
continue
}
if _, ok := distinctZones[token.Zone]; ok {
continue
}
distinctHosts[token.Ingester] = struct{}{}
distinctZones[token.Zone] = struct{}{}
ingester := r.ringDesc.Ingesters[token.Ingester]

// We do not want to Write to Ingesters that are not ACTIVE, but we do want
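
For readers skimming the diff, the hunk above is the heart of the change: `Ring.Get` now tracks distinct availability zones alongside distinct hosts while walking the token ring. The standalone sketch below condenses that selection loop using plain structs in place of the real ring types; it illustrates the idea rather than reproducing the Cortex implementation.

```go
package main

import "fmt"

// tokenDesc is a reduced stand-in for ring.TokenDesc: each token maps back
// to the ingester that owns it and that ingester's availability zone.
type tokenDesc struct {
	ingester string
	zone     string
}

// pickReplicas walks the (already sorted) token ring starting at `start` and
// selects up to n ingesters, skipping any ingester or availability zone that
// has already been chosen — the same idea as the distinctZones check added
// to Ring.Get in this PR.
func pickReplicas(tokens []tokenDesc, start, n int) []string {
	var (
		replicas      []string
		distinctHosts = map[string]struct{}{}
		distinctZones = map[string]struct{}{}
	)
	for i := start; len(replicas) < n && i < start+len(tokens); i++ {
		t := tokens[i%len(tokens)]
		if _, ok := distinctHosts[t.ingester]; ok {
			continue
		}
		if _, ok := distinctZones[t.zone]; ok {
			continue
		}
		distinctHosts[t.ingester] = struct{}{}
		distinctZones[t.zone] = struct{}{}
		replicas = append(replicas, t.ingester)
	}
	return replicas
}

func main() {
	ring := []tokenDesc{
		{"ingester-0", "zone-1"},
		{"ingester-1", "zone-1"}, // same zone as ingester-0, will be skipped
		{"ingester-2", "zone-2"},
		{"ingester-3", "zone-3"},
	}
	fmt.Println(pickReplicas(ring, 0, 3)) // [ingester-0 ingester-2 ingester-3]
}
```

In the example, `ingester-1` is skipped because `zone-1` is already represented by `ingester-0`, which is exactly why the zone-replication guide requires at least as many zones as the replica count.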