Skip to content

Commit 6497984

Browse files
authored
ReuseSlice missing in distributor push (#7123)
Signed-off-by: Daniel Deluiggi <ddeluigg@amazon.com>
1 parent d56bbea commit 6497984

File tree

1 file changed

+11
-10
lines changed

1 file changed

+11
-10
lines changed

pkg/distributor/distributor.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -706,6 +706,14 @@ func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID stri
706706

707707
// Push implements client.IngesterServer
708708
func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
709+
var validationError = true
710+
defer func() {
711+
if validationError {
712+
cortexpb.ReuseSlice(req.Timeseries)
713+
req.Free()
714+
}
715+
}()
716+
709717
userID, err := tenant.TenantID(ctx)
710718
if err != nil {
711719
return nil, err
@@ -760,9 +768,6 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
760768
cluster, replica := findHALabels(limits.HAReplicaLabel, limits.HAClusterLabel, req.Timeseries[0].Labels)
761769
removeReplica, err = d.checkSample(ctx, userID, cluster, replica, limits)
762770
if err != nil {
763-
// Ensure the request slice is reused if the series get deduped.
764-
cortexpb.ReuseSlice(req.Timeseries)
765-
766771
if errors.Is(err, ha.ReplicasNotMatchError{}) {
767772
// These samples have been deduped.
768773
d.dedupedSamples.WithLabelValues(userID, cluster).Add(float64(numFloatSamples + numHistogramSamples))
@@ -773,7 +778,6 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
773778
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.TooManyHAClusters, userID).Add(float64(numFloatSamples + numHistogramSamples))
774779
return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
775780
}
776-
777781
return nil, err
778782
}
779783
// If there wasn't an error but removeReplica is false that means we didn't find both HA labels.
@@ -795,18 +799,12 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
795799
d.receivedMetadata.WithLabelValues(userID).Add(float64(len(validatedMetadata)))
796800

797801
if len(seriesKeys) == 0 && len(nhSeriesKeys) == 0 && len(metadataKeys) == 0 {
798-
// Ensure the request slice is reused if there's no series or metadata passing the validation.
799-
cortexpb.ReuseSlice(req.Timeseries)
800-
801802
return &cortexpb.WriteResponse{}, firstPartialErr
802803
}
803804

804805
totalSamples := validatedFloatSamples + validatedHistogramSamples
805806
totalN := totalSamples + validatedExemplars + len(validatedMetadata)
806807
if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {
807-
// Ensure the request slice is reused if the request is rate limited.
808-
cortexpb.ReuseSlice(req.Timeseries)
809-
810808
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(totalSamples))
811809
d.validateMetrics.DiscardedExemplars.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedExemplars))
812810
d.validateMetrics.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata)))
@@ -844,6 +842,9 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
844842
return nil, nativeHistogramErr
845843
}
846844

845+
//DoBatch will be responsible to call cleanup after all async ingester requests finish.
846+
validationError = false
847+
847848
err = d.doBatch(ctx, req, subRing, keys, initialMetadataIndex, validatedMetadata, validatedTimeseries, userID)
848849
if err != nil {
849850
return nil, err

0 commit comments

Comments
 (0)