
Optimize distributor push on error #3990

Merged

1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -18,6 +18,7 @@
* [ENHANCEMENT] Query-frontend/scheduler: added querier forget delay (`-query-frontend.querier-forget-delay` and `-query-scheduler.querier-forget-delay`) to mitigate the blast radius in the event queriers crash because of a repeatedly sent "query of death" when shuffle-sharding is enabled. #3901
* [ENHANCEMENT] Query-frontend: reduced memory allocations when serializing query response. #3964
* [ENHANCEMENT] Ingester: reduce CPU and memory when a high number of errors is returned by the ingester on the write path with the blocks storage. #3969 #3971 #3973
* [ENHANCEMENT] Distributor: reduce CPU and memory when a high number of errors is returned by the distributor on the write path. #3990
* [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948
* [BUGFIX] Ingester: Fix race condition when opening and closing tsdb concurrently. #3959
* [BUGFIX] Querier: streamline tracing spans. #3924
8 changes: 4 additions & 4 deletions pkg/distributor/distributor.go
@@ -410,7 +410,7 @@ func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica
// Validates a single series from a write request. Will remove labels if
// any are configured to be dropped for the user ID.
// Returns the validated series with its labels/samples, and any error.
func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID string, skipLabelNameValidation bool) (cortexpb.PreallocTimeseries, error) {
func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID string, skipLabelNameValidation bool) (cortexpb.PreallocTimeseries, validation.ValidationError) {
d.labelsHistogram.Observe(float64(len(ts.Labels)))
if err := validation.ValidateLabels(d.limits, userID, ts.Labels, skipLabelNameValidation); err != nil {
return emptyPreallocSeries, err
@@ -544,12 +544,12 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
}

skipLabelNameValidation := d.cfg.SkipLabelNameValidation || req.GetSkipLabelNameValidation()
validatedSeries, err := d.validateSeries(ts, userID, skipLabelNameValidation)
validatedSeries, validationErr := d.validateSeries(ts, userID, skipLabelNameValidation)

// Errors in validation are considered non-fatal, as one series in a request may contain
// invalid data but all the remaining series could be perfectly valid.
if err != nil && firstPartialErr == nil {
firstPartialErr = err
if validationErr != nil && firstPartialErr == nil {
firstPartialErr = httpgrpc.Errorf(http.StatusBadRequest, validationErr.Error())
}

// validateSeries would have returned an emptyPreallocSeries if there were no valid samples.
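Why this reduces CPU and memory: validateSeries() now returns a lightweight validation.ValidationError, and Push() builds the httpgrpc HTTP 400 error only once, for the first invalid series in a request, instead of formatting one per rejected series. Below is a minimal, self-contained sketch of that pattern; the seriesValidationError type and validateOne() helper are hypothetical simplifications for illustration, not the actual Cortex code.

package main

import (
	"fmt"
	"net/http"
	"strings"

	"github.com/weaveworks/common/httpgrpc"
)

// seriesValidationError is a hypothetical, simplified stand-in for
// validation.ValidationError: constructing it records what failed but does
// not format an HTTP/gRPC error.
type seriesValidationError struct {
	labelValue string
}

func (e *seriesValidationError) Error() string {
	return fmt.Sprintf("label value too long: %d chars", len(e.labelValue))
}

// validateOne is a hypothetical helper standing in for validateSeries:
// it rejects a series cheaply by returning the typed error (or nil).
func validateOne(labelValue string) *seriesValidationError {
	if len(labelValue) > 1024 {
		return &seriesValidationError{labelValue: labelValue}
	}
	return nil
}

func main() {
	var firstPartialErr error

	// Simulate a request in which every series fails validation.
	for i := 0; i < 1000; i++ {
		validationErr := validateOne(strings.Repeat("x", 2048))

		// Only the first failure pays for httpgrpc.Errorf; the remaining
		// invalid series are skipped without allocating an HTTP error each.
		if validationErr != nil && firstPartialErr == nil {
			firstPartialErr = httpgrpc.Errorf(http.StatusBadRequest, validationErr.Error())
		}
	}

	fmt.Println(firstPartialErr)
}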
255 changes: 255 additions & 0 deletions pkg/distributor/distributor_test.go
@@ -969,6 +969,247 @@ func TestDistributor_Push_LabelNameValidation(t *testing.T) {
}
}

func BenchmarkDistributor_PushOnError(b *testing.B) {
const (
numSeriesPerRequest = 1000
)

tests := map[string]struct {
prepareConfig func(limits *validation.Limits)
prepareSeries func() ([]labels.Labels, []cortexpb.Sample)
expectedErr string
}{
"ingestion rate limit reached": {
prepareConfig: func(limits *validation.Limits) {
limits.IngestionRate = 1
limits.IngestionBurstSize = 1
},
prepareSeries: func() ([]labels.Labels, []cortexpb.Sample) {
metrics := make([]labels.Labels, numSeriesPerRequest)
samples := make([]cortexpb.Sample, numSeriesPerRequest)

for i := 0; i < numSeriesPerRequest; i++ {
lbls := labels.NewBuilder(labels.Labels{{Name: model.MetricNameLabel, Value: "foo"}})
for i := 0; i < 10; i++ {
lbls.Set(fmt.Sprintf("name_%d", i), fmt.Sprintf("value_%d", i))
}

metrics[i] = lbls.Labels()
samples[i] = cortexpb.Sample{
Value: float64(i),
TimestampMs: time.Now().UnixNano() / int64(time.Millisecond),
}
}

return metrics, samples
},
expectedErr: "ingestion rate limit",
},
"too many labels limit reached": {
prepareConfig: func(limits *validation.Limits) {
limits.MaxLabelNamesPerSeries = 30
},
prepareSeries: func() ([]labels.Labels, []cortexpb.Sample) {
metrics := make([]labels.Labels, numSeriesPerRequest)
samples := make([]cortexpb.Sample, numSeriesPerRequest)

for i := 0; i < numSeriesPerRequest; i++ {
lbls := labels.NewBuilder(labels.Labels{{Name: model.MetricNameLabel, Value: "foo"}})
for i := 1; i < 31; i++ {
lbls.Set(fmt.Sprintf("name_%d", i), fmt.Sprintf("value_%d", i))
}

metrics[i] = lbls.Labels()
samples[i] = cortexpb.Sample{
Value: float64(i),
TimestampMs: time.Now().UnixNano() / int64(time.Millisecond),
}
}

return metrics, samples
},
expectedErr: "series has too many labels",
},
"max label name length limit reached": {
prepareConfig: func(limits *validation.Limits) {
limits.MaxLabelNameLength = 1024
},
prepareSeries: func() ([]labels.Labels, []cortexpb.Sample) {
metrics := make([]labels.Labels, numSeriesPerRequest)
samples := make([]cortexpb.Sample, numSeriesPerRequest)

for i := 0; i < numSeriesPerRequest; i++ {
lbls := labels.NewBuilder(labels.Labels{{Name: model.MetricNameLabel, Value: "foo"}})
for i := 0; i < 10; i++ {
lbls.Set(fmt.Sprintf("name_%d", i), fmt.Sprintf("value_%d", i))
}

// Add a label with a very long name.
lbls.Set(fmt.Sprintf("xxx_%0.2000d", 1), "xxx")

metrics[i] = lbls.Labels()
samples[i] = cortexpb.Sample{
Value: float64(i),
TimestampMs: time.Now().UnixNano() / int64(time.Millisecond),
}
}

return metrics, samples
},
expectedErr: "label name too long",
},
"max label value length limit reached": {
prepareConfig: func(limits *validation.Limits) {
limits.MaxLabelValueLength = 1024
},
prepareSeries: func() ([]labels.Labels, []cortexpb.Sample) {
metrics := make([]labels.Labels, numSeriesPerRequest)
samples := make([]cortexpb.Sample, numSeriesPerRequest)

for i := 0; i < numSeriesPerRequest; i++ {
lbls := labels.NewBuilder(labels.Labels{{Name: model.MetricNameLabel, Value: "foo"}})
for i := 0; i < 10; i++ {
lbls.Set(fmt.Sprintf("name_%d", i), fmt.Sprintf("value_%d", i))
}

// Add a label with a very long value.
lbls.Set("xxx", fmt.Sprintf("xxx_%0.2000d", 1))

metrics[i] = lbls.Labels()
samples[i] = cortexpb.Sample{
Value: float64(i),
TimestampMs: time.Now().UnixNano() / int64(time.Millisecond),
}
}

return metrics, samples
},
expectedErr: "label value too long",
},
"timestamp too old": {
prepareConfig: func(limits *validation.Limits) {
limits.RejectOldSamples = true
limits.RejectOldSamplesMaxAge = time.Hour
},
prepareSeries: func() ([]labels.Labels, []cortexpb.Sample) {
metrics := make([]labels.Labels, numSeriesPerRequest)
samples := make([]cortexpb.Sample, numSeriesPerRequest)

for i := 0; i < numSeriesPerRequest; i++ {
lbls := labels.NewBuilder(labels.Labels{{Name: model.MetricNameLabel, Value: "foo"}})
for i := 0; i < 10; i++ {
lbls.Set(fmt.Sprintf("name_%d", i), fmt.Sprintf("value_%d", i))
}

metrics[i] = lbls.Labels()
samples[i] = cortexpb.Sample{
Value: float64(i),
TimestampMs: time.Now().Add(-2*time.Hour).UnixNano() / int64(time.Millisecond),
}
}

return metrics, samples
},
expectedErr: "timestamp too old",
},
"timestamp too new": {
prepareConfig: func(limits *validation.Limits) {
limits.CreationGracePeriod = time.Minute
},
prepareSeries: func() ([]labels.Labels, []cortexpb.Sample) {
metrics := make([]labels.Labels, numSeriesPerRequest)
samples := make([]cortexpb.Sample, numSeriesPerRequest)

for i := 0; i < numSeriesPerRequest; i++ {
lbls := labels.NewBuilder(labels.Labels{{Name: model.MetricNameLabel, Value: "foo"}})
for i := 0; i < 10; i++ {
lbls.Set(fmt.Sprintf("name_%d", i), fmt.Sprintf("value_%d", i))
}

metrics[i] = lbls.Labels()
samples[i] = cortexpb.Sample{
Value: float64(i),
TimestampMs: time.Now().Add(time.Hour).UnixNano() / int64(time.Millisecond),
}
}

return metrics, samples
},
expectedErr: "timestamp too new",
},
}

for testName, testData := range tests {
b.Run(testName, func(b *testing.B) {
// Create an in-memory KV store for the ring with 1 ingester registered.
kvStore := consul.NewInMemoryClient(ring.GetCodec())
err := kvStore.CAS(context.Background(), ring.IngesterRingKey,
func(_ interface{}) (interface{}, bool, error) {
d := &ring.Desc{}
d.AddIngester("ingester-1", "127.0.0.1", "", ring.GenerateTokens(128, nil), ring.ACTIVE, time.Now())
return d, true, nil
},
)
require.NoError(b, err)

ingestersRing, err := ring.New(ring.Config{
KVStore: kv.Config{Mock: kvStore},
HeartbeatTimeout: 60 * time.Minute,
ReplicationFactor: 1,
}, ring.IngesterRingKey, ring.IngesterRingKey, nil)
require.NoError(b, err)
require.NoError(b, services.StartAndAwaitRunning(context.Background(), ingestersRing))
b.Cleanup(func() {
require.NoError(b, services.StopAndAwaitTerminated(context.Background(), ingestersRing))
})

test.Poll(b, time.Second, 1, func() interface{} {
return ingestersRing.InstancesCount()
})

// Prepare the distributor configuration.
var distributorCfg Config
var clientConfig client.Config
limits := validation.Limits{}
flagext.DefaultValues(&distributorCfg, &clientConfig, &limits)

limits.IngestionRate = 0 // Unlimited.
testData.prepareConfig(&limits)

distributorCfg.ShardByAllLabels = true
distributorCfg.IngesterClientFactory = func(addr string) (ring_client.PoolClient, error) {
return &noopIngester{}, nil
}

overrides, err := validation.NewOverrides(limits, nil)
require.NoError(b, err)

// Start the distributor.
distributor, err := New(distributorCfg, clientConfig, overrides, ingestersRing, true, nil, log.NewNopLogger())
require.NoError(b, err)
require.NoError(b, services.StartAndAwaitRunning(context.Background(), distributor))

b.Cleanup(func() {
require.NoError(b, services.StopAndAwaitTerminated(context.Background(), distributor))
})

// Prepare the series to remote write before starting the benchmark.
metrics, samples := testData.prepareSeries()

// Run the benchmark.
b.ReportAllocs()
b.ResetTimer()

for n := 0; n < b.N; n++ {
_, err := distributor.Push(ctx, cortexpb.ToWriteRequest(metrics, samples, nil, cortexpb.API))
if err == nil || !strings.Contains(err.Error(), testData.expectedErr) {
b.Fatalf("expected %v error but got %v", testData.expectedErr, err)
}
}
})
}
}
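
The benchmark above drives Push() with 1000 series per request against a no-op ingester, so each sub-benchmark measures only the distributor-side cost of rejecting series for one failure mode. Assuming it is run from the repository root, it can be invoked with standard Go tooling:

	go test -bench=BenchmarkDistributor_PushOnError -benchmem ./pkg/distributor/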

func TestSlowQueries(t *testing.T) {
nameMatcher := mustEqualMatcher(model.MetricNameLabel, "foo")
nIngesters := 3
@@ -1661,6 +1902,20 @@ func (i *mockIngester) countCalls(name string) int {
return i.calls[name]
}

// noopIngester is a mocked ingester which does nothing.
type noopIngester struct {
client.IngesterClient
grpc_health_v1.HealthClient
}

func (i *noopIngester) Close() error {
return nil
}

func (i *noopIngester) Push(ctx context.Context, req *cortexpb.WriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) {
return nil, nil
}

type stream struct {
grpc.ClientStream
i int