Skip to content

Add jitter to HA deduping heartbeats (#1543) #1748

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* [ENHANCEMENT] Allocation improvements in adding samples to Chunk. #1706
* [ENHANCEMENT] Consul client now follows recommended practices for blocking queries wrt returned Index value. #1708
* [ENHANCEMENT] Consul client can optionally rate-limit itself during Watch (used e.g. by ring watchers) and WatchPrefix (used by HA feature) operations. Rate limiting is disabled by default. New flags added: `--consul.watch-rate-limit`, and `--consul.watch-burst-size`. #1708
* [ENHANCEMENT] Added jitter to HA deduping heartbeats, configure using `distributor.ha-tracker.update-timeout-jitter-max` #1534

## 0.3.0 / 2019-10-11

Expand Down
3 changes: 3 additions & 0 deletions pkg/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ func (c *Config) Validate() error {
if err := c.LimitsConfig.Validate(c.Distributor.ShardByAllLabels); err != nil {
return errors.Wrap(err, "invalid limits config")
}
if err := c.Distributor.Validate(); err != nil {
return errors.Wrap(err, "invalid distributor config")
}
return nil
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.ShardByAllLabels, "distributor.shard-by-all-labels", false, "Distribute samples based on all labels, as opposed to solely by user and metric name.")
}

// Validate config and returns error on failure
func (cfg *Config) Validate() error {
return cfg.HATrackerConfig.Validate()
}

// New constructs a new Distributor
func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, ring ring.ReadRing) (*Distributor, error) {
if cfg.ingesterClientFactory == nil {
Expand Down
43 changes: 31 additions & 12 deletions pkg/distributor/ha_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
"math/rand"
"net/http"
"strings"
"sync"
Expand Down Expand Up @@ -62,9 +63,10 @@ func NewReplicaDesc() *ReplicaDesc {
// Track the replica we're accepting samples from
// for each HA cluster we know about.
type haTracker struct {
logger log.Logger
cfg HATrackerConfig
client kv.Client
logger log.Logger
cfg HATrackerConfig
client kv.Client
updateTimeoutJitter time.Duration

// Replicas we are accepting samples from.
electedLock sync.RWMutex
Expand All @@ -80,7 +82,8 @@ type HATrackerConfig struct {
// We should only update the timestamp if the difference
// between the stored timestamp and the time we received a sample at
// is more than this duration.
UpdateTimeout time.Duration `yaml:"ha_tracker_update_timeout"`
UpdateTimeout time.Duration `yaml:"ha_tracker_update_timeout"`
UpdateTimeoutJitterMax time.Duration `yaml:"ha_tracker_update_timeout_jitter_max"`
// We should only failover to accepting samples from a replica
// other than the replica written in the KVStore if the difference
// between the stored timestamp and the time we received a sample is
Expand All @@ -100,6 +103,10 @@ func (cfg *HATrackerConfig) RegisterFlags(f *flag.FlagSet) {
"distributor.ha-tracker.update-timeout",
15*time.Second,
"Update the timestamp in the KV store for a given cluster/replica only after this amount of time has passed since the current stored timestamp.")
f.DurationVar(&cfg.UpdateTimeoutJitterMax,
"distributor.ha-tracker.update-timeout-jitter-max",
5*time.Second,
"To spread the HA deduping heartbeats out over time.")
f.DurationVar(&cfg.FailoverTimeout,
"distributor.ha-tracker.failover-timeout",
30*time.Second,
Expand All @@ -108,21 +115,33 @@ func (cfg *HATrackerConfig) RegisterFlags(f *flag.FlagSet) {
cfg.KVStore.RegisterFlagsWithPrefix("distributor.ha-tracker.", f)
}

// Validate config and returns error on failure
func (cfg *HATrackerConfig) Validate() error {
if cfg.FailoverTimeout < cfg.UpdateTimeout+cfg.UpdateTimeoutJitterMax+time.Second {
return fmt.Errorf("HA Tracker failover timeout (%v) must be at least 1s greater than update timeout (%v)",
cfg.FailoverTimeout, cfg.UpdateTimeout+cfg.UpdateTimeoutJitterMax+time.Second)
}
return nil
}

// NewClusterTracker returns a new HA cluster tracker using either Consul
// or in-memory KV store.
func newClusterTracker(cfg HATrackerConfig) (*haTracker, error) {
codec := codec.Proto{Factory: ProtoReplicaDescFactory}

if cfg.FailoverTimeout <= cfg.UpdateTimeout {
return nil, fmt.Errorf("HA Tracker failover timeout must be greater than update timeout, %d is <= %d", cfg.FailoverTimeout, cfg.UpdateTimeout)
var jitter time.Duration
if cfg.UpdateTimeoutJitterMax > 0 {
jitter = time.Duration(rand.Int63n(int64(2*cfg.UpdateTimeoutJitterMax))) - cfg.UpdateTimeoutJitterMax
}

ctx, cancel := context.WithCancel(context.Background())
t := haTracker{
logger: util.Logger,
cfg: cfg,
done: make(chan struct{}),
elected: map[string]ReplicaDesc{},
cancel: cancel,
logger: util.Logger,
cfg: cfg,
updateTimeoutJitter: jitter,
done: make(chan struct{}),
elected: map[string]ReplicaDesc{},
cancel: cancel,
}

if cfg.EnableHATracker {
Expand Down Expand Up @@ -213,7 +232,7 @@ func (c *haTracker) checkKVStore(ctx context.Context, key, replica string, now t

// We don't need to CAS and update the timestamp in the KV store if the timestamp we've received
// this sample at is less than updateTimeout amount of time since the timestamp in the KV store.
if desc.Replica == replica && now.Sub(timestamp.Time(desc.ReceivedAt)) < c.cfg.UpdateTimeout {
if desc.Replica == replica && now.Sub(timestamp.Time(desc.ReceivedAt)) < c.cfg.UpdateTimeout+c.updateTimeoutJitter {
return nil, false, nil
}

Expand Down
17 changes: 14 additions & 3 deletions pkg/distributor/ha_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestFailoverGreaterUpdate(t *testing.T) {
in: HATrackerConfig{
EnableHATracker: true,
UpdateTimeout: time.Second,
FailoverTimeout: 999 * time.Millisecond,
FailoverTimeout: 1999 * time.Millisecond,
KVStore: kv.Config{
Store: "inmemory",
},
Expand All @@ -84,7 +84,18 @@ func TestFailoverGreaterUpdate(t *testing.T) {
in: HATrackerConfig{
EnableHATracker: true,
UpdateTimeout: time.Second,
FailoverTimeout: 1001 * time.Millisecond,
FailoverTimeout: 2000 * time.Millisecond,
KVStore: kv.Config{
Store: "inmemory",
},
},
fail: false,
},
{
in: HATrackerConfig{
EnableHATracker: true,
UpdateTimeout: time.Second,
FailoverTimeout: 2001 * time.Millisecond,
KVStore: kv.Config{
Store: "inmemory",
},
Expand All @@ -94,7 +105,7 @@ func TestFailoverGreaterUpdate(t *testing.T) {
}

for _, c := range cases {
_, err := newClusterTracker(c.in)
err := c.in.Validate()
fail := err != nil
assert.Equal(t, c.fail, fail, "unexpected result: %s", err)
}
Expand Down