Skip to content

fix query frontend per tenant metrics leak when cleaning up user labels #6698

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
* [BUGFIX] Ingester: Add check to avoid query 5xx when closing tsdb. #6616
* [BUGFIX] Querier: Fix panic when marshaling QueryResultRequest. #6601
* [BUGFIX] Ingester: Avoid resharding for query when restart readonly ingesters. #6642
* [BUGFIX] Query Frontend: Fix cleanup of per-`user` metrics for inactive users. #6698

## 1.19.0 2025-02-27

Expand Down
48 changes: 36 additions & 12 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,25 +167,49 @@ func NewHandler(cfg HandlerConfig, tenantFederationCfg tenantfederation.Config,
[]string{"reason", "source", "user"},
)

h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) {
h.querySeconds.DeleteLabelValues(user)
h.queryFetchedSeries.DeleteLabelValues(user)
h.queryFetchedSamples.DeleteLabelValues(user)
h.queryScannedSamples.DeleteLabelValues(user)
h.queryPeakSamples.DeleteLabelValues(user)
h.queryChunkBytes.DeleteLabelValues(user)
h.queryDataBytes.DeleteLabelValues(user)
if err := util.DeleteMatchingLabels(h.rejectedQueries, map[string]string{"user": user}); err != nil {
level.Warn(log).Log("msg", "failed to remove cortex_rejected_queries_total metric for user", "user", user, "err", err)
}
})
h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(h.cleanupMetricsForInactiveUser)
// If the cleaner stops or fails, we will simply not clean the metrics for inactive users.
_ = h.activeUsers.StartAsync(context.Background())
}

return h
}

// cleanupMetricsForInactiveUser asks util.DeleteMatchingLabels to drop, from
// each per-user query stats metric held by the handler, the entries whose
// "user" label equals the given user.
//
// It returns immediately when query stats are disabled in the handler config.
// A failed deletion is logged at warn level and does not prevent the
// remaining metrics from being processed.
func (h *Handler) cleanupMetricsForInactiveUser(user string) {
	if !h.cfg.QueryStatsEnabled {
		return
	}

	// Label filter shared by every deletion below.
	byUser := map[string]string{"user": user}

	// warnOnFailure reports a failed deletion for the named metric; a nil
	// error means there is nothing to report.
	warnOnFailure := func(metric string, err error) {
		if err == nil {
			return
		}
		level.Warn(h.log).Log("msg", "failed to remove "+metric+" metric for user", "user", user, "err", err)
	}

	// Each deletion runs (and is reported) before the next one starts.
	warnOnFailure("cortex_query_seconds_total", util.DeleteMatchingLabels(h.querySeconds, byUser))
	warnOnFailure("cortex_query_fetched_series_total", util.DeleteMatchingLabels(h.queryFetchedSeries, byUser))
	warnOnFailure("cortex_query_samples_total", util.DeleteMatchingLabels(h.queryFetchedSamples, byUser))
	warnOnFailure("cortex_query_samples_scanned_total", util.DeleteMatchingLabels(h.queryScannedSamples, byUser))
	warnOnFailure("cortex_query_peak_samples", util.DeleteMatchingLabels(h.queryPeakSamples, byUser))
	warnOnFailure("cortex_query_fetched_chunks_bytes_total", util.DeleteMatchingLabels(h.queryChunkBytes, byUser))
	warnOnFailure("cortex_query_fetched_data_bytes_total", util.DeleteMatchingLabels(h.queryDataBytes, byUser))
	warnOnFailure("cortex_rejected_queries_total", util.DeleteMatchingLabels(h.rejectedQueries, byUser))
}

func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var (
stats *querier_stats.QueryStats
Expand Down
106 changes: 106 additions & 0 deletions pkg/frontend/transport/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,3 +637,109 @@ func Test_TenantFederation_MaxTenant(t *testing.T) {
})
}
}

// TestHandlerMetricsCleanup records query stats and rejected-query samples for
// two users, checks that both users are exposed, runs
// cleanupMetricsForInactiveUser for the first user, and then checks that only
// the second user's series are still exposed.
func TestHandlerMetricsCleanup(t *testing.T) {
	const (
		source       = "api"
		inactiveUser = "user1"
		activeUser   = "user2"
	)

	registry := prometheus.NewPedanticRegistry()
	h := NewHandler(HandlerConfig{QueryStatsEnabled: true}, tenantfederation.Config{}, http.DefaultTransport, log.NewNopLogger(), registry)

	// Metric families compared against the expected exposition text.
	metricNames := []string{
		"cortex_query_seconds_total",
		"cortex_query_fetched_series_total",
		"cortex_query_samples_total",
		"cortex_query_samples_scanned_total",
		"cortex_query_peak_samples",
		"cortex_query_fetched_chunks_bytes_total",
		"cortex_query_fetched_data_bytes_total",
		"cortex_rejected_queries_total",
	}

	// Simulated activity, one row per user, applied in order.
	activity := []struct {
		user       string
		seconds    float64
		series     float64
		samples    float64
		scanned    float64
		peak       float64
		chunkBytes float64
		dataBytes  float64
		rejected   float64
	}{
		{user: inactiveUser, seconds: 1.0, series: 100, samples: 1000, scanned: 2000, peak: 500, chunkBytes: 1024, dataBytes: 2048, rejected: 5},
		{user: activeUser, seconds: 2.0, series: 200, samples: 2000, scanned: 4000, peak: 1000, chunkBytes: 2048, dataBytes: 4096, rejected: 10},
	}
	for _, a := range activity {
		h.querySeconds.WithLabelValues(source, a.user).Add(a.seconds)
		h.queryFetchedSeries.WithLabelValues(source, a.user).Add(a.series)
		h.queryFetchedSamples.WithLabelValues(source, a.user).Add(a.samples)
		h.queryScannedSamples.WithLabelValues(source, a.user).Add(a.scanned)
		h.queryPeakSamples.WithLabelValues(source, a.user).Observe(a.peak)
		h.queryChunkBytes.WithLabelValues(source, a.user).Add(a.chunkBytes)
		h.queryDataBytes.WithLabelValues(source, a.user).Add(a.dataBytes)
		h.rejectedQueries.WithLabelValues(reasonTooManySamples, source, a.user).Add(a.rejected)
	}

	// Before cleanup: both users are expected in every metric family.
	wantBeforeCleanup := `
# HELP cortex_query_seconds_total Total amount of wall clock time spend processing queries.
# TYPE cortex_query_seconds_total counter
cortex_query_seconds_total{source="api",user="user1"} 1
cortex_query_seconds_total{source="api",user="user2"} 2
# HELP cortex_query_fetched_series_total Number of series fetched to execute a query.
# TYPE cortex_query_fetched_series_total counter
cortex_query_fetched_series_total{source="api",user="user1"} 100
cortex_query_fetched_series_total{source="api",user="user2"} 200
# HELP cortex_query_samples_total Number of samples fetched to execute a query.
# TYPE cortex_query_samples_total counter
cortex_query_samples_total{source="api",user="user1"} 1000
cortex_query_samples_total{source="api",user="user2"} 2000
# HELP cortex_query_samples_scanned_total Number of samples scanned to execute a query.
# TYPE cortex_query_samples_scanned_total counter
cortex_query_samples_scanned_total{source="api",user="user1"} 2000
cortex_query_samples_scanned_total{source="api",user="user2"} 4000
# HELP cortex_query_peak_samples Highest count of samples considered to execute a query.
# TYPE cortex_query_peak_samples histogram
cortex_query_peak_samples_bucket{source="api",user="user1",le="+Inf"} 1
cortex_query_peak_samples_sum{source="api",user="user1"} 500
cortex_query_peak_samples_count{source="api",user="user1"} 1
cortex_query_peak_samples_bucket{source="api",user="user2",le="+Inf"} 1
cortex_query_peak_samples_sum{source="api",user="user2"} 1000
cortex_query_peak_samples_count{source="api",user="user2"} 1
# HELP cortex_query_fetched_chunks_bytes_total Size of all chunks fetched to execute a query in bytes.
# TYPE cortex_query_fetched_chunks_bytes_total counter
cortex_query_fetched_chunks_bytes_total{source="api",user="user1"} 1024
cortex_query_fetched_chunks_bytes_total{source="api",user="user2"} 2048
# HELP cortex_query_fetched_data_bytes_total Size of all data fetched to execute a query in bytes.
# TYPE cortex_query_fetched_data_bytes_total counter
cortex_query_fetched_data_bytes_total{source="api",user="user1"} 2048
cortex_query_fetched_data_bytes_total{source="api",user="user2"} 4096
# HELP cortex_rejected_queries_total The total number of queries that were rejected.
# TYPE cortex_rejected_queries_total counter
cortex_rejected_queries_total{reason="too_many_samples",source="api",user="user1"} 5
cortex_rejected_queries_total{reason="too_many_samples",source="api",user="user2"} 10
`
	require.NoError(t, promtest.GatherAndCompare(registry, strings.NewReader(wantBeforeCleanup), metricNames...))

	// Clean up metrics for the first user only.
	h.cleanupMetricsForInactiveUser(inactiveUser)

	// After cleanup: only the second user is expected to remain.
	wantAfterCleanup := `
# HELP cortex_query_seconds_total Total amount of wall clock time spend processing queries.
# TYPE cortex_query_seconds_total counter
cortex_query_seconds_total{source="api",user="user2"} 2
# HELP cortex_query_fetched_series_total Number of series fetched to execute a query.
# TYPE cortex_query_fetched_series_total counter
cortex_query_fetched_series_total{source="api",user="user2"} 200
# HELP cortex_query_samples_total Number of samples fetched to execute a query.
# TYPE cortex_query_samples_total counter
cortex_query_samples_total{source="api",user="user2"} 2000
# HELP cortex_query_samples_scanned_total Number of samples scanned to execute a query.
# TYPE cortex_query_samples_scanned_total counter
cortex_query_samples_scanned_total{source="api",user="user2"} 4000
# HELP cortex_query_peak_samples Highest count of samples considered to execute a query.
# TYPE cortex_query_peak_samples histogram
cortex_query_peak_samples_bucket{source="api",user="user2",le="+Inf"} 1
cortex_query_peak_samples_sum{source="api",user="user2"} 1000
cortex_query_peak_samples_count{source="api",user="user2"} 1
# HELP cortex_query_fetched_chunks_bytes_total Size of all chunks fetched to execute a query in bytes.
# TYPE cortex_query_fetched_chunks_bytes_total counter
cortex_query_fetched_chunks_bytes_total{source="api",user="user2"} 2048
# HELP cortex_query_fetched_data_bytes_total Size of all data fetched to execute a query in bytes.
# TYPE cortex_query_fetched_data_bytes_total counter
cortex_query_fetched_data_bytes_total{source="api",user="user2"} 4096
# HELP cortex_rejected_queries_total The total number of queries that were rejected.
# TYPE cortex_rejected_queries_total counter
cortex_rejected_queries_total{reason="too_many_samples",source="api",user="user2"} 10
`
	require.NoError(t, promtest.GatherAndCompare(registry, strings.NewReader(wantAfterCleanup), metricNames...))
}
Loading