HA tracker limits #3668

Merged — 14 commits, Jan 18, 2021
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -14,6 +14,7 @@
* [CHANGE] Blocks storage: compactor is now required when running a Cortex cluster with the blocks storage, because it also keeps the bucket index updated. #3583
* [CHANGE] Blocks storage: block deletion marks are now stored in a per-tenant global markers/ location too, other than within the block location. The compactor, at startup, will copy deletion marks from the block location to the global location. This migration is required only once, so you can safely disable it via `-compactor.block-deletion-marks-migration-enabled=false` once new compactor has successfully started once in your cluster. #3583
* [CHANGE] OpenStack Swift: the default value for the `-ruler.storage.swift.container-name` and `-swift.container-name` config options has changed from `cortex` to empty string. If you were relying on the default value, you should set it back to `cortex`. #3660
* [CHANGE] HA Tracker: configured replica label is now verified against label value length limit (`-validation.max-length-label-value`). #3668
* [FEATURE] Querier: Queries can be federated across multiple tenants. The tenants IDs involved need to be specified separated by a `|` character in the `X-Scope-OrgID` request header. This is an experimental feature, which can be enabled by setting `-tenant-federation.enabled=true` on all Cortex services. #3250
* [ENHANCEMENT] Blocks storage: introduced a per-tenant bucket index, periodically updated by the compactor, used to avoid full bucket scanning done by queriers, store-gateways and rulers. The bucket index is updated by the compactor during blocks cleanup, on every `-compactor.cleanup-interval`. #3553 #3555 #3561 #3583 #3625
* [ENHANCEMENT] Blocks storage: introduced an option `-blocks-storage.bucket-store.bucket-index.enabled` to enable the usage of the bucket index in the querier, store-gateway and ruler. When enabled, the querier, store-gateway and ruler will use the bucket index to find a tenant's blocks instead of running the periodic bucket scan. The following new metrics are exported by the querier and ruler: #3614 #3625
@@ -104,6 +105,7 @@
* `cortex_compactor_tenants_processing_failed`
* [ENHANCEMENT] Added new experimental API endpoints: `POST /purger/delete_tenant` and `GET /purger/delete_tenant_status` for deleting all tenant data. Only works with blocks storage. Compactor removes blocks that belong to user marked for deletion. #3549 #3558
* [ENHANCEMENT] Chunks storage: add option to use V2 signatures for S3 authentication. #3560
* [ENHANCEMENT] HA Tracker: Added new limit `ha_max_clusters` to set the max number of clusters tracked for a single user. This limit is disabled by default. #3668
* [BUGFIX] Query-Frontend: `cortex_query_seconds_total` now return seconds not nanoseconds. #3589
* [BUGFIX] Blocks storage ingester: fixed some cases leading to a TSDB WAL corruption after a partial write to disk. #3423
* [BUGFIX] Blocks storage: Fix the race between ingestion and `/flush` call resulting in overlapping blocks. #3422
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -3088,6 +3088,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -distributor.ha-tracker.replica
[ha_replica_label: <string> | default = "__replica__"]

# Maximum number of clusters that the HA tracker will keep track of for a
# single user. 0 to disable the limit.
# CLI flag: -distributor.ha-tracker.max-clusters
[ha_max_clusters: <int> | default = 0]

# This flag can be used to specify label names to drop during sample ingestion
# within the distributor, and can be repeated in order to drop multiple
# labels.
21 changes: 16 additions & 5 deletions pkg/distributor/distributor.go
@@ -216,7 +216,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
replicationFactor.Set(float64(ingestersRing.ReplicationFactor()))
cfg.PoolConfig.RemoteTimeout = cfg.RemoteTimeout

replicas, err := newClusterTracker(cfg.HATrackerConfig, reg)
replicas, err := newClusterTracker(cfg.HATrackerConfig, limits, reg)
if err != nil {
return nil, err
}
@@ -342,13 +342,18 @@ func removeLabel(labelName string, labels *[]client.LabelAdapter) {
// Returns a boolean that indicates whether or not we want to remove the replica label going forward,
// and an error that indicates whether we want to accept samples based on the cluster/replica found in ts.
// nil for the error means accept the sample.
func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica string) (bool, error) {
func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica string) (removeReplicaLabel bool, _ error) {
// If the sample doesn't have either HA label, accept it.
// At the moment we want to accept these samples by default.
if cluster == "" || replica == "" {
return false, nil
}

// If replica label is too long, don't use it. We accept the sample here, but it will fail validation later anyway.
if len(replica) > d.limits.MaxLabelValueLength(userID) {
return false, nil
}

// At this point we know we have both HA labels, we should lookup
// the cluster/instance here to see if we want to accept this sample.
err := d.HATracker.checkReplica(ctx, userID, cluster, replica)
@@ -419,13 +424,19 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
cluster, replica := findHALabels(d.limits.HAReplicaLabel(userID), d.limits.HAClusterLabel(userID), req.Timeseries[0].Labels)
removeReplica, err = d.checkSample(ctx, userID, cluster, replica)
if err != nil {
if resp, ok := httpgrpc.HTTPResponseFromError(err); ok && resp.GetCode() == 202 {
// Ensure the request slice is reused if the series get deduped.
client.ReuseSlice(req.Timeseries)

if errors.Is(err, replicasNotMatchError{}) {
// These samples have been deduped.
dedupedSamples.WithLabelValues(userID, cluster).Add(float64(numSamples))
return nil, httpgrpc.Errorf(http.StatusAccepted, err.Error())
}

// Ensure the request slice is reused if the series get deduped.
client.ReuseSlice(req.Timeseries)
if errors.Is(err, tooManyClustersError{}) {
validation.DiscardedSamples.WithLabelValues(validation.TooManyHAClusters, userID).Add(float64(numSamples))
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

return nil, err
}
37 changes: 25 additions & 12 deletions pkg/distributor/distributor_test.go
@@ -43,9 +43,9 @@ import (
)

var (
errFail = fmt.Errorf("Fail")
success = &client.WriteResponse{}
ctx = user.InjectOrgID(context.Background(), "user")
errFail = fmt.Errorf("Fail")
emptyResponse = &client.WriteResponse{}
ctx = user.InjectOrgID(context.Background(), "user")
)

func TestConfig_Validate(t *testing.T) {
@@ -123,14 +123,14 @@ func TestDistributor_Push(t *testing.T) {
"A push of no samples shouldn't block or return error, even if ingesters are sad": {
numIngesters: 3,
happyIngesters: 0,
expectedResponse: success,
expectedResponse: emptyResponse,
},
"A push to 3 happy ingesters should succeed": {
numIngesters: 3,
happyIngesters: 3,
samples: samplesIn{num: 5, startTimestampMs: 123456789000},
metadata: 5,
expectedResponse: success,
expectedResponse: emptyResponse,
metricNames: []string{lastSeenTimestamp},
expectedMetrics: `
# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
@@ -143,7 +143,7 @@
happyIngesters: 2,
samples: samplesIn{num: 5, startTimestampMs: 123456789000},
metadata: 5,
expectedResponse: success,
expectedResponse: emptyResponse,
metricNames: []string{lastSeenTimestamp},
expectedMetrics: `
# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
@@ -194,7 +194,7 @@
samples: samplesIn{num: 1, startTimestampMs: 123456789000},
metadata: 0,
metricNames: []string{distributorAppend, distributorAppendFailure},
expectedResponse: success,
expectedResponse: emptyResponse,
expectedMetrics: `
# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_append_failures_total counter
@@ -212,7 +212,7 @@
samples: samplesIn{num: 0, startTimestampMs: 123456789000},
metadata: 1,
metricNames: []string{distributorAppend, distributorAppendFailure},
expectedResponse: success,
expectedResponse: emptyResponse,
expectedMetrics: `
# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_append_failures_total counter
@@ -348,7 +348,7 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
response, err := distributors[0].Push(ctx, request)

if push.expectedError == nil {
assert.Equal(t, success, response)
assert.Equal(t, emptyResponse, response)
assert.Nil(t, err)
} else {
assert.Nil(t, response)
@@ -377,7 +377,7 @@ func TestDistributor_PushHAInstances(t *testing.T) {
testReplica: "instance0",
cluster: "cluster0",
samples: 5,
expectedResponse: success,
expectedResponse: emptyResponse,
},
// The 202 indicates that we didn't accept this sample.
{
@@ -395,14 +395,25 @@
testReplica: "instance0",
cluster: "cluster0",
samples: 5,
expectedResponse: success,
expectedResponse: emptyResponse,
},
// Using a very long replica label value results in a validation error.
{
enableTracker: true,
acceptedReplica: "instance0",
testReplica: "instance1234567890123456789012345678901234567890",
cluster: "cluster0",
samples: 5,
expectedResponse: emptyResponse,
expectedCode: 400,
},
} {
for _, shardByAllLabels := range []bool{true, false} {
t.Run(fmt.Sprintf("[%d](shardByAllLabels=%v)", i, shardByAllLabels), func(t *testing.T) {
var limits validation.Limits
flagext.DefaultValues(&limits)
limits.AcceptHASamples = true
limits.MaxLabelValueLength = 15

ds, _, r := prepare(t, prepConfig{
numIngesters: 3,
@@ -422,7 +433,7 @@ func TestDistributor_PushHAInstances(t *testing.T) {
KVStore: kv.Config{Mock: mock},
UpdateTimeout: 100 * time.Millisecond,
FailoverTimeout: time.Second,
}, nil)
}, trackerLimits{maxClusters: 100}, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), r))
d.HATracker = r
@@ -440,6 +451,8 @@ func TestDistributor_PushHAInstances(t *testing.T) {
httpResp, ok := httpgrpc.HTTPResponseFromError(err)
if ok {
assert.Equal(t, tc.expectedCode, httpResp.Code)
} else if tc.expectedCode != 0 {
assert.Fail(t, "expected HTTP status code", tc.expectedCode)
}
})
}
88 changes: 68 additions & 20 deletions pkg/distributor/ha_tracker.go
@@ -6,7 +6,6 @@ import (
"flag"
"fmt"
"math/rand"
"net/http"
"strings"
"sync"
"time"
@@ -17,7 +16,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/mtime"

"github.com/cortexproject/cortex/pkg/ingester/client"
@@ -54,6 +52,12 @@
errInvalidFailoverTimeout = "HA Tracker failover timeout (%v) must be at least 1s greater than update timeout - max jitter (%v)"
)

type haTrackerLimits interface {
// Returns max number of clusters that HA tracker should track for a user.
// Samples from additional clusters are rejected.
MaxHAClusters(user string) int
}
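The interface above needs only one method. The distributor tests in this PR pass a stub value (`trackerLimits{maxClusters: 100}`) whose definition is not shown in the diff, so the following minimal sketch of such a stub is an assumption about its shape, not the PR's actual code:

```go
package main

import "fmt"

// haTrackerLimits mirrors the interface added in this PR.
type haTrackerLimits interface {
	// MaxHAClusters returns the max number of clusters tracked for a user;
	// 0 disables the limit.
	MaxHAClusters(user string) int
}

// trackerLimits is a hypothetical fixed-limit stub, in the spirit of the
// trackerLimits value the PR's tests pass to newClusterTracker.
type trackerLimits struct {
	maxClusters int
}

// MaxHAClusters returns the same limit for every user.
func (l trackerLimits) MaxHAClusters(_ string) int {
	return l.maxClusters
}

func main() {
	var limits haTrackerLimits = trackerLimits{maxClusters: 100}
	fmt.Println(limits.MaxHAClusters("user-1")) // prints 100
}
```

In production the distributor would instead look the limit up per tenant from its overrides, which is why the interface takes the user ID as an argument.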

// ProtoReplicaDescFactory makes new InstanceDescs
func ProtoReplicaDescFactory() proto.Message {
return NewReplicaDesc()
@@ -73,10 +77,11 @@ type haTracker struct {
cfg HATrackerConfig
client kv.Client
updateTimeoutJitter time.Duration
limits haTrackerLimits

// Replicas we are accepting samples from.
electedLock sync.RWMutex
elected map[string]ReplicaDesc
elected map[string]ReplicaDesc // Replicas we are accepting samples from. Key = "user/cluster".
clusters map[string]int // Number of clusters with elected replicas that a single user has. Key = user.
}

// HATrackerConfig contains the configuration require to
@@ -143,7 +148,7 @@ func GetReplicaDescCodec() codec.Proto {

// NewClusterTracker returns a new HA cluster tracker using either Consul
// or in-memory KV store. Tracker must be started via StartAsync().
func newClusterTracker(cfg HATrackerConfig, reg prometheus.Registerer) (*haTracker, error) {
func newClusterTracker(cfg HATrackerConfig, limits haTrackerLimits, reg prometheus.Registerer) (*haTracker, error) {
var jitter time.Duration
if cfg.UpdateTimeoutJitterMax > 0 {
jitter = time.Duration(rand.Int63n(int64(2*cfg.UpdateTimeoutJitterMax))) - cfg.UpdateTimeoutJitterMax
@@ -153,7 +158,9 @@ func newClusterTracker(cfg HATrackerConfig, reg prometheus.Registerer) (*haTrack
logger: util.Logger,
cfg: cfg,
updateTimeoutJitter: jitter,
limits: limits,
elected: map[string]ReplicaDesc{},
clusters: map[string]int{},
}

if cfg.EnableHATracker {
@@ -186,19 +193,25 @@ func (c *haTracker) loop(ctx context.Context) error {
replica := value.(*ReplicaDesc)
c.electedLock.Lock()
defer c.electedLock.Unlock()
chunks := strings.SplitN(key, "/", 2)
segments := strings.SplitN(key, "/", 2)

// The prefix has already been stripped, so a valid key would look like cluster/replica,
// and a key without a / such as `ring` would be invalid.
if len(chunks) != 2 {
// A valid key looks like user/cluster; a key without a /, such as `ring`, is invalid.
if len(segments) != 2 {
return true
}

if replica.Replica != c.elected[key].Replica {
electedReplicaChanges.WithLabelValues(chunks[0], chunks[1]).Inc()
user := segments[0]
cluster := segments[1]

elected, exists := c.elected[key]
if replica.Replica != elected.Replica {
electedReplicaChanges.WithLabelValues(user, cluster).Inc()
}
if !exists {
c.clusters[user]++
}
c.elected[key] = *replica
electedReplicaTimestamp.WithLabelValues(chunks[0], chunks[1]).Set(float64(replica.ReceivedAt / 1000))
electedReplicaTimestamp.WithLabelValues(user, cluster).Set(float64(replica.ReceivedAt / 1000))
electedReplicaPropagationTime.Observe(time.Since(timestamp.Time(replica.ReceivedAt)).Seconds())
return true
})
@@ -210,7 +223,7 @@
// tracker c to see if we should accept the incoming sample. It will return an error if the sample
// should not be accepted. Note that internally this function does checks against the stored values
// and may modify the stored data, for example to failover between replicas after a certain period of time.
// A 202 response code is returned (from checkKVstore) if we shouldn't store this sample but are
// replicasNotMatchError is returned (from checkKVStore) if we shouldn't store this sample but are
// accepting samples from another replica for the cluster, so that a flood of errors isn't returned
// to customers' clients.
func (c *haTracker) checkReplica(ctx context.Context, userID, cluster, replica string) error {
@@ -220,22 +233,32 @@ func (c *haTracker) checkReplica(ctx context.Context, userID, cluster, replica s
}
key := fmt.Sprintf("%s/%s", userID, cluster)
now := mtime.Now()

c.electedLock.RLock()
entry, ok := c.elected[key]
clusters := c.clusters[userID]
c.electedLock.RUnlock()

if ok && now.Sub(timestamp.Time(entry.ReceivedAt)) < c.cfg.UpdateTimeout+c.updateTimeoutJitter {
if entry.Replica != replica {
return replicasNotMatchError(replica, entry.Replica)
return replicasNotMatchError{replica: replica, elected: entry.Replica}
}
return nil
}

if !ok {
// If we don't know about this cluster yet, and we have reached the limit on the number of clusters, we error out now.
if limit := c.limits.MaxHAClusters(userID); limit > 0 && clusters+1 > limit {
return tooManyClustersError{limit: limit}
}
}

err := c.checkKVStore(ctx, key, replica, now)
kvCASCalls.WithLabelValues(userID, cluster).Inc()
if err != nil {
// The callback within checkKVStore will return a 202 if the sample is being deduped,
// The callback within checkKVStore will return a replicasNotMatchError if the sample is being deduped,
// otherwise there may have been an actual error CAS'ing that we should log.
if resp, ok := httpgrpc.HTTPResponseFromError(err); ok && resp.GetCode() != 202 {
if !errors.Is(err, replicasNotMatchError{}) {
level.Error(util.Logger).Log("msg", "rejecting sample", "err", err)
}
}
@@ -255,8 +278,7 @@ func (c *haTracker) checkKVStore(ctx context.Context, key, replica string, now t
// We shouldn't failover to accepting a new replica if the timestamp we've received this sample at
// is less than failOver timeout amount of time since the timestamp in the KV store.
if desc.Replica != replica && now.Sub(timestamp.Time(desc.ReceivedAt)) < c.cfg.FailoverTimeout {
// Return a 202.
return nil, false, replicasNotMatchError(replica, desc.Replica)
return nil, false, replicasNotMatchError{replica: replica, elected: desc.Replica}
}
}

@@ -269,8 +291,34 @@ func (c *haTracker) checkKVStore(ctx context.Context, key, replica string, now t
})
}

func replicasNotMatchError(replica, elected string) error {
return httpgrpc.Errorf(http.StatusAccepted, "replicas did not match, rejecting sample: replica=%s, elected=%s", replica, elected)
type replicasNotMatchError struct {
replica, elected string
}

func (e replicasNotMatchError) Error() string {
return fmt.Sprintf("replicas did not match, rejecting sample: replica=%s, elected=%s", e.replica, e.elected)
}

// Needed for errors.Is to work properly.
func (e replicasNotMatchError) Is(err error) bool {
_, ok1 := err.(replicasNotMatchError)
_, ok2 := err.(*replicasNotMatchError)
return ok1 || ok2
}

type tooManyClustersError struct {
limit int
}

func (e tooManyClustersError) Error() string {
return fmt.Sprintf("too many HA clusters (limit: %d)", e.limit)
}

// Needed for errors.Is to work properly.
func (e tooManyClustersError) Is(err error) bool {
_, ok1 := err.(tooManyClustersError)
_, ok2 := err.(*tooManyClustersError)
return ok1 || ok2
}

func findHALabels(replicaLabel, clusterLabel string, labels []client.LabelAdapter) (string, string) {