@@ -456,8 +456,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
456
456
now := time .Now ()
457
457
458
458
if block , until , retStatusCode := d .validator .ShouldBlockIngestion (validationContext , now ); block {
459
- validation .DiscardedSamples .WithLabelValues (validation .BlockedIngestion , tenantID ).Add (float64 (validatedLineCount ))
460
- validation .DiscardedBytes .WithLabelValues (validation .BlockedIngestion , tenantID ).Add (float64 (validatedLineSize ))
459
+ d .trackDiscardedData (ctx , req , validationContext , tenantID , validatedLineCount , validatedLineSize , validation .BlockedIngestion )
461
460
462
461
err = fmt .Errorf (validation .BlockedIngestionErrorMsg , tenantID , until .Format (time .RFC3339 ), retStatusCode )
463
462
d .writeFailuresManager .Log (tenantID , err )
@@ -472,30 +471,11 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
472
471
}
473
472
474
473
if ! d .ingestionRateLimiter .AllowN (now , tenantID , validatedLineSize ) {
475
- // Return a 429 to indicate to the client they are being rate limited
476
- validation .DiscardedSamples .WithLabelValues (validation .RateLimited , tenantID ).Add (float64 (validatedLineCount ))
477
- validation .DiscardedBytes .WithLabelValues (validation .RateLimited , tenantID ).Add (float64 (validatedLineSize ))
478
-
479
- if d .usageTracker != nil {
480
- for _ , stream := range req .Streams {
481
- lbs , _ , _ , err := d .parseStreamLabels (validationContext , stream .Labels , stream )
482
- if err != nil {
483
- continue
484
- }
485
-
486
- discardedStreamBytes := 0
487
- for _ , e := range stream .Entries {
488
- discardedStreamBytes += len (e .Line )
489
- }
490
-
491
- if d .usageTracker != nil {
492
- d .usageTracker .DiscardedBytesAdd (ctx , tenantID , validation .RateLimited , lbs , float64 (discardedStreamBytes ))
493
- }
494
- }
495
- }
474
+ d .trackDiscardedData (ctx , req , validationContext , tenantID , validatedLineCount , validatedLineSize , validation .RateLimited )
496
475
497
476
err = fmt .Errorf (validation .RateLimitedErrorMsg , tenantID , int (d .ingestionRateLimiter .Limit (now , tenantID )), validatedLineCount , validatedLineSize )
498
477
d .writeFailuresManager .Log (tenantID , err )
478
+ // Return a 429 to indicate to the client they are being rate limited
499
479
return nil , httpgrpc .Errorf (http .StatusTooManyRequests , "%s" , err .Error ())
500
480
}
501
481
@@ -569,6 +549,37 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
569
549
}
570
550
}
571
551
552
+ func (d * Distributor ) trackDiscardedData (
553
+ ctx context.Context ,
554
+ req * logproto.PushRequest ,
555
+ validationContext validationContext ,
556
+ tenantID string ,
557
+ validatedLineCount int ,
558
+ validatedLineSize int ,
559
+ reason string ,
560
+ ) {
561
+ validation .DiscardedSamples .WithLabelValues (reason , tenantID ).Add (float64 (validatedLineCount ))
562
+ validation .DiscardedBytes .WithLabelValues (reason , tenantID ).Add (float64 (validatedLineSize ))
563
+
564
+ if d .usageTracker != nil {
565
+ for _ , stream := range req .Streams {
566
+ lbs , _ , _ , err := d .parseStreamLabels (validationContext , stream .Labels , stream )
567
+ if err != nil {
568
+ continue
569
+ }
570
+
571
+ discardedStreamBytes := 0
572
+ for _ , e := range stream .Entries {
573
+ discardedStreamBytes += len (e .Line )
574
+ }
575
+
576
+ if d .usageTracker != nil {
577
+ d .usageTracker .DiscardedBytesAdd (ctx , tenantID , reason , lbs , float64 (discardedStreamBytes ))
578
+ }
579
+ }
580
+ }
581
+ }
582
+
572
583
func hasAnyLevelLabels (l labels.Labels ) (string , bool ) {
573
584
for lbl := range allowedLabelsForLevel {
574
585
if l .Has (lbl ) {
0 commit comments