Send new label 'status' for ingester failures #4442

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -23,6 +23,7 @@
* [FEATURE] Ruler: Add new `-ruler.query-stats-enabled` which when enabled will report the `cortex_ruler_query_seconds_total` as a per-user metric that tracks the sum of the wall time of executing queries in the ruler in seconds. #4317
* [FEATURE] Query Frontend: Add `cortex_query_fetched_series_total` and `cortex_query_fetched_chunks_bytes_total` per-user counters to expose the number of series and bytes fetched as part of queries. These metrics can be enabled with the `-frontend.query-stats-enabled` flag (or its respective YAML config option `query_stats_enabled`). #4343
* [FEATURE] AlertManager: Add support for SNS Receiver. #4382
* [FEATURE] Distributor: Add label `status` to metric `cortex_distributor_ingester_append_failures_total` #4442
* [ENHANCEMENT] Add timeout for waiting on compactor to become ACTIVE in the ring. #4262
* [ENHANCEMENT] Reduce memory used by streaming queries, particularly in ruler. #4341
* [ENHANCEMENT] Ring: allow experimental configuration of disabling of heartbeat timeouts by setting the relevant configuration value to zero. Applies to the following: #4342
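For context on the CHANGELOG entry above: the new `status` label splits append failures into client-rejected (`4xx`) and server-side (`5xx`) buckets. Below is a minimal, self-contained sketch (not taken verbatim from this PR) of how the counter is shaped after the change, using the Prometheus Go client; the label values shown are just illustrative.

package main

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Sketch of the counter as declared after this change: a third "status" label
// is added alongside the existing "ingester" and "type" labels.
var ingesterAppendFailures = promauto.NewCounterVec(prometheus.CounterOpts{
	Namespace: "cortex",
	Name:      "distributor_ingester_append_failures_total",
	Help:      "The total number of failed batch appends sent to ingesters.",
}, []string{"ingester", "type", "status"})

func main() {
	// A rejection such as HTTP 429 from an overloaded ingester is now counted
	// separately from a server-side error on the same ingester and request type.
	ingesterAppendFailures.WithLabelValues("ingester-0", "samples", "4xx").Inc()
	ingesterAppendFailures.WithLabelValues("ingester-0", "samples", "5xx").Inc()
}

With the extra label, dashboards and alerts can rate this counter and aggregate by `status` to separate client-side rejections from genuine ingester errors.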
16 changes: 13 additions & 3 deletions pkg/distributor/distributor.go
@@ -300,7 +300,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
Namespace: "cortex",
Name: "distributor_ingester_append_failures_total",
Help: "The total number of failed batch appends sent to ingesters.",
}, []string{"ingester", "type"}),
}, []string{"ingester", "type", "status"}),
ingesterQueries: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "distributor_ingester_queries_total",
@@ -819,19 +819,29 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time
if len(metadata) > 0 {
d.ingesterAppends.WithLabelValues(ingester.Addr, typeMetadata).Inc()
if err != nil {
d.ingesterAppendFailures.WithLabelValues(ingester.Addr, typeMetadata).Inc()
d.ingesterAppendFailures.WithLabelValues(ingester.Addr, typeMetadata, getErrorStatus(err)).Inc()
}
}
if len(timeseries) > 0 {
d.ingesterAppends.WithLabelValues(ingester.Addr, typeSamples).Inc()
if err != nil {
d.ingesterAppendFailures.WithLabelValues(ingester.Addr, typeSamples).Inc()
d.ingesterAppendFailures.WithLabelValues(ingester.Addr, typeSamples, getErrorStatus(err)).Inc()
}
}

return err
}

func getErrorStatus(err error) string {
status := "5xx"
httpResp, ok := httpgrpc.HTTPResponseFromError(err)
if ok && httpResp.Code/100 == 4 {
status = "4xx"
}

return status
}

// ForReplicationSet runs f, in parallel, for all ingesters in the input replication set.
func (d *Distributor) ForReplicationSet(ctx context.Context, replicationSet ring.ReplicationSet, f func(context.Context, ingester_client.IngesterClient) (interface{}, error)) ([]interface{}, error) {
return replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
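The `getErrorStatus` helper added above classifies any error that does not carry a recognizable 4xx HTTP-over-gRPC code as `5xx`. A small standalone sketch of the same classification, assuming the github.com/weaveworks/common/httpgrpc package that the distributor code above already calls into:

package main

import (
	"fmt"
	"net/http"

	"github.com/weaveworks/common/httpgrpc"
)

// getErrorStatus mirrors the helper added in distributor.go: only errors that
// decode to an HTTP response with a 4xx code are labelled "4xx"; everything
// else, including plain Go errors, falls back to "5xx".
func getErrorStatus(err error) string {
	status := "5xx"
	httpResp, ok := httpgrpc.HTTPResponseFromError(err)
	if ok && httpResp.Code/100 == 4 {
		status = "4xx"
	}
	return status
}

func main() {
	fmt.Println(getErrorStatus(httpgrpc.Errorf(http.StatusTooManyRequests, "overloaded")))      // 4xx
	fmt.Println(getErrorStatus(httpgrpc.Errorf(http.StatusInternalServerError, "ingester err"))) // 5xx
	fmt.Println(getErrorStatus(fmt.Errorf("plain error")))                                       // 5xx
}

Defaulting to `5xx` keeps the label at two possible values and treats unknown failures as server-side; the tests below exercise both paths with a 500 and a 429 ingester error.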
38 changes: 34 additions & 4 deletions pkg/distributor/distributor_test.go
@@ -48,7 +48,7 @@ import (
)

var (
errFail = fmt.Errorf("Fail")
errFail = httpgrpc.Errorf(http.StatusInternalServerError, "Fail")
emptyResponse = &cortexpb.WriteResponse{}
)

@@ -124,6 +124,7 @@ func TestDistributor_Push(t *testing.T) {
expectedResponse *cortexpb.WriteResponse
expectedError error
expectedMetrics string
ingesterError error
}{
"A push of no samples shouldn't block or return error, even if ingesters are sad": {
numIngesters: 3,
@@ -203,7 +204,7 @@
expectedMetrics: `
# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_append_failures_total counter
cortex_distributor_ingester_append_failures_total{ingester="2",type="samples"} 1
cortex_distributor_ingester_append_failures_total{ingester="2",status="5xx",type="samples"} 1
# HELP cortex_distributor_ingester_appends_total The total number of batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_appends_total counter
cortex_distributor_ingester_appends_total{ingester="0",type="samples"} 1
@@ -218,10 +219,30 @@
metadata: 1,
metricNames: []string{distributorAppend, distributorAppendFailure},
expectedResponse: emptyResponse,
ingesterError: httpgrpc.Errorf(http.StatusInternalServerError, "Fail"),
expectedMetrics: `
# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_append_failures_total counter
cortex_distributor_ingester_append_failures_total{ingester="2",type="metadata"} 1
cortex_distributor_ingester_append_failures_total{ingester="2",status="5xx",type="metadata"} 1
# HELP cortex_distributor_ingester_appends_total The total number of batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_appends_total counter
cortex_distributor_ingester_appends_total{ingester="0",type="metadata"} 1
cortex_distributor_ingester_appends_total{ingester="1",type="metadata"} 1
cortex_distributor_ingester_appends_total{ingester="2",type="metadata"} 1
`,
},
"A push to overloaded ingesters should report the correct metrics": {
numIngesters: 3,
happyIngesters: 2,
samples: samplesIn{num: 0, startTimestampMs: 123456789000},
metadata: 1,
metricNames: []string{distributorAppend, distributorAppendFailure},
expectedResponse: emptyResponse,
ingesterError: httpgrpc.Errorf(http.StatusTooManyRequests, "Fail"),
expectedMetrics: `
# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_append_failures_total counter
cortex_distributor_ingester_append_failures_total{ingester="2",status="4xx",type="metadata"} 1
# HELP cortex_distributor_ingester_appends_total The total number of batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_appends_total counter
cortex_distributor_ingester_appends_total{ingester="0",type="metadata"} 1
@@ -243,6 +264,7 @@
numDistributors: 1,
shardByAllLabels: shardByAllLabels,
limits: limits,
errFail: tc.ingesterError,
})
defer stopAll(ds, r)

@@ -1905,6 +1927,7 @@ type prepConfig struct {
maxInflightRequests int
maxIngestionRate float64
replicationFactor int
errFail error
}

func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, *ring.Ring, []*prometheus.Registry) {
@@ -1916,8 +1939,14 @@
})
}
for i := cfg.happyIngesters; i < cfg.numIngesters; i++ {
miError := errFail
if cfg.errFail != nil {
miError = cfg.errFail
}

ingesters = append(ingesters, mockIngester{
queryDelay: cfg.queryDelay,
failResp: miError,
})
}

@@ -2149,6 +2178,7 @@ type mockIngester struct {
client.IngesterClient
grpc_health_v1.HealthClient
happy bool
failResp error
stats client.UsersStatsResponse
timeseries map[uint32]*cortexpb.PreallocTimeseries
metadata map[uint32]map[cortexpb.MetricMetadata]struct{}
@@ -2187,7 +2217,7 @@ func (i *mockIngester) Push(ctx context.Context, req *cortexpb.WriteRequest, opt
i.trackCall("Push")

if !i.happy {
return nil, errFail
return nil, i.failResp
}

if i.timeseries == nil {