Skip to content

Commit 15d98ef

Browse files
authored
HA tracker limits (#3668)
Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
1 parent 2b174a5 commit 15d98ef

File tree

8 files changed

+217
-44
lines changed

8 files changed

+217
-44
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
* [CHANGE] Blocks storage: compactor is now required when running a Cortex cluster with the blocks storage, because it also keeps the bucket index updated. #3583
1515
* [CHANGE] Blocks storage: block deletion marks are now stored in a per-tenant global markers/ location too, other than within the block location. The compactor, at startup, will copy deletion marks from the block location to the global location. This migration is required only once, so you can safely disable it via `-compactor.block-deletion-marks-migration-enabled=false` once new compactor has successfully started once in your cluster. #3583
1616
* [CHANGE] OpenStack Swift: the default value for the `-ruler.storage.swift.container-name` and `-swift.container-name` config options has changed from `cortex` to empty string. If you were relying on the default value, you should set it back to `cortex`. #3660
17+
* [CHANGE] HA Tracker: configured replica label is now verified against label value length limit (`-validation.max-length-label-value`). #3668
1718
* [FEATURE] Querier: Queries can be federated across multiple tenants. The tenants IDs involved need to be specified separated by a `|` character in the `X-Scope-OrgID` request header. This is an experimental feature, which can be enabled by setting `-tenant-federation.enabled=true` on all Cortex services. #3250
1819
* [ENHANCEMENT] Blocks storage: introduced a per-tenant bucket index, periodically updated by the compactor, used to avoid full bucket scanning done by queriers, store-gateways and rulers. The bucket index is updated by the compactor during blocks cleanup, on every `-compactor.cleanup-interval`. #3553 #3555 #3561 #3583 #3625
1920
* [ENHANCEMENT] Blocks storage: introduced an option `-blocks-storage.bucket-store.bucket-index.enabled` to enable the usage of the bucket index in the querier, store-gateway and ruler. When enabled, the querier, store-gateway and ruler will use the bucket index to find a tenant's blocks instead of running the periodic bucket scan. The following new metrics are exported by the querier and ruler: #3614 #3625
@@ -104,6 +105,7 @@
104105
* `cortex_compactor_tenants_processing_failed`
105106
* [ENHANCEMENT] Added new experimental API endpoints: `POST /purger/delete_tenant` and `GET /purger/delete_tenant_status` for deleting all tenant data. Only works with blocks storage. Compactor removes blocks that belong to user marked for deletion. #3549 #3558
106107
* [ENHANCEMENT] Chunks storage: add option to use V2 signatures for S3 authentication. #3560
108+
* [ENHANCEMENT] HA Tracker: Added new limit `ha_max_clusters` to set the max number of clusters tracked for a single user. This limit is disabled by default. #3668
107109
* [BUGFIX] Query-Frontend: `cortex_query_seconds_total` now returns seconds, not nanoseconds. #3589
108110
* [BUGFIX] Blocks storage ingester: fixed some cases leading to a TSDB WAL corruption after a partial write to disk. #3423
109111
* [BUGFIX] Blocks storage: Fix the race between ingestion and `/flush` call resulting in overlapping blocks. #3422

docs/configuration/config-file-reference.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3088,6 +3088,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
30883088
# CLI flag: -distributor.ha-tracker.replica
30893089
[ha_replica_label: <string> | default = "__replica__"]
30903090
3091+
# Maximum number of clusters that HA tracker will keep track of for a single user.
3092+
# 0 to disable the limit.
3093+
# CLI flag: -distributor.ha-tracker.max-clusters
3094+
[ha_max_clusters: <int> | default = 0]
3095+
30913096
# This flag can be used to specify label names to drop during sample
30923097
# ingestion within the distributor and can be repeated in order to drop multiple
30933098
# labels.

pkg/distributor/distributor.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
216216
replicationFactor.Set(float64(ingestersRing.ReplicationFactor()))
217217
cfg.PoolConfig.RemoteTimeout = cfg.RemoteTimeout
218218

219-
replicas, err := newClusterTracker(cfg.HATrackerConfig, reg)
219+
replicas, err := newClusterTracker(cfg.HATrackerConfig, limits, reg)
220220
if err != nil {
221221
return nil, err
222222
}
@@ -342,13 +342,18 @@ func removeLabel(labelName string, labels *[]client.LabelAdapter) {
342342
// Returns a boolean that indicates whether or not we want to remove the replica label going forward,
343343
// and an error that indicates whether we want to accept samples based on the cluster/replica found in ts.
344344
// nil for the error means accept the sample.
345-
func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica string) (bool, error) {
345+
func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica string) (removeReplicaLabel bool, _ error) {
346346
// If the sample doesn't have either HA label, accept it.
347347
// At the moment we want to accept these samples by default.
348348
if cluster == "" || replica == "" {
349349
return false, nil
350350
}
351351

352+
// If replica label is too long, don't use it. We accept the sample here, but it will fail validation later anyway.
353+
if len(replica) > d.limits.MaxLabelValueLength(userID) {
354+
return false, nil
355+
}
356+
352357
// At this point we know we have both HA labels, we should lookup
353358
// the cluster/instance here to see if we want to accept this sample.
354359
err := d.HATracker.checkReplica(ctx, userID, cluster, replica)
@@ -419,13 +424,19 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
419424
cluster, replica := findHALabels(d.limits.HAReplicaLabel(userID), d.limits.HAClusterLabel(userID), req.Timeseries[0].Labels)
420425
removeReplica, err = d.checkSample(ctx, userID, cluster, replica)
421426
if err != nil {
422-
if resp, ok := httpgrpc.HTTPResponseFromError(err); ok && resp.GetCode() == 202 {
427+
// Ensure the request slice is reused if the series get deduped.
428+
client.ReuseSlice(req.Timeseries)
429+
430+
if errors.Is(err, replicasNotMatchError{}) {
423431
// These samples have been deduped.
424432
dedupedSamples.WithLabelValues(userID, cluster).Add(float64(numSamples))
433+
return nil, httpgrpc.Errorf(http.StatusAccepted, err.Error())
425434
}
426435

427-
// Ensure the request slice is reused if the series get deduped.
428-
client.ReuseSlice(req.Timeseries)
436+
if errors.Is(err, tooManyClustersError{}) {
437+
validation.DiscardedSamples.WithLabelValues(validation.TooManyHAClusters, userID).Add(float64(numSamples))
438+
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
439+
}
429440

430441
return nil, err
431442
}

pkg/distributor/distributor_test.go

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,9 @@ import (
4343
)
4444

4545
var (
46-
errFail = fmt.Errorf("Fail")
47-
success = &client.WriteResponse{}
48-
ctx = user.InjectOrgID(context.Background(), "user")
46+
errFail = fmt.Errorf("Fail")
47+
emptyResponse = &client.WriteResponse{}
48+
ctx = user.InjectOrgID(context.Background(), "user")
4949
)
5050

5151
func TestConfig_Validate(t *testing.T) {
@@ -123,14 +123,14 @@ func TestDistributor_Push(t *testing.T) {
123123
"A push of no samples shouldn't block or return error, even if ingesters are sad": {
124124
numIngesters: 3,
125125
happyIngesters: 0,
126-
expectedResponse: success,
126+
expectedResponse: emptyResponse,
127127
},
128128
"A push to 3 happy ingesters should succeed": {
129129
numIngesters: 3,
130130
happyIngesters: 3,
131131
samples: samplesIn{num: 5, startTimestampMs: 123456789000},
132132
metadata: 5,
133-
expectedResponse: success,
133+
expectedResponse: emptyResponse,
134134
metricNames: []string{lastSeenTimestamp},
135135
expectedMetrics: `
136136
# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
@@ -143,7 +143,7 @@ func TestDistributor_Push(t *testing.T) {
143143
happyIngesters: 2,
144144
samples: samplesIn{num: 5, startTimestampMs: 123456789000},
145145
metadata: 5,
146-
expectedResponse: success,
146+
expectedResponse: emptyResponse,
147147
metricNames: []string{lastSeenTimestamp},
148148
expectedMetrics: `
149149
# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
@@ -194,7 +194,7 @@ func TestDistributor_Push(t *testing.T) {
194194
samples: samplesIn{num: 1, startTimestampMs: 123456789000},
195195
metadata: 0,
196196
metricNames: []string{distributorAppend, distributorAppendFailure},
197-
expectedResponse: success,
197+
expectedResponse: emptyResponse,
198198
expectedMetrics: `
199199
# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
200200
# TYPE cortex_distributor_ingester_append_failures_total counter
@@ -212,7 +212,7 @@ func TestDistributor_Push(t *testing.T) {
212212
samples: samplesIn{num: 0, startTimestampMs: 123456789000},
213213
metadata: 1,
214214
metricNames: []string{distributorAppend, distributorAppendFailure},
215-
expectedResponse: success,
215+
expectedResponse: emptyResponse,
216216
expectedMetrics: `
217217
# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
218218
# TYPE cortex_distributor_ingester_append_failures_total counter
@@ -348,7 +348,7 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
348348
response, err := distributors[0].Push(ctx, request)
349349

350350
if push.expectedError == nil {
351-
assert.Equal(t, success, response)
351+
assert.Equal(t, emptyResponse, response)
352352
assert.Nil(t, err)
353353
} else {
354354
assert.Nil(t, response)
@@ -377,7 +377,7 @@ func TestDistributor_PushHAInstances(t *testing.T) {
377377
testReplica: "instance0",
378378
cluster: "cluster0",
379379
samples: 5,
380-
expectedResponse: success,
380+
expectedResponse: emptyResponse,
381381
},
382382
// The 202 indicates that we didn't accept this sample.
383383
{
@@ -395,14 +395,25 @@ func TestDistributor_PushHAInstances(t *testing.T) {
395395
testReplica: "instance0",
396396
cluster: "cluster0",
397397
samples: 5,
398-
expectedResponse: success,
398+
expectedResponse: emptyResponse,
399+
},
400+
// Using very long replica label value results in validation error.
401+
{
402+
enableTracker: true,
403+
acceptedReplica: "instance0",
404+
testReplica: "instance1234567890123456789012345678901234567890",
405+
cluster: "cluster0",
406+
samples: 5,
407+
expectedResponse: emptyResponse,
408+
expectedCode: 400,
399409
},
400410
} {
401411
for _, shardByAllLabels := range []bool{true, false} {
402412
t.Run(fmt.Sprintf("[%d](shardByAllLabels=%v)", i, shardByAllLabels), func(t *testing.T) {
403413
var limits validation.Limits
404414
flagext.DefaultValues(&limits)
405415
limits.AcceptHASamples = true
416+
limits.MaxLabelValueLength = 15
406417

407418
ds, _, r := prepare(t, prepConfig{
408419
numIngesters: 3,
@@ -422,7 +433,7 @@ func TestDistributor_PushHAInstances(t *testing.T) {
422433
KVStore: kv.Config{Mock: mock},
423434
UpdateTimeout: 100 * time.Millisecond,
424435
FailoverTimeout: time.Second,
425-
}, nil)
436+
}, trackerLimits{maxClusters: 100}, nil)
426437
require.NoError(t, err)
427438
require.NoError(t, services.StartAndAwaitRunning(context.Background(), r))
428439
d.HATracker = r
@@ -440,6 +451,8 @@ func TestDistributor_PushHAInstances(t *testing.T) {
440451
httpResp, ok := httpgrpc.HTTPResponseFromError(err)
441452
if ok {
442453
assert.Equal(t, tc.expectedCode, httpResp.Code)
454+
} else if tc.expectedCode != 0 {
455+
assert.Fail(t, "expected HTTP status code", tc.expectedCode)
443456
}
444457
})
445458
}

pkg/distributor/ha_tracker.go

Lines changed: 68 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"flag"
77
"fmt"
88
"math/rand"
9-
"net/http"
109
"strings"
1110
"sync"
1211
"time"
@@ -17,7 +16,6 @@ import (
1716
"github.com/prometheus/client_golang/prometheus"
1817
"github.com/prometheus/client_golang/prometheus/promauto"
1918
"github.com/prometheus/prometheus/pkg/timestamp"
20-
"github.com/weaveworks/common/httpgrpc"
2119
"github.com/weaveworks/common/mtime"
2220

2321
"github.com/cortexproject/cortex/pkg/ingester/client"
@@ -54,6 +52,12 @@ var (
5452
errInvalidFailoverTimeout = "HA Tracker failover timeout (%v) must be at least 1s greater than update timeout - max jitter (%v)"
5553
)
5654

55+
type haTrackerLimits interface {
56+
// Returns max number of clusters that HA tracker should track for a user.
57+
// Samples from additional clusters are rejected.
58+
MaxHAClusters(user string) int
59+
}
60+
5761
// ProtoReplicaDescFactory makes new InstanceDescs
5862
func ProtoReplicaDescFactory() proto.Message {
5963
return NewReplicaDesc()
@@ -73,10 +77,11 @@ type haTracker struct {
7377
cfg HATrackerConfig
7478
client kv.Client
7579
updateTimeoutJitter time.Duration
80+
limits haTrackerLimits
7681

77-
// Replicas we are accepting samples from.
7882
electedLock sync.RWMutex
79-
elected map[string]ReplicaDesc
83+
elected map[string]ReplicaDesc // Replicas we are accepting samples from. Key = "user/cluster".
84+
clusters map[string]int // Number of clusters with elected replicas that a single user has. Key = user.
8085
}
8186

8287
// HATrackerConfig contains the configuration required to
@@ -143,7 +148,7 @@ func GetReplicaDescCodec() codec.Proto {
143148

144149
// NewClusterTracker returns a new HA cluster tracker using either Consul
145150
// or in-memory KV store. Tracker must be started via StartAsync().
146-
func newClusterTracker(cfg HATrackerConfig, reg prometheus.Registerer) (*haTracker, error) {
151+
func newClusterTracker(cfg HATrackerConfig, limits haTrackerLimits, reg prometheus.Registerer) (*haTracker, error) {
147152
var jitter time.Duration
148153
if cfg.UpdateTimeoutJitterMax > 0 {
149154
jitter = time.Duration(rand.Int63n(int64(2*cfg.UpdateTimeoutJitterMax))) - cfg.UpdateTimeoutJitterMax
@@ -153,7 +158,9 @@ func newClusterTracker(cfg HATrackerConfig, reg prometheus.Registerer) (*haTrack
153158
logger: util.Logger,
154159
cfg: cfg,
155160
updateTimeoutJitter: jitter,
161+
limits: limits,
156162
elected: map[string]ReplicaDesc{},
163+
clusters: map[string]int{},
157164
}
158165

159166
if cfg.EnableHATracker {
@@ -186,19 +193,25 @@ func (c *haTracker) loop(ctx context.Context) error {
186193
replica := value.(*ReplicaDesc)
187194
c.electedLock.Lock()
188195
defer c.electedLock.Unlock()
189-
chunks := strings.SplitN(key, "/", 2)
196+
segments := strings.SplitN(key, "/", 2)
190197

191-
// The prefix has already been stripped, so a valid key would look like cluster/replica,
192-
// and a key without a / such as `ring` would be invalid.
193-
if len(chunks) != 2 {
198+
// A valid key looks like user/cluster; a key without a / such as `ring` is invalid.
199+
if len(segments) != 2 {
194200
return true
195201
}
196202

197-
if replica.Replica != c.elected[key].Replica {
198-
electedReplicaChanges.WithLabelValues(chunks[0], chunks[1]).Inc()
203+
user := segments[0]
204+
cluster := segments[1]
205+
206+
elected, exists := c.elected[key]
207+
if replica.Replica != elected.Replica {
208+
electedReplicaChanges.WithLabelValues(user, cluster).Inc()
209+
}
210+
if !exists {
211+
c.clusters[user]++
199212
}
200213
c.elected[key] = *replica
201-
electedReplicaTimestamp.WithLabelValues(chunks[0], chunks[1]).Set(float64(replica.ReceivedAt / 1000))
214+
electedReplicaTimestamp.WithLabelValues(user, cluster).Set(float64(replica.ReceivedAt / 1000))
202215
electedReplicaPropagationTime.Observe(time.Since(timestamp.Time(replica.ReceivedAt)).Seconds())
203216
return true
204217
})
@@ -210,7 +223,7 @@ func (c *haTracker) loop(ctx context.Context) error {
210223
// tracker c to see if we should accept the incoming sample. It will return an error if the sample
211224
// should not be accepted. Note that internally this function does checks against the stored values
212225
// and may modify the stored data, for example to failover between replicas after a certain period of time.
213-
// A 202 response code is returned (from checkKVstore) if we shouldn't store this sample but are
226+
// replicasNotMatchError is returned (from checkKVStore) if we shouldn't store this sample but are
214227
// accepting samples from another replica for the cluster, so that there isn't a bunch of errors returned
215228
// to customers' clients.
216229
func (c *haTracker) checkReplica(ctx context.Context, userID, cluster, replica string) error {
@@ -220,22 +233,32 @@ func (c *haTracker) checkReplica(ctx context.Context, userID, cluster, replica s
220233
}
221234
key := fmt.Sprintf("%s/%s", userID, cluster)
222235
now := mtime.Now()
236+
223237
c.electedLock.RLock()
224238
entry, ok := c.elected[key]
239+
clusters := c.clusters[userID]
225240
c.electedLock.RUnlock()
241+
226242
if ok && now.Sub(timestamp.Time(entry.ReceivedAt)) < c.cfg.UpdateTimeout+c.updateTimeoutJitter {
227243
if entry.Replica != replica {
228-
return replicasNotMatchError(replica, entry.Replica)
244+
return replicasNotMatchError{replica: replica, elected: entry.Replica}
229245
}
230246
return nil
231247
}
232248

249+
if !ok {
250+
// If we don't know about this cluster yet and we have reached the limit for number of clusters, we error out now.
251+
if limit := c.limits.MaxHAClusters(userID); limit > 0 && clusters+1 > limit {
252+
return tooManyClustersError{limit: limit}
253+
}
254+
}
255+
233256
err := c.checkKVStore(ctx, key, replica, now)
234257
kvCASCalls.WithLabelValues(userID, cluster).Inc()
235258
if err != nil {
236-
// The callback within checkKVStore will return a 202 if the sample is being deduped,
259+
// The callback within checkKVStore will return a replicasNotMatchError if the sample is being deduped,
237260
// otherwise there may have been an actual error CAS'ing that we should log.
238-
if resp, ok := httpgrpc.HTTPResponseFromError(err); ok && resp.GetCode() != 202 {
261+
if !errors.Is(err, replicasNotMatchError{}) {
239262
level.Error(util.Logger).Log("msg", "rejecting sample", "err", err)
240263
}
241264
}
@@ -255,8 +278,7 @@ func (c *haTracker) checkKVStore(ctx context.Context, key, replica string, now t
255278
// We shouldn't failover to accepting a new replica if the timestamp we've received this sample at
256279
// is less than failOver timeout amount of time since the timestamp in the KV store.
257280
if desc.Replica != replica && now.Sub(timestamp.Time(desc.ReceivedAt)) < c.cfg.FailoverTimeout {
258-
// Return a 202.
259-
return nil, false, replicasNotMatchError(replica, desc.Replica)
281+
return nil, false, replicasNotMatchError{replica: replica, elected: desc.Replica}
260282
}
261283
}
262284

@@ -269,8 +291,34 @@ func (c *haTracker) checkKVStore(ctx context.Context, key, replica string, now t
269291
})
270292
}
271293

272-
func replicasNotMatchError(replica, elected string) error {
273-
return httpgrpc.Errorf(http.StatusAccepted, "replicas did not mach, rejecting sample: replica=%s, elected=%s", replica, elected)
294+
type replicasNotMatchError struct {
295+
replica, elected string
296+
}
297+
298+
func (e replicasNotMatchError) Error() string {
299+
return fmt.Sprintf("replicas did not mach, rejecting sample: replica=%s, elected=%s", e.replica, e.elected)
300+
}
301+
302+
// Needed for errors.Is to work properly.
303+
func (e replicasNotMatchError) Is(err error) bool {
304+
_, ok1 := err.(replicasNotMatchError)
305+
_, ok2 := err.(*replicasNotMatchError)
306+
return ok1 || ok2
307+
}
308+
309+
type tooManyClustersError struct {
310+
limit int
311+
}
312+
313+
func (e tooManyClustersError) Error() string {
314+
return fmt.Sprintf("too many HA clusters (limit: %d)", e.limit)
315+
}
316+
317+
// Needed for errors.Is to work properly.
318+
func (e tooManyClustersError) Is(err error) bool {
319+
_, ok1 := err.(tooManyClustersError)
320+
_, ok2 := err.(*tooManyClustersError)
321+
return ok1 || ok2
274322
}
275323

276324
func findHALabels(replicaLabel, clusterLabel string, labels []client.LabelAdapter) (string, string) {

0 commit comments

Comments
 (0)