
Cleanup of query-frontend and query-scheduler metrics for inactive users. #3826

Merged: 8 commits, Feb 19, 2021

1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -64,6 +64,7 @@
* `cortex_ha_tracker_replicas_cleanup_deleted_total`
* `cortex_ha_tracker_replicas_cleanup_delete_failed_total`
* [ENHANCEMENT] Tenant deletion endpoints now support deletion of ruler groups. This only works when using a rule store that supports deletion. #3750
* [ENHANCEMENT] Query-frontend, query-scheduler: cleanup metrics for inactive tenants. #3826
* [ENHANCEMENT] Distributor: Prevent failed ingestion from affecting rate limiting. #3825
* [BUGFIX] Cortex: Fixed issue where fatal errors and various log messages were not logged. #3778
* [BUGFIX] HA Tracker: don't track as error in the `cortex_kv_request_duration_seconds` metric a CAS operation intentionally aborted. #3745
5 changes: 1 addition & 4 deletions pkg/cortex/modules.go
@@ -555,10 +555,7 @@ func (t *Cortex) initQueryFrontend() (serv services.Service, err error) {
t.API.RegisterQueryFrontend1(frontendV1)
t.Frontend = frontendV1

return services.NewIdleService(nil, func(_ error) error {
frontendV1.Close()
return nil
}), nil
return frontendV1, nil
} else if frontendV2 != nil {
t.API.RegisterQueryFrontend2(frontendV2)

25 changes: 7 additions & 18 deletions pkg/distributor/distributor.go
@@ -115,9 +115,6 @@ var (
// Validation errors.
errInvalidShardingStrategy = errors.New("invalid sharding strategy")
errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0")

inactiveUserTimeout = 15 * time.Minute
metricsCleanupInterval = inactiveUserTimeout / 5
)

const (
@@ -150,7 +147,7 @@ type Distributor struct {
subservices *services.Manager
subservicesWatcher *services.FailureWatcher

activeUsers *util.ActiveUsers
activeUsers *util.ActiveUsersCleanupService
}

// Config contains the configuration required to
@@ -258,10 +255,13 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
limits: limits,
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
HATracker: haTracker,
activeUsers: util.NewActiveUsers(),
}
d.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) {
d.HATracker.cleanupHATrackerMetricsForUser(user)
cleanupMetricsForUser(user, d.log)
})

subservices = append(subservices, d.ingesterPool)
subservices = append(subservices, d.ingesterPool, d.activeUsers)
d.subservices, err = services.NewManager(subservices...)
if err != nil {
return nil, err
@@ -279,19 +279,8 @@ func (d *Distributor) starting(ctx context.Context) error {
}

func (d *Distributor) running(ctx context.Context) error {
metricsCleanupTimer := time.NewTicker(metricsCleanupInterval)
defer metricsCleanupTimer.Stop()

for {
select {
case <-metricsCleanupTimer.C:
inactiveUsers := d.activeUsers.PurgeInactiveUsers(time.Now().Add(-inactiveUserTimeout).UnixNano())
for _, userID := range inactiveUsers {
cleanupMetricsForUser(userID, d.log)
d.HATracker.cleanupHATrackerMetricsForUser(userID)
}
continue

case <-ctx.Done():
return nil

@@ -434,7 +423,7 @@ func (d *Distributor) Push(ctx context.Context, req *ingester_client.WriteReques
}

now := time.Now()
d.activeUsers.UpdateUserTimestamp(userID, now.UnixNano())
d.activeUsers.UpdateUserTimestamp(userID, now)

source := util.GetSourceIPsFromOutgoingCtx(ctx)

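The ticker loop deleted from `running()` above is essentially what the new `util.ActiveUsersCleanupService` encapsulates. Below is a minimal sketch of such a service, reconstructed from the removed code and the calls visible in this diff; the actual implementation in `pkg/util` may differ in details (default timeout, reuse of the existing `ActiveUsers` map, timer wiring).

```go
// Sketch only: approximates ActiveUsersCleanupService from the ticker +
// PurgeInactiveUsers pattern removed above. Not the real pkg/util code.
package util

import (
	"context"
	"sync"
	"time"

	"github.com/cortexproject/cortex/pkg/util/services"
)

type ActiveUsersCleanupService struct {
	services.Service

	mu          sync.Mutex
	lastSeen    map[string]time.Time
	cleanupFunc func(user string)
	timeout     time.Duration
}

func NewActiveUsersCleanupWithDefaultValues(cleanupFn func(string)) *ActiveUsersCleanupService {
	s := &ActiveUsersCleanupService{
		lastSeen:    map[string]time.Time{},
		cleanupFunc: cleanupFn,
		timeout:     15 * time.Minute, // was inactiveUserTimeout in the distributor
	}
	// One cleanup iteration every timeout/5, matching the old metricsCleanupInterval.
	s.Service = services.NewTimerService(s.timeout/5, nil, s.iteration, nil)
	return s
}

// UpdateUserTimestamp records that the user was active at the given time.
func (s *ActiveUsersCleanupService) UpdateUserTimestamp(user string, now time.Time) {
	s.mu.Lock()
	s.lastSeen[user] = now
	s.mu.Unlock()
}

// iteration drops users not seen within the timeout and invokes the
// cleanup callback for each of them.
func (s *ActiveUsersCleanupService) iteration(_ context.Context) error {
	deadline := time.Now().Add(-s.timeout)

	s.mu.Lock()
	var inactive []string
	for user, ts := range s.lastSeen {
		if ts.Before(deadline) {
			inactive = append(inactive, user)
			delete(s.lastSeen, user)
		}
	}
	s.mu.Unlock()

	for _, user := range inactive {
		s.cleanupFunc(user)
	}
	return nil
}
```

Because the purge runs as a managed service rather than inside the distributor's `running()` loop, the same mechanism can now be reused by the query-frontend and query-scheduler, which is what the rest of this PR does.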
10 changes: 3 additions & 7 deletions pkg/frontend/config.go
@@ -37,7 +37,7 @@ func (cfg *CombinedFrontendConfig) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.DownstreamURL, "frontend.downstream-url", "", "URL of downstream Prometheus.")
}

// Initializes frontend (either V1 -- without scheduler, or V2 -- with scheduler) or no frontend at
// InitFrontend initializes frontend (either V1 -- without scheduler, or V2 -- with scheduler) or no frontend at
// all if downstream Prometheus URL is used instead.
//
// Returned RoundTripper can be wrapped in more round-tripper middlewares, and then eventually registered
@@ -70,11 +70,7 @@ func InitFrontend(cfg CombinedFrontendConfig, limits v1.Limits, grpcListenPort i

default:
// No scheduler = use original frontend.
fr, err := v1.New(cfg.FrontendV1, limits, log, reg)
if err != nil {
return nil, nil, nil, err
}

return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr), fr, nil, err
fr := v1.New(cfg.FrontendV1, limits, log, reg)
return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr), fr, nil, nil
}
}
5 changes: 4 additions & 1 deletion pkg/frontend/frontend_test.go
@@ -232,7 +232,10 @@ func testFrontend(t *testing.T, config CombinedFrontendConfig, handler http.Hand
// v1 will be nil if DownstreamURL is defined.
require.Nil(t, v2)
if v1 != nil {
defer v1.Close()
require.NoError(t, services.StartAndAwaitRunning(context.Background(), v1))
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), v1))
})
}

grpcServer := grpc.NewServer(
10 changes: 9 additions & 1 deletion pkg/frontend/transport/handler.go
@@ -60,9 +60,10 @@ type Handler struct {

// Metrics.
querySeconds *prometheus.CounterVec
activeUsers *util.ActiveUsersCleanupService
}

// New creates a new frontend handler.
// NewHandler creates a new frontend handler.
func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logger, reg prometheus.Registerer) http.Handler {
h := &Handler{
cfg: cfg,
@@ -75,6 +76,12 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
Name: "cortex_query_seconds_total",
Help: "Total amount of wall clock time spend processing queries.",
}, []string{"user"})

h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) {
h.querySeconds.DeleteLabelValues(user)
})
// If the cleaner stops or fails, we simply won't clean the metrics for inactive users.
_ = h.activeUsers.StartAsync(context.Background())
}

return h
@@ -161,6 +168,7 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer

// Track stats.
f.querySeconds.WithLabelValues(userID).Add(stats.LoadWallTime().Seconds())
f.activeUsers.UpdateUserTimestamp(userID, time.Now())

// Log stats.
logMessage := append([]interface{}{
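The handler change above is the wiring pattern in its smallest form: a per-user metric, a cleanup service whose callback deletes the user's series, and a timestamp update on every request. The sketch below shows that pattern in isolation, using only the `ActiveUsersCleanupService` calls visible in this diff; the `queryTracker` type and metric name are illustrative, not part of the PR.

```go
// Illustrative sketch of the per-user metric cleanup wiring used by the
// frontend handler above. Names here (queryTracker, example_*) are hypothetical.
package example

import (
	"context"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"

	"github.com/cortexproject/cortex/pkg/util"
)

type queryTracker struct {
	querySeconds *prometheus.CounterVec
	activeUsers  *util.ActiveUsersCleanupService
}

func newQueryTracker(reg prometheus.Registerer) *queryTracker {
	t := &queryTracker{
		querySeconds: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Name: "example_query_seconds_total",
			Help: "Total wall clock time spent processing queries.",
		}, []string{"user"}),
	}
	t.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) {
		// Drop the inactive user's series so it stops being exported.
		t.querySeconds.DeleteLabelValues(user)
	})
	// Best effort: if the cleaner fails to start, metrics simply aren't cleaned.
	_ = t.activeUsers.StartAsync(context.Background())
	return t
}

func (t *queryTracker) observe(user string, wallTime time.Duration) {
	t.querySeconds.WithLabelValues(user).Add(wallTime.Seconds())
	t.activeUsers.UpdateUserTimestamp(user, time.Now())
}
```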
52 changes: 37 additions & 15 deletions pkg/frontend/v1/frontend.go
@@ -19,7 +19,9 @@ import (
"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/scheduler/queue"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/grpcutil"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/validation"
)

@@ -45,13 +47,17 @@ type Limits interface {
// Frontend queues HTTP requests, dispatches them to backends, and handles retries
// for requests which failed.
type Frontend struct {
services.Service

cfg Config
log log.Logger
limits Limits

requestQueue *queue.RequestQueue
activeUsers *util.ActiveUsersCleanupService

// Metrics.
queueLength *prometheus.GaugeVec
numClients prometheus.GaugeFunc
queueDuration prometheus.Histogram
}
@@ -66,36 +72,48 @@ type request struct {
response chan *httpgrpc.HTTPResponse
}

// New creates a new frontend.
func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Registerer) (*Frontend, error) {
queueLength := promauto.With(registerer).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_query_frontend_queue_length",
Help: "Number of queries in the queue.",
}, []string{"user"})
// New creates a new frontend. Frontend implements service, and must be started and stopped.
func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Registerer) *Frontend {

f := &Frontend{
cfg: cfg,
log: log,
limits: limits,
requestQueue: queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, queueLength),
cfg: cfg,
log: log,
limits: limits,
queueLength: promauto.With(registerer).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_query_frontend_queue_length",
Help: "Number of queries in the queue.",
}, []string{"user"}),
queueDuration: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_query_frontend_queue_duration_seconds",
Help: "Time spend by requests queued.",
Buckets: prometheus.DefBuckets,
}),
}

f.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, f.queueLength)
f.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(f.cleanupInactiveUserMetrics)

f.numClients = promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{
Name: "cortex_query_frontend_connected_clients",
Help: "Number of worker clients currently connected to the frontend.",
}, f.requestQueue.GetConnectedQuerierWorkersMetric)

return f, nil
f.Service = services.NewIdleService(f.starting, f.stopping)
return f
}

func (f *Frontend) starting(ctx context.Context) error {
return services.StartAndAwaitRunning(ctx, f.activeUsers)
}

// Close stops new requests and errors out any pending requests.
func (f *Frontend) Close() {
func (f *Frontend) stopping(_ error) error {
// Stops new requests and errors out any pending requests.
f.requestQueue.Stop()
return services.StopAndAwaitTerminated(context.Background(), f.activeUsers)
}

func (f *Frontend) cleanupInactiveUserMetrics(user string) {
f.queueLength.DeleteLabelValues(user)
}

// RoundTripGRPC round trips a proto (instead of a HTTP request).
@@ -263,13 +281,17 @@ func (f *Frontend) queueRequest(ctx context.Context, req *request) error {
return err
}

req.enqueueTime = time.Now()
now := time.Now()
req.enqueueTime = now
req.queueSpan, _ = opentracing.StartSpanFromContext(ctx, "queued")

// aggregate the max queriers limit in the case of a multi tenant query
maxQueriers := validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, f.limits.MaxQueriersPerUser)

err = f.requestQueue.EnqueueRequest(tenant.JoinTenantIDs(tenantIDs), req, maxQueriers, nil)
joinedTenantID := tenant.JoinTenantIDs(tenantIDs)
f.activeUsers.UpdateUserTimestamp(joinedTenantID, now)

err = f.requestQueue.EnqueueRequest(joinedTenantID, req, maxQueriers, nil)
if err == queue.ErrTooManyRequests {
return errTooManyRequest
}
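With `Frontend` now embedding `services.Service`, callers start and stop it through the services API instead of calling `Close()`. A minimal usage sketch follows, mirroring what the updated frontend_test.go does; the `runFrontend` wrapper and the import paths are assumptions based on the Cortex code of the time, not part of this PR.

```go
// Sketch only: running the v1 frontend under the new service lifecycle.
package example

import (
	"context"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/client_golang/prometheus"

	v1 "github.com/cortexproject/cortex/pkg/frontend/v1"
	"github.com/cortexproject/cortex/pkg/util/services"
)

// runFrontend shows the new lifecycle: v1.New no longer returns an error,
// and the Frontend is started and stopped via the services helpers.
func runFrontend(ctx context.Context, cfg v1.Config, limits v1.Limits, logger log.Logger, reg prometheus.Registerer) error {
	fr := v1.New(cfg, limits, logger, reg)
	if err := services.StartAndAwaitRunning(ctx, fr); err != nil {
		return err
	}
	defer func() {
		// Stopping also stops the request queue and the active-users cleanup.
		_ = services.StopAndAwaitTerminated(context.Background(), fr)
	}()

	// ... serve queries ...
	return nil
}
```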