Skip to content

Commit 684e656

Browse files
committed
address comment; handle histogram partial append errors
Signed-off-by: Ben Ye <benye@amazon.com>
1 parent c120357 commit 684e656

File tree

1 file changed

+61
-83
lines changed

1 file changed

+61
-83
lines changed

pkg/ingester/ingester.go

Lines changed: 61 additions & 83 deletions
Original file line number | Diff line number | Diff line change
@@ -1083,6 +1083,65 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
10831083
firstPartialErr = errFn()
10841084
}
10851085
}
1086+
1087+
handleAppendFailure = func(err error, timestampMs int64, lbls []cortexpb.LabelAdapter, copiedLabels labels.Labels) (rollback bool) {
1088+
// Check if the error is a soft error we can proceed on. If so, we keep track
1089+
// of it, so that we can return it back to the distributor, which will return a
1090+
// 400 error to the client. The client (Prometheus) will not retry on 400, and
1091+
// we actually ingested all samples which haven't failed.
1092+
switch cause := errors.Cause(err); {
1093+
case errors.Is(cause, storage.ErrOutOfBounds):
1094+
sampleOutOfBoundsCount++
1095+
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })
1096+
1097+
case errors.Is(cause, storage.ErrOutOfOrderSample):
1098+
sampleOutOfOrderCount++
1099+
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })
1100+
1101+
case errors.Is(cause, storage.ErrDuplicateSampleForTimestamp):
1102+
newValueForTimestampCount++
1103+
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })
1104+
1105+
case errors.Is(cause, storage.ErrTooOldSample):
1106+
sampleTooOldCount++
1107+
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })
1108+
1109+
case errors.Is(cause, errMaxSeriesPerUserLimitExceeded):
1110+
perUserSeriesLimitCount++
1111+
updateFirstPartial(func() error { return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause)) })
1112+
1113+
case errors.Is(cause, errMaxSeriesPerMetricLimitExceeded):
1114+
perMetricSeriesLimitCount++
1115+
updateFirstPartial(func() error {
1116+
return makeMetricLimitError(perMetricSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause))
1117+
})
1118+
1119+
case errors.As(cause, &errMaxSeriesPerLabelSetLimitExceeded{}):
1120+
perLabelSetSeriesLimitCount++
1121+
updateFirstPartial(func() error {
1122+
return makeMetricLimitError(perLabelsetSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause))
1123+
})
1124+
1125+
case errors.Is(cause, histogram.ErrHistogramSpanNegativeOffset):
1126+
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })
1127+
1128+
case errors.Is(cause, histogram.ErrHistogramSpansBucketsMismatch):
1129+
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })
1130+
1131+
case errors.Is(cause, histogram.ErrHistogramNegativeBucketCount):
1132+
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })
1133+
1134+
case errors.Is(cause, histogram.ErrHistogramCountNotBigEnough):
1135+
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })
1136+
1137+
case errors.Is(cause, histogram.ErrHistogramCountMismatch):
1138+
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })
1139+
1140+
default:
1141+
rollback = true
1142+
}
1143+
return
1144+
}
10861145
)
10871146

10881147
// Walk the samples, appending them to the users database
@@ -1122,50 +1181,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
11221181

11231182
failedSamplesCount++
11241183

1125-
// Check if the error is a soft error we can proceed on. If so, we keep track
1126-
// of it, so that we can return it back to the distributor, which will return a
1127-
// 400 error to the client. The client (Prometheus) will not retry on 400, and
1128-
// we actually ingested all samples which haven't failed.
1129-
switch cause := errors.Cause(err); {
1130-
case errors.Is(cause, storage.ErrOutOfBounds):
1131-
sampleOutOfBoundsCount++
1132-
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
1133-
continue
1134-
1135-
case errors.Is(cause, storage.ErrOutOfOrderSample):
1136-
sampleOutOfOrderCount++
1137-
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
1138-
continue
1139-
1140-
case errors.Is(cause, storage.ErrDuplicateSampleForTimestamp):
1141-
newValueForTimestampCount++
1142-
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
1143-
continue
1144-
1145-
case errors.Is(cause, storage.ErrTooOldSample):
1146-
sampleTooOldCount++
1147-
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
1148-
continue
1149-
1150-
case errors.Is(cause, errMaxSeriesPerUserLimitExceeded):
1151-
perUserSeriesLimitCount++
1152-
updateFirstPartial(func() error { return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause)) })
1153-
continue
1154-
1155-
case errors.Is(cause, errMaxSeriesPerMetricLimitExceeded):
1156-
perMetricSeriesLimitCount++
1157-
updateFirstPartial(func() error {
1158-
return makeMetricLimitError(perMetricSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause))
1159-
})
1160-
continue
1161-
case errors.As(cause, &errMaxSeriesPerLabelSetLimitExceeded{}):
1162-
perLabelSetSeriesLimitCount++
1163-
updateFirstPartial(func() error {
1164-
return makeMetricLimitError(perLabelsetSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause))
1165-
})
1184+
if rollback := handleAppendFailure(err, s.TimestampMs, ts.Labels, copiedLabels); !rollback {
11661185
continue
11671186
}
1168-
11691187
// The error looks like an issue on our side, so we should rollback
11701188
if rollbackErr := app.Rollback(); rollbackErr != nil {
11711189
level.Warn(logutil.WithContext(ctx, i.logger)).Log("msg", "failed to rollback on error", "user", userID, "err", rollbackErr)
@@ -1204,49 +1222,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
12041222

12051223
failedSamplesCount++
12061224

1207-
// Check if the error is a soft error we can proceed on. If so, we keep track
1208-
// of it, so that we can return it back to the distributor, which will return a
1209-
// 400 error to the client. The client (Prometheus) will not retry on 400, and
1210-
// we actually ingested all samples which haven't failed.
1211-
switch cause := errors.Cause(err); {
1212-
case errors.Is(cause, storage.ErrOutOfBounds):
1213-
sampleOutOfBoundsCount++
1214-
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(hp.TimestampMs), ts.Labels) })
1215-
continue
1216-
1217-
case errors.Is(cause, storage.ErrOutOfOrderSample):
1218-
sampleOutOfOrderCount++
1219-
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(hp.TimestampMs), ts.Labels) })
1220-
continue
1221-
1222-
case errors.Is(cause, storage.ErrDuplicateSampleForTimestamp):
1223-
newValueForTimestampCount++
1224-
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(hp.TimestampMs), ts.Labels) })
1225-
continue
1226-
1227-
case errors.Is(cause, storage.ErrTooOldSample):
1228-
sampleTooOldCount++
1229-
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(hp.TimestampMs), ts.Labels) })
1230-
continue
1231-
1232-
case errors.Is(cause, errMaxSeriesPerUserLimitExceeded):
1233-
perUserSeriesLimitCount++
1234-
updateFirstPartial(func() error { return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause)) })
1235-
continue
1236-
1237-
case errors.Is(cause, errMaxSeriesPerMetricLimitExceeded):
1238-
perMetricSeriesLimitCount++
1239-
updateFirstPartial(func() error {
1240-
return makeMetricLimitError(perMetricSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause))
1241-
})
1242-
continue
1243-
case errors.As(cause, &errMaxSeriesPerLabelSetLimitExceeded{}):
1244-
updateFirstPartial(func() error {
1245-
return makeMetricLimitError(perLabelsetSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause))
1246-
})
1225+
if rollback := handleAppendFailure(err, hp.TimestampMs, ts.Labels, copiedLabels); !rollback {
12471226
continue
12481227
}
1249-
12501228
// The error looks like an issue on our side, so we should rollback
12511229
if rollbackErr := app.Rollback(); rollbackErr != nil {
12521230
level.Warn(logutil.WithContext(ctx, i.logger)).Log("msg", "failed to rollback on error", "user", userID, "err", rollbackErr)

0 commit comments

Comments
 (0)