Skip to content

Create ValidateMetrics #5988

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 15 additions & 11 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ type Distributor struct {
ingesterQueryFailures *prometheus.CounterVec
replicationFactor prometheus.Gauge
latestSeenSampleTimestampPerUser *prometheus.GaugeVec

validateMetrics *validation.ValidateMetrics
}

// Config contains the configuration required to
Expand Down Expand Up @@ -345,6 +347,8 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
Name: "cortex_distributor_latest_seen_sample_timestamp_seconds",
Help: "Unix timestamp of latest received sample per user.",
}, []string{"user"}),

validateMetrics: validation.NewValidateMetrics(reg),
}

promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Expand Down Expand Up @@ -437,7 +441,7 @@ func (d *Distributor) cleanupInactiveUser(userID string) {
level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_deduped_samples_total metric for user", "user", userID, "err", err)
}

validation.DeletePerUserValidationMetrics(userID, d.log)
validation.DeletePerUserValidationMetrics(d.validateMetrics, userID, d.log)
}

// Called after distributor is asked to stop via StopAsync.
Expand Down Expand Up @@ -534,7 +538,7 @@ func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica
func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID string, skipLabelNameValidation bool, limits *validation.Limits) (cortexpb.PreallocTimeseries, validation.ValidationError) {
d.labelsHistogram.Observe(float64(len(ts.Labels)))

if err := validation.ValidateLabels(limits, userID, ts.Labels, skipLabelNameValidation); err != nil {
if err := validation.ValidateLabels(d.validateMetrics, limits, userID, ts.Labels, skipLabelNameValidation); err != nil {
return emptyPreallocSeries, err
}

Expand All @@ -543,7 +547,7 @@ func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID stri
// Only alloc when data present
samples = make([]cortexpb.Sample, 0, len(ts.Samples))
for _, s := range ts.Samples {
if err := validation.ValidateSample(limits, userID, ts.Labels, s); err != nil {
if err := validation.ValidateSample(d.validateMetrics, limits, userID, ts.Labels, s); err != nil {
return emptyPreallocSeries, err
}
samples = append(samples, s)
Expand All @@ -555,7 +559,7 @@ func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID stri
// Only alloc when data present
exemplars = make([]cortexpb.Exemplar, 0, len(ts.Exemplars))
for _, e := range ts.Exemplars {
if err := validation.ValidateExemplar(userID, ts.Labels, e); err != nil {
if err := validation.ValidateExemplar(d.validateMetrics, userID, ts.Labels, e); err != nil {
// An exemplar validation error prevents ingesting samples
// in the same series object. However, because the current Prometheus
// remote write implementation only populates one or the other,
Expand Down Expand Up @@ -643,7 +647,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
}

if errors.Is(err, ha.TooManyReplicaGroupsError{}) {
validation.DiscardedSamples.WithLabelValues(validation.TooManyHAClusters, userID).Add(float64(numSamples))
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.TooManyHAClusters, userID).Add(float64(numSamples))
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

Expand Down Expand Up @@ -678,9 +682,9 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
// Ensure the request slice is reused if the request is rate limited.
cortexpb.ReuseSlice(req.Timeseries)

validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamples))
validation.DiscardedExemplars.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedExemplars))
validation.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata)))
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamples))
d.validateMetrics.DiscardedExemplars.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedExemplars))
d.validateMetrics.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata)))
// Return a 429 here to tell the client it is going too fast.
// Client may discard the data or slow down and re-send.
// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
Expand Down Expand Up @@ -790,7 +794,7 @@ func (d *Distributor) prepareMetadataKeys(req *cortexpb.WriteRequest, limits *va
metadataKeys := make([]uint32, 0, len(req.Metadata))

for _, m := range req.Metadata {
err := validation.ValidateMetadata(limits, userID, m)
err := validation.ValidateMetadata(d.validateMetrics, limits, userID, m)

if err != nil {
if firstPartialErr == nil {
Expand Down Expand Up @@ -841,7 +845,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
l, _ := relabel.Process(cortexpb.FromLabelAdaptersToLabels(ts.Labels), mrc...)
if len(l) == 0 {
// all labels are gone, samples will be discarded
validation.DiscardedSamples.WithLabelValues(
d.validateMetrics.DiscardedSamples.WithLabelValues(
validation.DroppedByRelabelConfiguration,
userID,
).Add(float64(len(ts.Samples)))
Expand All @@ -862,7 +866,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
}

if len(ts.Labels) == 0 {
validation.DiscardedExemplars.WithLabelValues(
d.validateMetrics.DiscardedExemplars.WithLabelValues(
validation.DroppedByUserConfigurationOverride,
userID,
).Add(float64(len(ts.Samples)))
Expand Down
4 changes: 0 additions & 4 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3589,9 +3589,6 @@ func TestDistributor_Push_RelabelDropWillExportMetricOfDroppedSamples(t *testing
limits: &limits,
})

regs[0].MustRegister(validation.DiscardedSamples)
validation.DiscardedSamples.Reset()

// Push the series to the distributor
req := mockWriteRequest(inputSeries, 1, 1)
ctx := user.InjectOrgID(context.Background(), "userDistributorPushRelabelDropWillExportMetricOfDroppedSamples")
Expand All @@ -3615,7 +3612,6 @@ func TestDistributor_Push_RelabelDropWillExportMetricOfDroppedSamples(t *testing
# TYPE cortex_distributor_received_samples_total counter
cortex_distributor_received_samples_total{user="userDistributorPushRelabelDropWillExportMetricOfDroppedSamples"} 1
`

require.NoError(t, testutil.GatherAndCompare(regs[0], strings.NewReader(expectedMetrics), metrics...))
}

Expand Down
27 changes: 15 additions & 12 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,10 @@ type Ingester struct {

cfg Config

metrics *ingesterMetrics
logger log.Logger
metrics *ingesterMetrics
validateMetrics *validation.ValidateMetrics

logger log.Logger

lifecycler *ring.Lifecycler
limits *validation.Overrides
Expand Down Expand Up @@ -660,6 +662,7 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe
i.ingestionRate,
&i.inflightPushRequests,
&i.maxInflightQueryRequests)
i.validateMetrics = validation.NewValidateMetrics(registerer)

// Replace specific metrics which we can't directly track but we need to read
// them from the underlying system (ie. TSDB).
Expand Down Expand Up @@ -1236,29 +1239,29 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
i.metrics.ingestedExemplarsFail.Add(float64(failedExemplarsCount))

if sampleOutOfBoundsCount > 0 {
validation.DiscardedSamples.WithLabelValues(sampleOutOfBounds, userID).Add(float64(sampleOutOfBoundsCount))
i.validateMetrics.DiscardedSamples.WithLabelValues(sampleOutOfBounds, userID).Add(float64(sampleOutOfBoundsCount))
}
if sampleOutOfOrderCount > 0 {
validation.DiscardedSamples.WithLabelValues(sampleOutOfOrder, userID).Add(float64(sampleOutOfOrderCount))
i.validateMetrics.DiscardedSamples.WithLabelValues(sampleOutOfOrder, userID).Add(float64(sampleOutOfOrderCount))
}
if sampleTooOldCount > 0 {
validation.DiscardedSamples.WithLabelValues(sampleTooOld, userID).Add(float64(sampleTooOldCount))
i.validateMetrics.DiscardedSamples.WithLabelValues(sampleTooOld, userID).Add(float64(sampleTooOldCount))
}
if newValueForTimestampCount > 0 {
validation.DiscardedSamples.WithLabelValues(newValueForTimestamp, userID).Add(float64(newValueForTimestampCount))
i.validateMetrics.DiscardedSamples.WithLabelValues(newValueForTimestamp, userID).Add(float64(newValueForTimestampCount))
}
if perUserSeriesLimitCount > 0 {
validation.DiscardedSamples.WithLabelValues(perUserSeriesLimit, userID).Add(float64(perUserSeriesLimitCount))
i.validateMetrics.DiscardedSamples.WithLabelValues(perUserSeriesLimit, userID).Add(float64(perUserSeriesLimitCount))
}
if perMetricSeriesLimitCount > 0 {
validation.DiscardedSamples.WithLabelValues(perMetricSeriesLimit, userID).Add(float64(perMetricSeriesLimitCount))
i.validateMetrics.DiscardedSamples.WithLabelValues(perMetricSeriesLimit, userID).Add(float64(perMetricSeriesLimitCount))
}
if perLabelSetSeriesLimitCount > 0 {
validation.DiscardedSamples.WithLabelValues(perLabelsetSeriesLimit, userID).Add(float64(perLabelSetSeriesLimitCount))
i.validateMetrics.DiscardedSamples.WithLabelValues(perLabelsetSeriesLimit, userID).Add(float64(perLabelSetSeriesLimitCount))
}

if nativeHistogramCount > 0 {
validation.DiscardedSamples.WithLabelValues(nativeHistogramSample, userID).Add(float64(nativeHistogramCount))
i.validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramSample, userID).Add(float64(nativeHistogramCount))
}

// Distributor counts both samples and metadata, so for consistency ingester does the same.
Expand Down Expand Up @@ -2530,7 +2533,7 @@ func (i *Ingester) closeAndDeleteUserTSDBIfIdle(userID string) tsdbCloseCheckRes
i.deleteUserMetadata(userID)
i.metrics.deletePerUserMetrics(userID)

validation.DeletePerUserValidationMetrics(userID, i.logger)
validation.DeletePerUserValidationMetrics(i.validateMetrics, userID, i.logger)

// And delete local data.
if err := os.RemoveAll(dir); err != nil {
Expand Down Expand Up @@ -2604,7 +2607,7 @@ func (i *Ingester) getOrCreateUserMetadata(userID string) *userMetricsMetadata {
// Ensure it was not created between switching locks.
userMetadata, ok := i.usersMetadata[userID]
if !ok {
userMetadata = newMetadataMap(i.limiter, i.metrics, userID)
userMetadata = newMetadataMap(i.limiter, i.metrics, i.validateMetrics, userID)
i.usersMetadata[userID] = userMetadata
}
return userMetadata
Expand Down
Loading
Loading