Skip to content
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@

* [FEATURE] Distributor: Add experimental `-distributor.otel-native-delta-ingestion` option to allow primitive delta metrics ingestion via the OTLP endpoint. #11631
* [FEATURE] MQE: Add support for experimental `sort_by_label` and `sort_by_label_desc` PromQL functions. #11930
* [FEATURE] Ingester/Block-builder: Handle the created timestamp field for remote-write requests. #11977
* [ENHANCEMENT] Ingester: Display user grace interval in the tenant list obtained through the `/ingester/tenants` endpoint. #11961
* [ENHANCEMENT] `kafkatool`: add `consumer-group delete-offset` command as a way to delete the committed offset for a consumer group. #11988
* [ENHANCEMENT] Block-builder-scheduler: Detect gaps in scheduled and completed jobs. #11867
* [ENHANCEMENT] Distributor: Experimental support for Prometheus Remote-Write 2.0 protocol has been updated. Created timestamps are now supported. This feature has some limitations. If samples in a write request aren't ordered by time, the created timestamp might be dropped. Additionally, per-series metadata is automatically merged on the metric family level. Ingestion might fail if the client sends ProtoBuf fields out-of-order. The label `version` is added to the metric `cortex_distributor_requests_in_total` with a value of either `1.0` or `2.0`, depending on the detected remote-write protocol. #11977
* [BUGFIX] Distributor: Validate the RW2 symbols field and reject invalid requests that don't have an empty string as the first symbol. #11953

### Mixin
Expand Down
51 changes: 50 additions & 1 deletion integration/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type distributorTestCase struct {
func testDistributorWithCachingUnmarshalData(t *testing.T, cachingUnmarshalDataEnabled bool) {
queryEnd := time.Now().Round(time.Second)
queryStart := queryEnd.Add(-1 * time.Hour)
queryStep := 10 * time.Minute
queryStep := 5 * time.Minute

overridesWithExemplars := func(maxExemplars int) string {
return fmt.Sprintf("overrides:\n \"%s\":\n max_global_exemplars_per_user: %d\n", userID, maxExemplars)
Expand Down Expand Up @@ -142,6 +142,55 @@ func testDistributorWithCachingUnmarshalData(t *testing.T, cachingUnmarshalDataE
},
},

"simple counter with created timestamp": {
rw1request: nil, // Not supported in RW1
rw2request: []promRW2.Request{
{
Timeseries: []promRW2.TimeSeries{
{
LabelsRefs: []uint32{1, 2},
Samples: []promRW2.Sample{{Timestamp: queryStart.Add(1 * time.Second).UnixMilli(), Value: 100}},
Metadata: promRW2.Metadata{
Type: promRW2.Metadata_METRIC_TYPE_COUNTER,
HelpRef: 3,
UnitRef: 4,
},
CreatedTimestamp: queryStart.UnixMilli(),
},
},
Symbols: []string{"", "__name__", "foobarC_CT_total", "some helpC_CT", "someunitC_CT"},
},
},
queries: map[string]model.Matrix{
"foobarC_CT_total": {{
Metric: model.Metric{"__name__": "foobarC_CT_total"},
Values: []model.SamplePair{
{Timestamp: model.Time(queryStart.UnixMilli()), Value: model.SampleValue(0)},
{Timestamp: model.Time(queryStart.Add(5 * time.Minute).UnixMilli()), Value: model.SampleValue(100)},
},
}},
},
metadataQueries: map[string]metadataResponse{
"foobarC_CT_total": {
Status: "success",
Data: map[string][]metadataResponseItem{
"foobarC_CT_total": {{
Type: "counter",
Help: "some helpC_CT",
Unit: "someunitC_CT",
}},
},
},
},
expectedStats: []promRemote.WriteResponseStats{
{
Samples: 1,
Histograms: 0,
Exemplars: 0,
},
},
},

"simple gauge": {
rw1request: []prompb.WriteRequest{
{
Expand Down
49 changes: 49 additions & 0 deletions pkg/blockbuilder/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,26 @@ func (b *TSDBBuilder) Process(ctx context.Context, rec *kgo.Record) (err error)
// and NOT the stable hashing because that's what TSDB expects. We don't need stable hashing in block builder.
ref, copiedLabels := app.GetRef(nonCopiedLabels, hash)

ingestCreatedTimestamp := ts.CreatedTimestamp > 0

for _, s := range ts.Samples {
if ingestCreatedTimestamp && ts.CreatedTimestamp < s.TimestampMs &&
(!nativeHistogramsIngestionEnabled || len(ts.Histograms) == 0 || ts.Histograms[0].Timestamp >= s.TimestampMs) {
if ref != 0 {
// If the cached reference exists, we try to use it.
_, err = app.AppendCTZeroSample(ref, copiedLabels, s.TimestampMs, ts.CreatedTimestamp)
} else {
// Copy the label set because TSDB may retain it.
copiedLabels = mimirpb.CopyLabels(nonCopiedLabels)
ref, err = app.AppendCTZeroSample(0, copiedLabels, s.TimestampMs, ts.CreatedTimestamp)
}
if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) && !errors.Is(err, storage.ErrOutOfOrderSample) {
level.Warn(b.logger).Log("msg", "failed to store zero float sample for created timestamp", "tenant", userID, "err", err)
discardedSamples++
}
ingestCreatedTimestamp = false // Only try to append created timestamp once per series.
}

if ref != 0 {
// If the cached reference exists, we try to use it.
if _, err = app.Append(ref, copiedLabels, s.TimestampMs, s.Value); err == nil {
Expand Down Expand Up @@ -161,6 +180,31 @@ func (b *TSDBBuilder) Process(ctx context.Context, rec *kgo.Record) (err error)
}

for _, h := range ts.Histograms {
if ingestCreatedTimestamp && ts.CreatedTimestamp < h.Timestamp {
var (
ih *histogram.Histogram
fh *histogram.FloatHistogram
)
// AppendHistogramCTZeroSample doesn't care about the content of the passed histograms,
// just uses it to decide the type, so don't convert the input, use dummy histograms.
if h.IsFloatHistogram() {
fh = zeroFloatHistogram
} else {
ih = zeroHistogram
}
if ref != 0 {
_, err = app.AppendHistogramCTZeroSample(ref, copiedLabels, h.Timestamp, ts.CreatedTimestamp, ih, fh)
} else {
// Copy the label set because both TSDB and the active series tracker may retain it.
copiedLabels = mimirpb.CopyLabels(nonCopiedLabels)
ref, err = app.AppendHistogramCTZeroSample(0, copiedLabels, h.Timestamp, ts.CreatedTimestamp, ih, fh)
}
if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) && !errors.Is(err, storage.ErrOutOfOrderSample) {
level.Warn(b.logger).Log("msg", "failed to store zero histogram sample for created timestamp", "tenant", userID, "err", err)
discardedSamples++
}
ingestCreatedTimestamp = false // Only try to append created timestamp once per series.
}
var (
ih *histogram.Histogram
fh *histogram.FloatHistogram
Expand Down Expand Up @@ -206,6 +250,11 @@ func (b *TSDBBuilder) Process(ctx context.Context, rec *kgo.Record) (err error)
return app.Commit()
}

// Shared placeholder histograms passed to AppendHistogramCTZeroSample, which
// ignores the contents of the histogram arguments and only uses them to decide
// whether the zero sample is an integer or a float histogram (see the comment
// at the call sites above). Because they are never read or mutated, a single
// shared empty value of each kind is safe to reuse across all appends.
var (
	zeroHistogram = &histogram.Histogram{}
	zeroFloatHistogram = &histogram.FloatHistogram{}
)

func (b *TSDBBuilder) getOrCreateTSDB(tenant tsdbTenant) (*userTSDB, error) {
b.tsdbsMu.RLock()
db := b.tsdbs[tenant]
Expand Down
Loading