Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,14 @@ func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID stri

// Push implements client.IngesterServer
func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
var validationError = true
defer func() {
if validationError {
cortexpb.ReuseSlice(req.Timeseries)
req.Free()
}
}()

userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -760,9 +768,6 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
cluster, replica := findHALabels(limits.HAReplicaLabel, limits.HAClusterLabel, req.Timeseries[0].Labels)
removeReplica, err = d.checkSample(ctx, userID, cluster, replica, limits)
if err != nil {
// Ensure the request slice is reused if the series get deduped.
cortexpb.ReuseSlice(req.Timeseries)

if errors.Is(err, ha.ReplicasNotMatchError{}) {
// These samples have been deduped.
d.dedupedSamples.WithLabelValues(userID, cluster).Add(float64(numFloatSamples + numHistogramSamples))
Expand All @@ -773,7 +778,6 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.TooManyHAClusters, userID).Add(float64(numFloatSamples + numHistogramSamples))
return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
}

return nil, err
}
// If there wasn't an error but removeReplica is false that means we didn't find both HA labels.
Expand All @@ -795,18 +799,12 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
d.receivedMetadata.WithLabelValues(userID).Add(float64(len(validatedMetadata)))

if len(seriesKeys) == 0 && len(nhSeriesKeys) == 0 && len(metadataKeys) == 0 {
// Ensure the request slice is reused if there's no series or metadata passing the validation.
cortexpb.ReuseSlice(req.Timeseries)

return &cortexpb.WriteResponse{}, firstPartialErr
}

totalSamples := validatedFloatSamples + validatedHistogramSamples
totalN := totalSamples + validatedExemplars + len(validatedMetadata)
if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {
// Ensure the request slice is reused if the request is rate limited.
cortexpb.ReuseSlice(req.Timeseries)

d.validateMetrics.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(totalSamples))
d.validateMetrics.DiscardedExemplars.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedExemplars))
d.validateMetrics.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata)))
Expand Down Expand Up @@ -844,6 +842,9 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
return nil, nativeHistogramErr
}

//DoBatch will be responsible to call cleanup after all async ingester requests finish.
validationError = false

err = d.doBatch(ctx, req, subRing, keys, initialMetadataIndex, validatedMetadata, validatedTimeseries, userID)
if err != nil {
return nil, err
Expand Down
Loading