Reduce memory usage from ingester Push() errors #1922

Merged
merged 3 commits on Dec 18, 2019
68 changes: 68 additions & 0 deletions pkg/ingester/errors.go
@@ -0,0 +1,68 @@
+package ingester
+
+import (
+    "fmt"
+    "net/http"
+
+    "github.com/prometheus/prometheus/pkg/labels"
+    "github.com/weaveworks/common/httpgrpc"
+)
+
+type validationError struct {
+    err       error // underlying error
+    errorType string
+    code      int
+    noReport  bool // if true, error will be counted but not reported to caller
+    labels    labels.Labels
+}
+
+func makeLimitError(errorType string, err error) error {
+    return &validationError{
+        errorType: errorType,
+        err:       err,
+        code:      http.StatusTooManyRequests,
+    }
+}
+
+func makeNoReportError(errorType string) error {
+    return &validationError{
+        errorType: errorType,
+        noReport:  true,
+    }
+}
+
+func makeMetricValidationError(errorType string, labels labels.Labels, err error) error {
+    return &validationError{
+        errorType: errorType,
+        err:       err,
+        code:      http.StatusBadRequest,
+        labels:    labels,
+    }
+}
+
+func makeMetricLimitError(errorType string, labels labels.Labels, err error) error {
+    return &validationError{
+        errorType: errorType,
+        err:       err,
+        code:      http.StatusTooManyRequests,
+        labels:    labels,
+    }
+}
+
+func (e *validationError) Error() string {
+    if e.err == nil {
+        return e.errorType
+    }
+    if e.labels == nil {
+        return e.err.Error()
+    }
+    return fmt.Sprintf("%s for series %s", e.err.Error(), e.labels.String())
+}
+
+// WrappedError returns an HTTP gRPC error that is correctly forwarded over gRPC.
+func (e *validationError) WrappedError() error {
+    return httpgrpc.ErrorFromHTTPResponse(&httpgrpc.HTTPResponse{
+        Code: int32(e.code),
+        Body: []byte(e.Error()),
+    })
+}
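The point of this type is that a validation failure stays a plain Go struct until a response is actually sent: Push() may hit thousands of per-sample failures in one request, but only the last error pays for an httpgrpc.HTTPResponse. A standalone sketch of the same deferred-wrapping idea (simplified stand-in types, not the Cortex code itself):

package main

import (
    "errors"
    "fmt"
    "net/http"
)

// deferredError mirrors validationError above: it carries the cheap
// parts of a failure and postpones any expensive rendering.
type deferredError struct {
    err  error
    code int
}

func (e *deferredError) Error() string { return e.err.Error() }

// wrapped is the one place the error is rendered for the caller,
// standing in for httpgrpc.ErrorFromHTTPResponse in the real code.
func (e *deferredError) wrapped() error {
    return fmt.Errorf("code=%d body=%q", e.code, e.Error())
}

var errOutOfOrder = errors.New("sample timestamp out of order")

func main() {
    var lastPartial *deferredError
    for i := 0; i < 10000; i++ {
        // each failed sample costs one small struct allocation, nothing more
        lastPartial = &deferredError{err: errOutOfOrder, code: http.StatusBadRequest}
    }
    if lastPartial != nil {
        fmt.Println(lastPartial.wrapped()) // rendered exactly once
    }
}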
27 changes: 14 additions & 13 deletions pkg/ingester/ingester.go
@@ -293,7 +293,7 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.
     if err != nil {
         return nil, fmt.Errorf("no user id")
     }
-    var lastPartialErr error
+    var lastPartialErr *validationError

     for _, ts := range req.Timeseries {
         for _, s := range ts.Samples {
@@ -303,20 +303,20 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.
             }

             i.metrics.ingestedSamplesFail.Inc()
-            if httpResp, ok := httpgrpc.HTTPResponseFromError(err); ok {
-                switch httpResp.Code {
-                case http.StatusBadRequest, http.StatusTooManyRequests:
-                    lastPartialErr = err
-                    continue
-                }
+            if ve, ok := err.(*validationError); ok {
+                lastPartialErr = ve
+                continue
             }

             return nil, err
         }
     }
     client.ReuseSlice(req.Timeseries)

-    return &client.WriteResponse{}, lastPartialErr
+    if lastPartialErr != nil {
+        return &client.WriteResponse{}, lastPartialErr.WrappedError()
+    }
+    return &client.WriteResponse{}, nil
 }

 func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, timestamp model.Time, value model.SampleValue, source client.WriteRequest_SourceEnum) error {
@@ -338,6 +338,9 @@ func (i *Ingester) append(ctx context.Context, labels labelPairs,
     }
     state, fp, series, err := i.userStates.getOrCreateSeries(ctx, userID, labels)
     if err != nil {
+        if ve, ok := err.(*validationError); ok {
+            state.discardedSamples.WithLabelValues(ve.errorType).Inc()
+        }
         state = nil // don't want to unlock the fp if there is an error
         return err
     }
@@ -357,13 +360,11 @@ func (i *Ingester) append(ctx context.Context, labels labelPairs,
         Value:     value,
         Timestamp: timestamp,
     }); err != nil {
-        if mse, ok := err.(*memorySeriesError); ok {
-            state.discardedSamples.WithLabelValues(mse.errorType).Inc()
-            if mse.noReport {
+        if ve, ok := err.(*validationError); ok {
+            state.discardedSamples.WithLabelValues(ve.errorType).Inc()
+            if ve.noReport {
                 return nil
             }
-            // Use a dumb string template to avoid the message being parsed as a template
-            err = httpgrpc.Errorf(http.StatusBadRequest, "%s", mse.message)
         }
         return err
     }
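Note that the plain type assertion err.(*validationError) in Push() and append() works only because the error is returned unwrapped all the way up. If an intermediate layer ever wrapped it (fmt.Errorf with %w), the Go 1.13 form below would be the safer equivalent; a hypothetical variant of the dispatch, not what the PR does, assuming the standard errors package is imported:

// inside the sample loop in Push(), same package, hypothetical:
var ve *validationError
if errors.As(err, &ve) { // also matches wrapped errors
    lastPartialErr = ve
    continue
}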
24 changes: 18 additions & 6 deletions pkg/ingester/ingester_test.go
@@ -317,16 +317,16 @@ func TestIngesterAppendOutOfOrderAndDuplicate(t *testing.T) {
     // Earlier sample than previous one.
     err = ing.append(ctx, userID, m, 0, 0, client.API)
     require.Contains(t, err.Error(), "sample timestamp out of order")
-    errResp, ok := httpgrpc.HTTPResponseFromError(err)
+    errResp, ok := err.(*validationError)
     require.True(t, ok)
-    require.Equal(t, errResp.Code, int32(400))
+    require.Equal(t, errResp.code, 400)

     // Same timestamp as previous sample, but different value.
     err = ing.append(ctx, userID, m, 1, 1, client.API)
     require.Contains(t, err.Error(), "sample with repeated timestamp but different value")
-    errResp, ok = httpgrpc.HTTPResponseFromError(err)
+    errResp, ok = err.(*validationError)
     require.True(t, ok)
-    require.Equal(t, errResp.Code, int32(400))
+    require.Equal(t, errResp.code, 400)
 }

 // Test that blank labels are removed by the ingester
@@ -523,9 +523,19 @@ func benchmarkIngesterSeriesCreationLocking(b *testing.B, parallelism int) {
 }

 func BenchmarkIngesterPush(b *testing.B) {
+    limits := defaultLimitsTestConfig()
+    benchmarkIngesterPush(b, limits, false)
+}
+
+func BenchmarkIngesterPushErrors(b *testing.B) {
+    limits := defaultLimitsTestConfig()
+    limits.MaxLocalSeriesPerMetric = 1
+    benchmarkIngesterPush(b, limits, true)
+}
+
+func benchmarkIngesterPush(b *testing.B, limits validation.Limits, errorsExpected bool) {
     cfg := defaultIngesterTestConfig()
     clientCfg := defaultClientTestConfig()
-    limits := defaultLimitsTestConfig()

     const (
         series = 100
@@ -567,7 +577,9 @@ func BenchmarkIngesterPush(b *testing.B) {
             allSamples[i].TimestampMs = int64(j + 1)
         }
         _, err := ing.Push(ctx, client.ToWriteRequest(allLabels, allSamples, client.API))
-        require.NoError(b, err)
+        if !errorsExpected {
+            require.NoError(b, err)
+        }
     }
     ing.Shutdown()
 }
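To see the effect of the change on allocations, both benchmarks can be run with memory stats; from the repository root, something like:

go test ./pkg/ingester -run '^$' -bench 'BenchmarkIngesterPush' -benchmem

BenchmarkIngesterPushErrors forces the per-metric series limit to 1, so most series creations are rejected and Push() exercises the error path, which is where the removed per-sample httpgrpc allocations should show up in the -benchmem numbers.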
7 changes: 5 additions & 2 deletions pkg/ingester/ingester_v2.go
@@ -111,7 +111,7 @@ func (i *Ingester) v2Push(ctx old_ctx.Context, req *client.WriteRequest) (*clien
             // 400 error to the client. The client (Prometheus) will not retry on 400, and
             // we actually ingested all samples which haven't failed.
             if err == tsdb.ErrOutOfBounds || err == tsdb.ErrOutOfOrderSample || err == tsdb.ErrAmendSample {
-                lastPartialErr = httpgrpc.Errorf(http.StatusBadRequest, err.Error())
+                lastPartialErr = err
                 continue
             }

@@ -135,7 +135,10 @@ func (i *Ingester) v2Push(ctx old_ctx.Context, req *client.WriteRequest) (*clien

     client.ReuseSlice(req.Timeseries)

-    return &client.WriteResponse{}, lastPartialErr
+    if lastPartialErr != nil {
+        return &client.WriteResponse{}, httpgrpc.Errorf(http.StatusBadRequest, lastPartialErr.Error())
+    }
+    return &client.WriteResponse{}, nil
 }

 func (i *Ingester) v2Query(ctx old_ctx.Context, req *client.QueryRequest) (*client.QueryResponse, error) {
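The v2 (TSDB) path gets the same treatment with less machinery: tsdb's sentinel errors are kept as-is through the loop and formatted exactly once at the return site. A standalone sketch of the pattern, with errSoft as a hypothetical stand-in for tsdb.ErrOutOfOrderSample and friends:

package main

import (
    "errors"
    "fmt"
)

var errSoft = errors.New("out of order sample") // stand-in sentinel

func push(errs []error) error {
    var lastPartialErr error
    for _, err := range errs {
        if errors.Is(err, errSoft) { // the PR uses plain ==, equivalent for unwrapped sentinels
            lastPartialErr = err // remembered, never formatted here
            continue
        }
        if err != nil {
            return err
        }
    }
    if lastPartialErr != nil {
        // the only place the error text is rendered, like httpgrpc.Errorf above
        return fmt.Errorf("code=400 body=%q", lastPartialErr.Error())
    }
    return nil
}

func main() {
    fmt.Println(push([]error{errSoft, nil, errSoft}))
}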
29 changes: 6 additions & 23 deletions pkg/ingester/series.go
@@ -41,19 +41,6 @@ type memorySeries struct {
     lastSampleValue model.SampleValue
 }

-type memorySeriesError struct {
-    message   string
-    errorType string
-    noReport  bool // if true, error will be counted but not reported to caller
-}
-
-func (error *memorySeriesError) Error() string {
-    if error.message == "" {
-        return error.errorType
-    }
-    return error.message
-}
-
 // newMemorySeries returns a pointer to a newly allocated memorySeries for the
 // given metric.
 func newMemorySeries(m labels.Labels) *memorySeries {
@@ -71,24 +58,20 @@ func (s *memorySeries) add(v model.SamplePair) error {
         // If we don't know what the last sample value is, silently discard.
         // This will mask some errors but better than complaining when we don't really know.
         if !s.lastSampleValueSet {
-            return &memorySeriesError{errorType: "duplicate-timestamp", noReport: true}
+            return makeNoReportError("duplicate-timestamp")
         }
         // If both timestamp and sample value are the same as for the last append,
         // ignore as they are a common occurrence when using client-side timestamps
         // (e.g. Pushgateway or federation).
         if v.Value.Equal(s.lastSampleValue) {
-            return &memorySeriesError{errorType: "duplicate-sample", noReport: true}
-        }
-        return &memorySeriesError{
-            message:   fmt.Sprintf("sample with repeated timestamp but different value for series %v; last value: %v, incoming value: %v", s.metric, s.lastSampleValue, v.Value),
-            errorType: "new-value-for-timestamp",
+            return makeNoReportError("duplicate-sample")
         }
+        return makeMetricValidationError("new-value-for-timestamp", s.metric,
+            fmt.Errorf("sample with repeated timestamp but different value; last value: %v, incoming value: %v", s.lastSampleValue, v.Value))
     }
     if v.Timestamp < s.lastTime {
-        return &memorySeriesError{
-            message:   fmt.Sprintf("sample timestamp out of order for series %v; last timestamp: %v, incoming timestamp: %v", s.metric, s.lastTime, v.Timestamp),
-            errorType: "sample-out-of-order",
-        }
+        return makeMetricValidationError("sample-out-of-order", s.metric,
+            fmt.Errorf("sample timestamp out of order; last timestamp: %v, incoming timestamp: %v", s.lastTime, v.Timestamp))
     }

     if len(s.chunkDescs) == 0 || s.headChunkClosed {
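After this change, add() distinguishes four outcomes for a sample whose timestamp collides with or precedes the last one. A standalone restatement of that decision logic (types reduced from model.SamplePair to plain numbers for the sketch):

package main

import "fmt"

type sample struct {
    ts  int64
    val float64
}

// classify restates add()'s duplicate/out-of-order rules: report=false
// means the error is counted but swallowed by append() (noReport).
func classify(last sample, lastValueKnown bool, in sample) (errorType string, report bool) {
    switch {
    case in.ts == last.ts && !lastValueKnown:
        return "duplicate-timestamp", false // counted, not reported
    case in.ts == last.ts && in.val == last.val:
        return "duplicate-sample", false // common with Pushgateway/federation
    case in.ts == last.ts:
        return "new-value-for-timestamp", true // HTTP 400, series labels attached
    case in.ts < last.ts:
        return "sample-out-of-order", true // HTTP 400, series labels attached
    }
    return "", false // sample is accepted
}

func main() {
    fmt.Println(classify(sample{ts: 10, val: 1}, true, sample{ts: 10, val: 2}))
    // prints: new-value-for-timestamp true
}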
6 changes: 2 additions & 4 deletions pkg/ingester/user_state.go
@@ -195,8 +195,7 @@ func (u *userState) getSeries(metric labelPairs) (model.Fingerprint, *memorySeri
     err := u.limiter.AssertMaxSeriesPerUser(u.userID, u.fpToSeries.length())
     if err != nil {
         u.fpLocker.Unlock(fp)
-        u.discardedSamples.WithLabelValues(perUserSeriesLimit).Inc()
-        return fp, nil, httpgrpc.Errorf(http.StatusTooManyRequests, err.Error())
+        return fp, nil, makeLimitError(perUserSeriesLimit, err)
     }

     metricName, err := extract.MetricNameFromLabelAdapters(metric)
@@ -209,8 +208,7 @@ func (u *userState) getSeries(metric labelPairs) (model.Fingerprint, *memorySeri
     err = u.canAddSeriesFor(string(metricName))
     if err != nil {
         u.fpLocker.Unlock(fp)
-        u.discardedSamples.WithLabelValues(perMetricSeriesLimit).Inc()
-        return fp, nil, httpgrpc.Errorf(http.StatusTooManyRequests, "%s for: %s", err.Error(), metric)
+        return fp, nil, makeMetricLimitError(perMetricSeriesLimit, client.FromLabelAdaptersToLabels(metric), err)
     }

     u.memSeriesCreatedTotal.Inc()
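The u.discardedSamples increments deleted here are not lost: append() now counts discards in one place, keyed by ve.errorType (see the ingester.go hunk above). For illustration, roughly what the caller receives when the per-metric limit trips, as a same-package fragment with invented label values:

// hypothetical test fragment in package ingester:
err := makeMetricLimitError(perMetricSeriesLimit,
    labels.FromStrings("__name__", "http_requests_total", "job", "api"),
    errors.New("per-metric series limit of 1 exceeded"))
fmt.Println(err.Error())
// per-metric series limit of 1 exceeded for series {__name__="http_requests_total", job="api"}
// err.(*validationError).WrappedError() then turns this into an httpgrpc 429 response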