Skip to content

Commit c37a3f7

Browse files
committed
fix distributor sends native histograms
Signed-off-by: Ben Ye <benye@amazon.com>
1 parent 72ba1d5 commit c37a3f7

File tree

2 files changed

+23
-8
lines changed

2 files changed

+23
-8
lines changed

pkg/distributor/distributor.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
287287
incomingMetadata: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
288288
Namespace: "cortex",
289289
Name: "distributor_metadata_in_total",
290-
Help: "The total number of metadata the have come in to the distributor, including rejected.",
290+
Help: "The total number of metadata that have come in to the distributor, including rejected.",
291291
}, []string{"user"}),
292292
nonHASamples: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
293293
Namespace: "cortex",
@@ -538,7 +538,7 @@ func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID stri
538538
for _, e := range ts.Exemplars {
539539
if err := validation.ValidateExemplar(userID, ts.Labels, e); err != nil {
540540
// An exemplar validation error prevents ingesting samples
541-
// in the same series object. However because the current Prometheus
541+
// in the same series object. However, because the current Prometheus
542542
// remote write implementation only populates one or the other,
543543
// there never will be any.
544544
return emptyPreallocSeries, err
@@ -547,11 +547,23 @@ func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID stri
547547
}
548548
}
549549

550+
var histograms []cortexpb.Histogram
551+
if len(ts.Histograms) > 0 {
552+
// Only alloc when data present
553+
histograms = make([]cortexpb.Histogram, 0, len(ts.Histograms))
554+
// TODO(yeya24): we need to have validations for native histograms
555+
// at some point. Skip validations for now.
556+
for _, h := range ts.Histograms {
557+
histograms = append(histograms, h)
558+
}
559+
}
560+
550561
return cortexpb.PreallocTimeseries{
551562
TimeSeries: &cortexpb.TimeSeries{
552-
Labels: ts.Labels,
553-
Samples: samples,
554-
Exemplars: exemplars,
563+
Labels: ts.Labels,
564+
Samples: samples,
565+
Exemplars: exemplars,
566+
Histograms: histograms,
555567
},
556568
},
557569
nil
@@ -589,10 +601,11 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
589601
numSamples := 0
590602
numExemplars := 0
591603
for _, ts := range req.Timeseries {
592-
numSamples += len(ts.Samples)
604+
// Should we differentiate normal sample and histogram sample?
605+
numSamples += len(ts.Samples) + len(ts.Histograms)
593606
numExemplars += len(ts.Exemplars)
594607
}
595-
// Count the total samples in, prior to validation or deduplication, for comparison with other metrics.
608+
// Count the total samples, exemplars in, prior to validation or deduplication, for comparison with other metrics.
596609
d.incomingSamples.WithLabelValues(userID).Add(float64(numSamples))
597610
d.incomingExemplars.WithLabelValues(userID).Add(float64(numExemplars))
598611
// Count the total number of metadata in.
@@ -772,6 +785,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
772785
if len(ts.Samples) > 0 {
773786
latestSampleTimestampMs = util_math.Max64(latestSampleTimestampMs, ts.Samples[len(ts.Samples)-1].TimestampMs)
774787
}
788+
// TODO: use timestamp of the latest native histogram in the series as well.
775789

776790
if mrc := limits.MetricRelabelConfigs; len(mrc) > 0 {
777791
l, _ := relabel.Process(cortexpb.FromLabelAdaptersToLabels(ts.Labels), mrc...)
@@ -836,6 +850,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
836850

837851
seriesKeys = append(seriesKeys, key)
838852
validatedTimeseries = append(validatedTimeseries, validatedSeries)
853+
// TODO(yeya24): add histogram samples as well when supported.
839854
validatedSamples += len(ts.Samples)
840855
validatedExemplars += len(ts.Exemplars)
841856
}

pkg/distributor/distributor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
341341
# TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge
342342
cortex_distributor_latest_seen_sample_timestamp_seconds{user="userA"} 1111
343343
344-
# HELP cortex_distributor_metadata_in_total The total number of metadata the have come in to the distributor, including rejected.
344+
# HELP cortex_distributor_metadata_in_total The total number of metadata that have come in to the distributor, including rejected.
345345
# TYPE cortex_distributor_metadata_in_total counter
346346
cortex_distributor_metadata_in_total{user="userA"} 5
347347

0 commit comments

Comments
 (0)