Skip to content

HA tracker KV store cleanup #3809

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Feb 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@
* `cortex_ingester_tsdb_symbol_table_size_bytes`
* `cortex_ingester_tsdb_storage_blocks_bytes`
* `cortex_ingester_tsdb_time_retentions_total`
* [ENHANCEMENT] Distributor / HA Tracker: added cleanup of unused elected HA replicas from KV store. Added the following metrics to monitor this process: #3809
* `cortex_ha_tracker_replicas_cleanup_started_total`
* `cortex_ha_tracker_replicas_cleanup_marked_for_deletion_total`
* `cortex_ha_tracker_replicas_cleanup_deleted_total`
* `cortex_ha_tracker_replicas_cleanup_delete_failed_total`
* [BUGFIX] Cortex: Fixed issue where fatal errors and various log messages were not logged. #3778
* [BUGFIX] HA Tracker: don't track as error in the `cortex_kv_request_duration_seconds` metric a CAS operation intentionally aborted. #3745
* [BUGFIX] Querier / ruler: do not log "error removing stale clients" if the ring is empty. #3761
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/v1-guarantees.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,4 @@ Currently experimental features are:
- Querier: tenant federation
- Alertmanager: Sharding of tenants across multiple instances
- The thanosconvert tool for converting Thanos block metadata to Cortex
- HA Tracker: cleanup of old replicas from KV Store.
15 changes: 10 additions & 5 deletions integration/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ func TestKVWatchAndDelete(t *testing.T) {
// Consul reports:
// map[key-before-watch:[value-before-watch] key-to-delete:[value-to-delete]]
//
// Etcd reports:
// map[key-to-delete:[value-to-delete ""]]
// Etcd reports (before changing etcd client to ignore deletes):
// map[key-to-delete:[value-to-delete <nil>]]
t.Log(w.values)
})
}
Expand Down Expand Up @@ -208,9 +208,14 @@ func verifyClientMetrics(t *testing.T, reg *prometheus.Registry, sampleCounts ma

// stringCodec is a minimal codec used by the KV integration tests. It stores
// values as raw strings.
type stringCodec struct{}

// Decode converts the raw bytes into a string. A nil payload is reported as
// the literal "<nil>" so that it can be told apart from an empty (non-nil)
// payload, which decodes to "".
func (c stringCodec) Decode(bb []byte) (interface{}, error) {
	if bb == nil {
		return "<nil>", nil
	}
	return string(bb), nil
}

// Encode converts v, which must be a string, into its raw bytes.
// It panics if v is not a string.
func (c stringCodec) Encode(v interface{}) ([]byte, error) { return []byte(v.(string)), nil }

// CodecID returns the identifier of this codec.
func (c stringCodec) CodecID() string { return "stringCodec" }

type watcher struct {
values map[string][]interface{}
Expand Down
147 changes: 142 additions & 5 deletions pkg/distributor/ha_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ type haTracker struct {
electedReplicaTimestamp *prometheus.GaugeVec
electedReplicaPropagationTime prometheus.Histogram
kvCASCalls *prometheus.CounterVec

cleanupRuns prometheus.Counter
replicasMarkedForDeletion prometheus.Counter
deletedReplicas prometheus.Counter
markingOrDeletionsFailed prometheus.Counter
}

// NewClusterTracker returns a new HA cluster tracker using either Consul
Expand Down Expand Up @@ -149,6 +154,23 @@ func newHATracker(cfg HATrackerConfig, limits haTrackerLimits, reg prometheus.Re
Name: "cortex_ha_tracker_kv_store_cas_total",
Help: "The total number of CAS calls to the KV store for a user ID/cluster.",
}, []string{"user", "cluster"}),

cleanupRuns: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_ha_tracker_replicas_cleanup_started_total",
Help: "Number of elected replicas cleanup loops started.",
}),
replicasMarkedForDeletion: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_ha_tracker_replicas_cleanup_marked_for_deletion_total",
Help: "Number of elected replicas marked for deletion.",
}),
deletedReplicas: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_ha_tracker_replicas_cleanup_deleted_total",
Help: "Number of elected replicas deleted from KV store.",
}),
markingOrDeletionsFailed: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_ha_tracker_replicas_cleanup_delete_failed_total",
Help: "Number of elected replicas that failed to be marked for deletion, or deleted.",
}),
}

if cfg.EnableHATracker {
Expand All @@ -175,12 +197,18 @@ func (c *haTracker) loop(ctx context.Context) error {
return nil
}

// Start cleanup loop. It will stop when context is done.
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
c.cleanupOldReplicasLoop(ctx)
}()

// The KVStore config we gave when creating c should have contained a prefix,
// which would have given us a prefixed KVStore client. So, we can pass empty string here.
c.client.WatchPrefix(ctx, "", func(key string, value interface{}) bool {
replica := value.(*ReplicaDesc)
c.electedLock.Lock()
defer c.electedLock.Unlock()
segments := strings.SplitN(key, "/", 2)

// Valid key would look like cluster/replica, and a key without a / such as `ring` would be invalid.
Expand All @@ -191,6 +219,16 @@ func (c *haTracker) loop(ctx context.Context) error {
user := segments[0]
cluster := segments[1]

c.electedLock.Lock()
defer c.electedLock.Unlock()

if replica.DeletedAt > 0 {
delete(c.elected, key)
c.electedReplicaChanges.DeleteLabelValues(user, cluster)
c.electedReplicaTimestamp.DeleteLabelValues(user, cluster)
return true
}

elected, exists := c.elected[key]
if replica.Replica != elected.Replica {
c.electedReplicaChanges.WithLabelValues(user, cluster).Inc()
Expand All @@ -204,9 +242,107 @@ func (c *haTracker) loop(ctx context.Context) error {
return true
})

wg.Wait()
return nil
}

const (
// cleanupCyclePeriod is the base interval between two cleanup runs; the cleanup loop
// applies cleanupCycleJitterVariance to it when creating its ticker.
cleanupCyclePeriod = 30 * time.Minute
cleanupCycleJitterVariance = 0.2 // for 30 minutes, this is ±6 min

// deletionTimeout is subtracted from the time of each cleanup tick to obtain the cleanup deadline.
// If the last sample for a given cluster was received before that deadline, the elected replica is marked for deletion.
// If the elected replica has been marked for deletion since before that deadline, it is deleted completely.
deletionTimeout = 30 * time.Minute
)

// cleanupOldReplicasLoop periodically runs cleanupOldReplicas until ctx is done.
// The interval is cleanupCyclePeriod with jitter, computed once when the loop starts.
// Every run uses "tick time minus deletionTimeout" as its deadline.
func (c *haTracker) cleanupOldReplicasLoop(ctx context.Context) {
	interval := util.DurationWithJitter(cleanupCyclePeriod, cleanupCycleJitterVariance)

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case now := <-ticker.C:
			c.cleanupRuns.Inc()

			deadline := now.Add(-deletionTimeout)
			c.cleanupOldReplicas(ctx, deadline)

		case <-ctx.Done():
			return
		}
	}
}

// cleanupOldReplicas performs a single cleanup pass over all replica keys in the KV store.
//
// Replicas marked for deletion at or before deadline are deleted from the KV store.
// Replicas with last-received timestamp before deadline are marked for deletion.
//
// Failures on individual keys are logged (and counted for mark/delete operations) and
// the pass continues with the next key. The pass stops early once ctx is done.
func (c *haTracker) cleanupOldReplicas(ctx context.Context, deadline time.Time) {
	keys, err := c.client.List(ctx, "")
	if err != nil {
		level.Warn(c.logger).Log("msg", "cleanup: failed to list replica keys", "err", err)
		return
	}

	for _, key := range keys {
		if ctx.Err() != nil {
			return
		}

		val, err := c.client.Get(ctx, key)
		if err != nil {
			level.Warn(c.logger).Log("msg", "cleanup: failed to get replica value", "key", key, "err", err)
			continue
		}

		desc, ok := val.(*ReplicaDesc)
		if !ok {
			level.Error(c.logger).Log("msg", "cleanup: got invalid replica descriptor", "key", key)
			continue
		}

		if desc.DeletedAt > 0 {
			if timestamp.Time(desc.DeletedAt).After(deadline) {
				// Marked for deletion too recently, keep it for now.
				continue
			}

			// We're blindly deleting a key here. It may happen that value was updated since we have read it few lines above,
			// in which case Distributors will have updated value in memory, but Delete will remove it from KV store anyway.
			// That's not great, but should not be a problem. If KV store sends Watch notification for Delete, distributors will
			// delete it from memory, and recreate on next sample with matching replica.
			//
			// If KV store doesn't send Watch notification for Delete, distributors *with* replica in memory will keep using it,
			// while distributors *without* replica in memory will try to write it to KV store -- which will update *all*
			// watching distributors.
			err = c.client.Delete(ctx, key)
			if err != nil {
				level.Error(c.logger).Log("msg", "cleanup: failed to delete old replica", "key", key, "err", err)
				c.markingOrDeletionsFailed.Inc()
			} else {
				level.Info(c.logger).Log("msg", "cleanup: deleted old replica", "key", key)
				c.deletedReplicas.Inc()
			}
			continue
		}

		// Not marked as deleted yet.
		if desc.DeletedAt != 0 || !timestamp.Time(desc.ReceivedAt).Before(deadline) {
			continue
		}

		// marked records whether the last invocation of the CAS callback actually requested
		// the update. CAS returns a nil error when the callback aborts, so the error alone
		// doesn't tell us whether the replica was marked.
		marked := false
		err = c.client.CAS(ctx, key, func(in interface{}) (out interface{}, retry bool, err error) {
			// The callback may be invoked more than once; start from a clean state each time.
			marked = false

			// Re-check against the value passed to the callback (d), not the one read by Get above (desc):
			// the replica may have been marked, or refreshed by a new sample, in the meantime.
			d, ok := in.(*ReplicaDesc)
			if !ok || d == nil || d.DeletedAt > 0 || !timestamp.Time(d.ReceivedAt).Before(deadline) {
				return nil, false, nil
			}

			d.DeletedAt = timestamp.FromTime(time.Now())
			marked = true
			return d, true, nil
		})

		switch {
		case err != nil:
			c.markingOrDeletionsFailed.Inc()
			level.Error(c.logger).Log("msg", "cleanup: failed to mark replica as deleted", "key", key, "err", err)
		case marked:
			c.replicasMarkedForDeletion.Inc()
			level.Info(c.logger).Log("msg", "cleanup: marked replica as deleted", "key", key)
		default:
			// The value changed since Get and no longer qualifies; nothing was marked.
		}
	}
}

// CheckReplica checks the cluster and replica against the backing KVStore and local cache in the
// tracker c to see if we should accept the incoming sample. It will return an error if the sample
// should not be accepted. Note that internally this function does checks against the stored values
Expand Down Expand Up @@ -254,8 +390,7 @@ func (c *haTracker) checkReplica(ctx context.Context, userID, cluster, replica s

func (c *haTracker) checkKVStore(ctx context.Context, key, replica string, now time.Time) error {
return c.client.CAS(ctx, key, func(in interface{}) (out interface{}, retry bool, err error) {
if desc, ok := in.(*ReplicaDesc); ok {

if desc, ok := in.(*ReplicaDesc); ok && desc.DeletedAt == 0 {
// We don't need to CAS and update the timestamp in the KV store if the timestamp we've received
// this sample at is less than updateTimeout amount of time since the timestamp in the KV store.
if desc.Replica == replica && now.Sub(timestamp.Time(desc.ReceivedAt)) < c.cfg.UpdateTimeout+c.updateTimeoutJitter {
Expand All @@ -273,7 +408,9 @@ func (c *haTracker) checkKVStore(ctx context.Context, key, replica string, now t
// from this replica. Invalid could mean that the timestamp in the KV store was
// out of date based on the update and failover timeouts when compared to now.
return &ReplicaDesc{
Replica: replica, ReceivedAt: timestamp.FromTime(now),
Replica: replica,
ReceivedAt: timestamp.FromTime(now),
DeletedAt: 0,
}, true, nil
})
}
Expand Down
68 changes: 57 additions & 11 deletions pkg/distributor/ha_tracker.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading