
Commit 22f6690

Optimize metrics tracking on ingester v2Push() errors (#3969)
* Added benchmark to test v2Push() on errors
* Optimized metrics tracking in v2Push() on error
* Improved benchmark
* Added CHANGELOG entry
* Removed benchmark results
* Remove if from otherDiscardedReasonsCount

Signed-off-by: Marco Pracucci <marco@pracucci.com>
1 parent cdc84ae commit 22f6690
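
The heart of the change is a classic batching pattern: instead of calling `WithLabelValues(...).Inc()` on the discarded-samples counter once per failed sample, v2Push() now accumulates per-reason counts in local variables and flushes each one with a single `Add()` after the request has been processed. The sketch below shows the pattern in isolation; it is a minimal, self-contained example, and the metric definition, reason strings, and user ID are illustrative stand-ins for Cortex's `validation.DiscardedSamples`, not code from this commit.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// Illustrative stand-in for validation.DiscardedSamples.
var discardedSamples = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "cortex_discarded_samples_total",
		Help: "The total number of samples that were discarded.",
	},
	[]string{"reason", "user"},
)

func main() {
	prometheus.MustRegister(discardedSamples)

	userID := "user-1"
	// Hypothetical per-sample failure reasons collected while appending.
	reasons := []string{"sample-out-of-bounds", "sample-out-of-order", "sample-out-of-bounds"}

	// Before: discardedSamples.WithLabelValues(reason, userID).Inc() per sample,
	// i.e. one label-hash lookup and one atomic increment for every failure.
	// After: count locally first...
	counts := map[string]int{}
	for _, reason := range reasons {
		counts[reason]++
	}

	// ...then flush once per distinct reason.
	for reason, n := range counts {
		discardedSamples.WithLabelValues(reason, userID).Add(float64(n))
	}

	fmt.Println(counts)
}
```

In the actual patch (see the diff below), the three most frequent reasons get dedicated integer counters and only other validation errors fall back to a map, keeping the hottest paths free of map overhead.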

File tree: 3 files changed (+227, -68 lines)


CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -16,6 +16,7 @@
   * `cortex_ruler_clients`
   * `cortex_ruler_client_request_duration_seconds`
 * [ENHANCEMENT] Query-frontend/scheduler: added querier forget delay (`-query-frontend.querier-forget-delay` and `-query-scheduler.querier-forget-delay`) to mitigate the blast radius in the event queriers crash because of a repeatedly sent "query of death" when shuffle-sharding is enabled. #3901
+* [ENHANCEMENT] Ingester: reduce CPU and memory when a high number of errors are returned by the ingester on the write path with the blocks storage. #3969
 * [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948
 * [BUGFIX] Querier: streamline tracing spans. #3924

pkg/ingester/ingester_v2.go

Lines changed: 26 additions & 7 deletions
@@ -720,9 +720,15 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cor
 
 	// Keep track of some stats which are tracked only if the samples will be
 	// successfully committed
-	succeededSamplesCount := 0
-	failedSamplesCount := 0
-	startAppend := time.Now()
+	var (
+		succeededSamplesCount      = 0
+		failedSamplesCount         = 0
+		startAppend                = time.Now()
+		sampleOutOfBoundsCount     = 0
+		sampleOutOfOrderCount      = 0
+		newValueForTimestampCount  = 0
+		otherDiscardedReasonsCount = map[string]int{}
+	)
 
 	// Walk the samples, appending them to the users database
 	app := db.Appender(ctx)
@@ -783,11 +789,11 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cor
 
 			switch cause {
 			case storage.ErrOutOfBounds:
-				validation.DiscardedSamples.WithLabelValues(sampleOutOfBounds, userID).Inc()
+				sampleOutOfBoundsCount++
 			case storage.ErrOutOfOrderSample:
-				validation.DiscardedSamples.WithLabelValues(sampleOutOfOrder, userID).Inc()
+				sampleOutOfOrderCount++
 			case storage.ErrDuplicateSampleForTimestamp:
-				validation.DiscardedSamples.WithLabelValues(newValueForTimestamp, userID).Inc()
+				newValueForTimestampCount++
 			}
 
 			continue
@@ -799,7 +805,7 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cor
 			if firstPartialErr == nil {
 				firstPartialErr = ve
 			}
-			validation.DiscardedSamples.WithLabelValues(ve.errorType, userID).Inc()
+			otherDiscardedReasonsCount[ve.errorType]++
 			continue
 		}
 
@@ -842,6 +848,19 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cor
 	i.metrics.ingestedSamples.Add(float64(succeededSamplesCount))
 	i.metrics.ingestedSamplesFail.Add(float64(failedSamplesCount))
 
+	if sampleOutOfBoundsCount > 0 {
+		validation.DiscardedSamples.WithLabelValues(sampleOutOfBounds, userID).Add(float64(sampleOutOfBoundsCount))
+	}
+	if sampleOutOfOrderCount > 0 {
+		validation.DiscardedSamples.WithLabelValues(sampleOutOfOrder, userID).Add(float64(sampleOutOfOrderCount))
+	}
+	if newValueForTimestampCount > 0 {
+		validation.DiscardedSamples.WithLabelValues(newValueForTimestamp, userID).Add(float64(newValueForTimestampCount))
+	}
+	for reason, count := range otherDiscardedReasonsCount {
+		validation.DiscardedSamples.WithLabelValues(reason, userID).Add(float64(count))
+	}
+
 	switch req.Source {
 	case cortexpb.RULE:
 		db.ingestedRuleSamples.add(int64(succeededSamplesCount))
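
A back-of-the-envelope reading of the CHANGELOG claim: every Inc() on a CounterVec hashes the label values and resolves the child counter before the atomic add, so a 1000-sample request that fails entirely used to pay that cost 1000 times, while the batched form pays it once per distinct reason. A hypothetical micro-benchmark sketching the comparison (not part of this commit; the metric and function names are invented):

```go
package metrics_test

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
)

const samplesPerRequest = 1000

func newDiscarded() *prometheus.CounterVec {
	return prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: "discarded_samples_total", Help: "sketch"},
		[]string{"reason", "user"},
	)
}

// One counter operation per failed sample: samplesPerRequest label lookups
// and atomic adds per benchmark iteration.
func BenchmarkIncPerSample(b *testing.B) {
	c := newDiscarded()
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		for i := 0; i < samplesPerRequest; i++ {
			c.WithLabelValues("sample-out-of-bounds", "user-1").Inc()
		}
	}
}

// Batched: count locally, then a single Add() per request.
func BenchmarkBatchedAdd(b *testing.B) {
	c := newDiscarded()
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		count := 0
		for i := 0; i < samplesPerRequest; i++ {
			count++ // stand-in for classifying a failed sample
		}
		if count > 0 {
			c.WithLabelValues("sample-out-of-bounds", "user-1").Add(float64(count))
		}
	}
}
```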

pkg/ingester/ingester_v2_test.go

Lines changed: 200 additions & 61 deletions
@@ -606,70 +606,206 @@ func TestIngester_v2Push_DecreaseInactiveSeries(t *testing.T) {
 	assert.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(expectedMetrics), metricNames...))
 }
 
-func Benchmark_Ingester_v2PushOnOutOfBoundsSamplesWithHighConcurrency(b *testing.B) {
-	const (
-		numSamplesPerRequest = 1000
-		numRequestsPerClient = 10
-		numConcurrentClients = 10000
+func Benchmark_Ingester_v2PushOnError(b *testing.B) {
+	var (
+		ctx             = user.InjectOrgID(context.Background(), userID)
+		sampleTimestamp = int64(100)
+		metricName      = "test"
 	)
 
-	registry := prometheus.NewRegistry()
-	ctx := user.InjectOrgID(context.Background(), userID)
+	scenarios := map[string]struct {
+		numSeriesPerRequest  int
+		numConcurrentClients int
+	}{
+		"no concurrency": {
+			numSeriesPerRequest:  1000,
+			numConcurrentClients: 1,
+		},
+		"low concurrency": {
+			numSeriesPerRequest:  1000,
+			numConcurrentClients: 100,
+		},
+		"high concurrency": {
+			numSeriesPerRequest:  1000,
+			numConcurrentClients: 1000,
+		},
+	}
 
-	// Create a mocked ingester
-	cfg := defaultIngesterTestConfig()
-	cfg.LifecyclerConfig.JoinAfter = 0
+	tests := map[string]struct {
+		prepareConfig   func(limits *validation.Limits)
+		beforeBenchmark func(b *testing.B, ingester *Ingester, numSeriesPerRequest int)
+		runBenchmark    func(b *testing.B, ingester *Ingester, metrics []labels.Labels, samples []cortexpb.Sample)
+	}{
+		"out of bound samples": {
+			prepareConfig: func(limits *validation.Limits) {},
+			beforeBenchmark: func(b *testing.B, ingester *Ingester, numSeriesPerRequest int) {
+				// Push a single time series to set the TSDB min time.
+				currTimeReq := cortexpb.ToWriteRequest(
+					[]labels.Labels{{{Name: labels.MetricName, Value: metricName}}},
+					[]cortexpb.Sample{{Value: 1, TimestampMs: util.TimeToMillis(time.Now())}},
+					nil,
+					cortexpb.API)
+				_, err := ingester.v2Push(ctx, currTimeReq)
+				require.NoError(b, err)
+			},
+			runBenchmark: func(b *testing.B, ingester *Ingester, metrics []labels.Labels, samples []cortexpb.Sample) {
+				expectedErr := storage.ErrOutOfBounds.Error()
 
-	ingester, err := prepareIngesterWithBlocksStorage(b, cfg, registry)
-	require.NoError(b, err)
-	require.NoError(b, services.StartAndAwaitRunning(context.Background(), ingester))
-	defer services.StopAndAwaitTerminated(context.Background(), ingester) //nolint:errcheck
+				// Push out of bound samples.
+				for n := 0; n < b.N; n++ {
+					_, err := ingester.v2Push(ctx, cortexpb.ToWriteRequest(metrics, samples, nil, cortexpb.API)) // nolint:errcheck
 
-	// Wait until the ingester is ACTIVE
-	test.Poll(b, 100*time.Millisecond, ring.ACTIVE, func() interface{} {
-		return ingester.lifecycler.GetState()
-	})
+					if !strings.Contains(err.Error(), expectedErr) {
+						b.Fatalf("unexpected error. expected: %s actual: %s", expectedErr, err.Error())
+					}
+				}
+			},
+		},
+		"out of order samples": {
+			prepareConfig: func(limits *validation.Limits) {},
+			beforeBenchmark: func(b *testing.B, ingester *Ingester, numSeriesPerRequest int) {
+				// For each series, push a single sample with a timestamp greater than next pushes.
+				for i := 0; i < numSeriesPerRequest; i++ {
+					currTimeReq := cortexpb.ToWriteRequest(
+						[]labels.Labels{{{Name: labels.MetricName, Value: metricName}, {Name: "cardinality", Value: strconv.Itoa(i)}}},
+						[]cortexpb.Sample{{Value: 1, TimestampMs: sampleTimestamp + 1}},
+						nil,
+						cortexpb.API)
+
+					_, err := ingester.v2Push(ctx, currTimeReq)
+					require.NoError(b, err)
+				}
+			},
+			runBenchmark: func(b *testing.B, ingester *Ingester, metrics []labels.Labels, samples []cortexpb.Sample) {
+				expectedErr := storage.ErrOutOfOrderSample.Error()
 
-	// Push a single time series to set the TSDB min time.
-	metricLabelAdapters := []cortexpb.LabelAdapter{{Name: labels.MetricName, Value: "test"}}
-	metricLabels := cortexpb.FromLabelAdaptersToLabels(metricLabelAdapters)
+				// Push out of order samples.
+				for n := 0; n < b.N; n++ {
+					_, err := ingester.v2Push(ctx, cortexpb.ToWriteRequest(metrics, samples, nil, cortexpb.API)) // nolint:errcheck
 
-	currTimeReq := cortexpb.ToWriteRequest(
-		[]labels.Labels{metricLabels},
-		[]cortexpb.Sample{{Value: 1, TimestampMs: util.TimeToMillis(time.Now())}},
-		nil,
-		cortexpb.API)
-	_, err = ingester.v2Push(ctx, currTimeReq)
-	require.NoError(b, err)
+					if !strings.Contains(err.Error(), expectedErr) {
+						b.Fatalf("unexpected error. expected: %s actual: %s", expectedErr, err.Error())
+					}
+				}
+			},
+		},
+		"per-user series limit reached": {
+			prepareConfig: func(limits *validation.Limits) {
+				limits.MaxLocalSeriesPerUser = 1
+			},
+			beforeBenchmark: func(b *testing.B, ingester *Ingester, numSeriesPerRequest int) {
+				// Push a series with a metric name different than the one used during the benchmark.
+				metricLabelAdapters := []cortexpb.LabelAdapter{{Name: labels.MetricName, Value: "another"}}
+				metricLabels := cortexpb.FromLabelAdaptersToLabels(metricLabelAdapters)
 
-	// Prepare a request containing out of bound samples.
-	metrics := make([]labels.Labels, 0, numSamplesPerRequest)
-	samples := make([]cortexpb.Sample, 0, numSamplesPerRequest)
-	for i := 0; i < numSamplesPerRequest; i++ {
-		metrics = append(metrics, metricLabels)
-		samples = append(samples, cortexpb.Sample{Value: float64(i), TimestampMs: 0})
-	}
-	outOfBoundReq := cortexpb.ToWriteRequest(metrics, samples, nil, cortexpb.API)
+				currTimeReq := cortexpb.ToWriteRequest(
+					[]labels.Labels{metricLabels},
+					[]cortexpb.Sample{{Value: 1, TimestampMs: sampleTimestamp + 1}},
+					nil,
+					cortexpb.API)
+				_, err := ingester.v2Push(ctx, currTimeReq)
+				require.NoError(b, err)
+			},
+			runBenchmark: func(b *testing.B, ingester *Ingester, metrics []labels.Labels, samples []cortexpb.Sample) {
+				expectedErr := "per-user series limit"
 
-	// Run the benchmark.
-	wg := sync.WaitGroup{}
-	wg.Add(numConcurrentClients)
-	start := make(chan struct{})
+				// Push series with a different name than the one already pushed.
+				for n := 0; n < b.N; n++ {
+					_, err := ingester.v2Push(ctx, cortexpb.ToWriteRequest(metrics, samples, nil, cortexpb.API)) // nolint:errcheck
 
-	for c := 0; c < numConcurrentClients; c++ {
-		go func() {
-			defer wg.Done()
-			<-start
+					if !strings.Contains(err.Error(), expectedErr) {
+						b.Fatalf("unexpected error. expected: %s actual: %s", expectedErr, err.Error())
+					}
+				}
+			},
+		},
+		"per-metric series limit reached": {
+			prepareConfig: func(limits *validation.Limits) {
+				limits.MaxLocalSeriesPerMetric = 1
+			},
+			beforeBenchmark: func(b *testing.B, ingester *Ingester, numSeriesPerRequest int) {
+				// Push a series with the same metric name but different labels than the one used during the benchmark.
+				metricLabelAdapters := []cortexpb.LabelAdapter{{Name: labels.MetricName, Value: metricName}, {Name: "cardinality", Value: "another"}}
+				metricLabels := cortexpb.FromLabelAdaptersToLabels(metricLabelAdapters)
 
-		for n := 0; n < numRequestsPerClient; n++ {
-			ingester.v2Push(ctx, outOfBoundReq) // nolint:errcheck
-		}
-	}()
+				currTimeReq := cortexpb.ToWriteRequest(
+					[]labels.Labels{metricLabels},
+					[]cortexpb.Sample{{Value: 1, TimestampMs: sampleTimestamp + 1}},
+					nil,
+					cortexpb.API)
+				_, err := ingester.v2Push(ctx, currTimeReq)
+				require.NoError(b, err)
+			},
+			runBenchmark: func(b *testing.B, ingester *Ingester, metrics []labels.Labels, samples []cortexpb.Sample) {
+				expectedErr := "per-metric series limit"
+
+				// Push series with different labels than the one already pushed.
+				for n := 0; n < b.N; n++ {
+					_, err := ingester.v2Push(ctx, cortexpb.ToWriteRequest(metrics, samples, nil, cortexpb.API)) // nolint:errcheck
+
+					if !strings.Contains(err.Error(), expectedErr) {
+						b.Fatalf("unexpected error. expected: %s actual: %s", expectedErr, err.Error())
+					}
+				}
+			},
+		},
 	}
 
-	b.ResetTimer()
-	close(start)
-	wg.Wait()
+	for testName, testData := range tests {
+		for scenarioName, scenario := range scenarios {
+			b.Run(fmt.Sprintf("failure: %s, scenario: %s", testName, scenarioName), func(b *testing.B) {
+				registry := prometheus.NewRegistry()
+
+				// Create a mocked ingester
+				cfg := defaultIngesterTestConfig()
+				cfg.LifecyclerConfig.JoinAfter = 0
+
+				limits := defaultLimitsTestConfig()
+				testData.prepareConfig(&limits)
+
+				ingester, err := prepareIngesterWithBlocksStorageAndLimits(b, cfg, limits, "", registry)
+				require.NoError(b, err)
+				require.NoError(b, services.StartAndAwaitRunning(context.Background(), ingester))
+				defer services.StopAndAwaitTerminated(context.Background(), ingester) //nolint:errcheck
+
+				// Wait until the ingester is ACTIVE
+				test.Poll(b, 100*time.Millisecond, ring.ACTIVE, func() interface{} {
+					return ingester.lifecycler.GetState()
+				})
+
+				testData.beforeBenchmark(b, ingester, scenario.numSeriesPerRequest)
+
+				// Prepare the request.
+				metrics := make([]labels.Labels, 0, scenario.numSeriesPerRequest)
+				samples := make([]cortexpb.Sample, 0, scenario.numSeriesPerRequest)
+				for i := 0; i < scenario.numSeriesPerRequest; i++ {
+					metrics = append(metrics, labels.Labels{{Name: labels.MetricName, Value: metricName}, {Name: "cardinality", Value: strconv.Itoa(i)}})
+					samples = append(samples, cortexpb.Sample{Value: float64(i), TimestampMs: sampleTimestamp})
+				}
+
+				// Run the benchmark.
+				wg := sync.WaitGroup{}
+				wg.Add(scenario.numConcurrentClients)
+				start := make(chan struct{})
+
+				b.ReportAllocs()
+				b.ResetTimer()
+
+				for c := 0; c < scenario.numConcurrentClients; c++ {
+					go func() {
+						defer wg.Done()
+						<-start
+
+						testData.runBenchmark(b, ingester, metrics, samples)
+					}()
+				}
+
+				b.ResetTimer()
+				close(start)
+				wg.Wait()
+			})
+		}
+	}
 }
 
 func Test_Ingester_v2LabelNames(t *testing.T) {
@@ -1729,19 +1865,22 @@ func mockWriteRequest(t *testing.T, lbls labels.Labels, value float64, timestamp
 }
 
 func prepareIngesterWithBlocksStorage(t testing.TB, ingesterCfg Config, registerer prometheus.Registerer) (*Ingester, error) {
-	dataDir, err := ioutil.TempDir("", "ingester")
-	if err != nil {
-		return nil, err
-	}
-
-	t.Cleanup(func() {
-		require.NoError(t, os.RemoveAll(dataDir))
-	})
-
-	return prepareIngesterWithBlocksStorageAndLimits(t, ingesterCfg, defaultLimitsTestConfig(), dataDir, registerer)
+	return prepareIngesterWithBlocksStorageAndLimits(t, ingesterCfg, defaultLimitsTestConfig(), "", registerer)
 }
 
 func prepareIngesterWithBlocksStorageAndLimits(t testing.TB, ingesterCfg Config, limits validation.Limits, dataDir string, registerer prometheus.Registerer) (*Ingester, error) {
+	// Create a data dir if none has been provided.
+	if dataDir == "" {
+		var err error
+		if dataDir, err = ioutil.TempDir("", "ingester"); err != nil {
+			return nil, err
+		}
+
+		t.Cleanup(func() {
+			require.NoError(t, os.RemoveAll(dataDir))
+		})
+	}
+
 	bucketDir, err := ioutil.TempDir("", "bucket")
 	if err != nil {
 		return nil, err
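
The benchmark above parks every simulated client on a shared start channel and closes it only after b.ResetTimer(), so the timed section measures contention on concurrent pushes rather than goroutine startup. A stripped-down sketch of just that orchestration pattern, with doPush as a hypothetical stand-in for ingester.v2Push:

```go
package ingester_test

import (
	"sync"
	"testing"
)

func BenchmarkConcurrentClients(b *testing.B) {
	const numConcurrentClients = 100
	doPush := func() {} // hypothetical stand-in for ingester.v2Push

	wg := sync.WaitGroup{}
	wg.Add(numConcurrentClients)
	start := make(chan struct{}) // closed once all goroutines are parked

	for c := 0; c < numConcurrentClients; c++ {
		go func() {
			defer wg.Done()
			<-start // block until the starting gun fires

			for n := 0; n < b.N; n++ {
				doPush()
			}
		}()
	}

	b.ReportAllocs()
	b.ResetTimer() // exclude goroutine setup from the measurement
	close(start)   // release all clients at once
	wg.Wait()
}
```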
