Skip to content

Commit 7159c64

Browse files
committed
Cache user overrides in distributor.Push to reduce lock contention.
Signed-off-by: songjiayang <songjiayang1@gmail.com>
1 parent c37a475 commit 7159c64

File tree

3 files changed

+26
-16
lines changed

3 files changed

+26
-16
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
* [ENHANCEMENT] Compactor: Introduced visit marker file for blocks so that blocks under compaction will not be picked up by another compactor. #4805
5050
* [ENHANCEMENT] Distributor: Add label name to labelValueTooLongError. #4855
5151
* [ENHANCEMENT] Enhance traces with hostname information. #4898
52+
* [ENHANCEMENT] Distributor: cache user overrides to reduce lock contention. #4904
5253
* [FEATURE] Compactor: Added `-compactor.block-files-concurrency` to configure the number of goroutines used to download/upload block files during compaction. #4784
5354
* [FEATURE] Compactor: Added `-compactor.blocks-fetch-concurrency` to configure the number of goroutines used to fetch blocks during compaction. #4787
5455
* [FEATURE] Compactor: Added configurations for Azure MSI in blocks-storage, ruler-storage and alertmanager-storage. #4818

pkg/distributor/distributor.go

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -477,15 +477,15 @@ func removeLabel(labelName string, labels *[]cortexpb.LabelAdapter) {
477477
// Returns a boolean that indicates whether or not we want to remove the replica label going forward,
478478
// and an error that indicates whether we want to accept samples based on the cluster/replica found in ts.
479479
// nil for the error means accept the sample.
480-
func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica string) (removeReplicaLabel bool, _ error) {
480+
func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica string, limits *validation.Overrides) (removeReplicaLabel bool, _ error) {
481481
// If the sample doesn't have either HA label, accept it.
482482
// At the moment we want to accept these samples by default.
483483
if cluster == "" || replica == "" {
484484
return false, nil
485485
}
486486

487487
// If replica label is too long, don't use it. We accept the sample here, but it will fail validation later anyway.
488-
if len(replica) > d.limits.MaxLabelValueLength(userID) {
488+
if len(replica) > limits.MaxLabelValueLength(userID) {
489489
return false, nil
490490
}
491491

@@ -503,9 +503,10 @@ func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica
503503
// any are configured to be dropped for the user ID.
504504
// Returns the validated series with its labels/samples, and any error.
505505
// The returned error may retain the series labels.
506-
func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID string, skipLabelNameValidation bool) (cortexpb.PreallocTimeseries, validation.ValidationError) {
506+
func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID string, skipLabelNameValidation bool, limits *validation.Overrides) (cortexpb.PreallocTimeseries, validation.ValidationError) {
507507
d.labelsHistogram.Observe(float64(len(ts.Labels)))
508-
if err := validation.ValidateLabels(d.limits, userID, ts.Labels, skipLabelNameValidation); err != nil {
508+
509+
if err := validation.ValidateLabels(limits, userID, ts.Labels, skipLabelNameValidation); err != nil {
509510
return emptyPreallocSeries, err
510511
}
511512

@@ -514,7 +515,7 @@ func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID stri
514515
// Only alloc when data present
515516
samples = make([]cortexpb.Sample, 0, len(ts.Samples))
516517
for _, s := range ts.Samples {
517-
if err := validation.ValidateSample(d.limits, userID, ts.Labels, s); err != nil {
518+
if err := validation.ValidateSample(limits, userID, ts.Labels, s); err != nil {
518519
return emptyPreallocSeries, err
519520
}
520521
samples = append(samples, s)
@@ -598,9 +599,12 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
598599
validatedSamples := 0
599600
validatedExemplars := 0
600601

601-
if d.limits.AcceptHASamples(userID) && len(req.Timeseries) > 0 {
602-
cluster, replica := findHALabels(d.limits.HAReplicaLabel(userID), d.limits.HAClusterLabel(userID), req.Timeseries[0].Labels)
603-
removeReplica, err = d.checkSample(ctx, userID, cluster, replica)
602+
// Cache user limit with overrides.
603+
limits, _ := validation.NewOverrides(*(d.limits.GetOverridesForUser(userID)), nil)
604+
605+
if limits.AcceptHASamples(userID) && len(req.Timeseries) > 0 {
606+
cluster, replica := findHALabels(limits.HAReplicaLabel(userID), limits.HAClusterLabel(userID), req.Timeseries[0].Labels)
607+
removeReplica, err = d.checkSample(ctx, userID, cluster, replica, limits)
604608
if err != nil {
605609
// Ensure the request slice is reused if the series get deduped.
606610
cortexpb.ReuseSlice(req.Timeseries)
@@ -634,13 +638,15 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
634638

635639
// For each timeseries, compute a hash to distribute across ingesters;
636640
// check each sample and discard if outside limits.
641+
642+
skipLabelNameValidation := d.cfg.SkipLabelNameValidation || req.GetSkipLabelNameValidation()
637643
for _, ts := range req.Timeseries {
638644
// Use timestamp of latest sample in the series. If samples for series are not ordered, metric for user may be wrong.
639645
if len(ts.Samples) > 0 {
640646
latestSampleTimestampMs = util_math.Max64(latestSampleTimestampMs, ts.Samples[len(ts.Samples)-1].TimestampMs)
641647
}
642648

643-
if mrc := d.limits.MetricRelabelConfigs(userID); len(mrc) > 0 {
649+
if mrc := limits.MetricRelabelConfigs(userID); len(mrc) > 0 {
644650
l := relabel.Process(cortexpb.FromLabelAdaptersToLabels(ts.Labels), mrc...)
645651
if len(l) == 0 {
646652
// all labels are gone, samples will be discarded
@@ -657,10 +663,10 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
657663
// storing series in Cortex. If we kept the replica label we would end up with another series for the same
658664
// series we're trying to dedupe when HA tracking moves over to a different replica.
659665
if removeReplica {
660-
removeLabel(d.limits.HAReplicaLabel(userID), &ts.Labels)
666+
removeLabel(limits.HAReplicaLabel(userID), &ts.Labels)
661667
}
662668

663-
for _, labelName := range d.limits.DropLabels(userID) {
669+
for _, labelName := range limits.DropLabels(userID) {
664670
removeLabel(labelName, &ts.Labels)
665671
}
666672

@@ -686,9 +692,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
686692
if err != nil {
687693
return nil, err
688694
}
689-
690-
skipLabelNameValidation := d.cfg.SkipLabelNameValidation || req.GetSkipLabelNameValidation()
691-
validatedSeries, validationErr := d.validateSeries(ts, userID, skipLabelNameValidation)
695+
validatedSeries, validationErr := d.validateSeries(ts, userID, skipLabelNameValidation, limits)
692696

693697
// Errors in validation are considered non-fatal, as one series in a request may contain
694698
// invalid data but all the remaining series could be perfectly valid.
@@ -710,7 +714,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
710714
}
711715

712716
for _, m := range req.Metadata {
713-
err := validation.ValidateMetadata(d.limits, userID, m)
717+
err := validation.ValidateMetadata(limits, userID, m)
714718

715719
if err != nil {
716720
if firstPartialErr == nil {
@@ -756,7 +760,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
756760

757761
// Obtain a subring if required.
758762
if d.cfg.ShardingStrategy == util.ShardingStrategyShuffle {
759-
subRing = d.ingestersRing.ShuffleShard(userID, d.limits.IngestionTenantShardSize(userID))
763+
subRing = d.ingestersRing.ShuffleShard(userID, limits.IngestionTenantShardSize(userID))
760764
}
761765

762766
keys := append(seriesKeys, metadataKeys...)

pkg/util/validation/limits.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -625,6 +625,11 @@ func (o *Overrides) AlertmanagerMaxAlertsSizeBytes(userID string) int {
625625
return o.getOverridesForUser(userID).AlertmanagerMaxAlertsSizeBytes
626626
}
627627

628+
// GetOverridesForUser returns the per-tenant limits with overrides.
// It is the exported counterpart of getOverridesForUser and simply delegates
// to it, returning whatever *Limits that method yields for userID.
// NOTE(review): the returned pointer presumably refers to limits state shared
// with other callers; copy the value if a stable snapshot is needed — confirm.
629+
func (o *Overrides) GetOverridesForUser(userID string) *Limits {
630+
return o.getOverridesForUser(userID)
631+
}
632+
628633
func (o *Overrides) getOverridesForUser(userID string) *Limits {
629634
if o.tenantLimits != nil {
630635
l := o.tenantLimits.ByUserID(userID)

0 commit comments

Comments
 (0)