
Add basic query stats collection & logging. #3539


Merged 20 commits on Dec 9, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@
- limit for outgoing gRPC messages has changed from 2147483647 to 16777216 bytes
- limit for incoming gRPC messages has changed from 4194304 to 104857600 bytes
* [FEATURE] Distributor/Ingester: Provide ability to not overflow writes in the presence of a leaving or unhealthy ingester. This allows for more efficient ingester rolling restarts. #3305
* [FEATURE] Query-frontend: introduced query statistics logged by the query-frontend when enabled via `-frontend.query-stats-enabled=true`. When enabled, the metric `cortex_query_seconds_total` is tracked, counting the per-tenant sum of the wall time spent across all queriers while running queries. The metrics `cortex_request_duration_seconds` and `cortex_query_seconds_total` differ: the former tracks the duration of the client-facing request (e.g. the HTTP request from the client), while the latter tracks the sum of the wall time spent by all queriers involved in executing the query. #3539
* [ENHANCEMENT] API: Add GZIP HTTP compression to the API responses. Compression can be enabled via `-api.response-compression-enabled`. #3536
* [ENHANCEMENT] Added zone-awareness support on queries. When zone-awareness is enabled, queries will still succeed if all ingesters in a single zone fail. #3414
* [ENHANCEMENT] Blocks storage ingester: exported more TSDB-related metrics. #3412
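For reference, once `-frontend.query-stats-enabled=true` is set, the query-frontend exposes the new counter with one series per tenant. A hypothetical scrape could look like the snippet below; only the metric name, type, help text, and `user` label come from this PR, while the tenant ID and the sample value are illustrative:

```
# HELP cortex_query_seconds_total Total amount of wall clock time spent processing queries.
# TYPE cortex_query_seconds_total counter
cortex_query_seconds_total{user="user-1"} 12.3
```

A per-tenant rate over this counter, for example `sum by (user) (rate(cortex_query_seconds_total[5m]))`, then approximates how much querier wall time each tenant is consuming.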
1 change: 1 addition & 0 deletions Makefile
@@ -81,6 +81,7 @@ pkg/ring/ring.pb.go: pkg/ring/ring.proto
pkg/frontend/v1/frontendv1pb/frontend.pb.go: pkg/frontend/v1/frontendv1pb/frontend.proto
pkg/frontend/v2/frontendv2pb/frontend.pb.go: pkg/frontend/v2/frontendv2pb/frontend.proto
pkg/querier/queryrange/queryrange.pb.go: pkg/querier/queryrange/queryrange.proto
pkg/querier/stats/stats.pb.go: pkg/querier/stats/stats.proto
pkg/chunk/storage/caching_index_client.pb.go: pkg/chunk/storage/caching_index_client.proto
pkg/distributor/ha_tracker.pb.go: pkg/distributor/ha_tracker.proto
pkg/ruler/rules/rules.pb.go: pkg/ruler/rules/rules.proto
3 changes: 3 additions & 0 deletions development/tsdb-blocks-storage-s3/config/cortex.yaml
@@ -118,6 +118,9 @@ store_gateway:
consul:
host: consul:8500

frontend:
query_stats_enabled: true

frontend_worker:
frontend_address: "query-frontend:9007"
match_max_concurrent: true
6 changes: 6 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -874,6 +874,12 @@ The `query_frontend_config` configures the Cortex query-frontend.
# CLI flag: -frontend.max-body-size
[max_body_size: <int> | default = 10485760]

# True to enable query statistics tracking. When enabled, a message with some
# statistics is logged for every query. This configuration option must be set
# on both the query-frontend and the querier.
# CLI flag: -frontend.query-stats-enabled
[query_stats_enabled: <boolean> | default = false]

# Maximum number of outstanding requests per tenant per frontend; requests
# beyond this error with HTTP 429.
# CLI flag: -querier.max-outstanding-requests-per-tenant
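Putting the configuration pieces together: the development config above enables the feature via YAML, which is equivalent to passing the CLI flag. A minimal sketch (the surrounding config file structure is assumed; only the `frontend.query_stats_enabled` key comes from this PR):

```yaml
# Enable per-query statistics tracking and logging in the query-frontend.
# Equivalent CLI flag: -frontend.query-stats-enabled=true
frontend:
  query_stats_enabled: true
```

At this stage of the PR the same option also has to be set on queriers so they record wall time for the frontend to aggregate (see the review discussion further down and #3595).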
1 change: 1 addition & 0 deletions docs/configuration/v1-guarantees.md
@@ -62,3 +62,4 @@ Currently experimental features are:
- Distributor: do not extend writes on unhealthy ingesters (`-distributor.extend-writes=false`)
- Ingester: close idle TSDB and remove them from local disk (`-blocks-storage.tsdb.close-idle-tsdb-timeout`)
- Tenant Deletion in Purger, for blocks storage.
- Query-frontend: query stats tracking (`-frontend.query-stats-enabled`)
44 changes: 44 additions & 0 deletions integration/query_frontend_test.go
@@ -8,6 +8,7 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
"sync"
"testing"
"time"
@@ -28,6 +29,7 @@ import (
type queryFrontendTestConfig struct {
testMissingMetricName bool
querySchedulerEnabled bool
queryStatsEnabled bool
setup func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string)
}

@@ -45,6 +47,21 @@ func TestQueryFrontendWithBlocksStorageViaFlags(t *testing.T) {
})
}

func TestQueryFrontendWithBlocksStorageViaFlagsAndQueryStatsEnabled(t *testing.T) {
runQueryFrontendTest(t, queryFrontendTestConfig{
testMissingMetricName: false,
queryStatsEnabled: true,
setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
flags = BlocksStorageFlags()

minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(minio))

return "", flags
},
})
}

func TestQueryFrontendWithBlocksStorageViaFlagsAndWithQueryScheduler(t *testing.T) {
runQueryFrontendTest(t, queryFrontendTestConfig{
testMissingMetricName: false,
@@ -60,6 +77,22 @@ func TestQueryFrontendWithBlocksStorageViaFlagsAndWithQueryScheduler(t *testing.
})
}

func TestQueryFrontendWithBlocksStorageViaFlagsAndWithQuerySchedulerAndQueryStatsEnabled(t *testing.T) {
runQueryFrontendTest(t, queryFrontendTestConfig{
testMissingMetricName: false,
querySchedulerEnabled: true,
queryStatsEnabled: true,
setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
flags = BlocksStorageFlags()

minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(minio))

return "", flags
},
})
}

func TestQueryFrontendWithBlocksStorageViaConfigFile(t *testing.T) {
runQueryFrontendTest(t, queryFrontendTestConfig{
testMissingMetricName: false,
@@ -183,6 +216,7 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
"-querier.split-queries-by-interval": "24h",
"-querier.query-ingesters-within": "12h", // Required by the test on query /series out of ingesters time range
"-frontend.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
"-frontend.query-stats-enabled": strconv.FormatBool(cfg.queryStatsEnabled),
})

// Start the query-scheduler if enabled.
@@ -306,6 +340,16 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(numUsers*numQueriesPerUser), []string{"cortex_request_duration_seconds"}, e2e.WithMetricCount))
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(numUsers*numQueriesPerUser), []string{"cortex_querier_request_duration_seconds"}, e2e.WithMetricCount))

// Ensure query stats metrics are tracked only when enabled.
if cfg.queryStatsEnabled {
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(
e2e.Greater(0),
[]string{"cortex_query_seconds_total"},
e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "user", "user-1"))))
} else {
require.NoError(t, queryFrontend.WaitRemovedMetric("cortex_query_seconds_total"))
}

// Ensure no service-specific metrics prefix is used by the wrong service.
assertServiceMetricsPrefixes(t, Distributor, distributor)
assertServiceMetricsPrefixes(t, Ingester, ingester)
8 changes: 6 additions & 2 deletions pkg/api/handlers.go
@@ -29,6 +29,7 @@ import (
"github.com/cortexproject/cortex/pkg/chunk/purger"
"github.com/cortexproject/cortex/pkg/distributor"
"github.com/cortexproject/cortex/pkg/querier"
"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/util"
)

@@ -77,7 +78,7 @@ func (pc *IndexPageContent) GetContent() map[string]map[string]string {
return result
}

var indexPageTemplate = `
var indexPageTemplate = `
<!DOCTYPE html>
<html>
<head>
@@ -242,7 +243,10 @@ func NewQuerierHandler(
router.Path(legacyPrefix + "/api/v1/metadata").Methods("GET").Handler(legacyPromRouter)

// Add a middleware to extract the trace context and add a header.
return nethttp.MiddlewareFunc(opentracing.GlobalTracer(), router.ServeHTTP, nethttp.OperationNameFunc(func(r *http.Request) string {
handler := nethttp.MiddlewareFunc(opentracing.GlobalTracer(), router.ServeHTTP, nethttp.OperationNameFunc(func(r *http.Request) string {
return "internalQuerier"
}))

// Track execution time.
return stats.NewWallTimeMiddleware().Wrap(handler)
}
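On the querier side, the only hook is the wall-time middleware wrapped around the internal querier router above. A minimal sketch of the idea, assuming `FromContext` returns nil when stats were not initialised and that `*stats.Stats` exposes an `AddWallTime` accumulator (neither detail is shown in this diff):

```go
package example

import (
	"net/http"
	"time"

	querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
)

// wrapWithWallTime times the wrapped handler and adds the elapsed wall time
// to the per-request stats initialised by the query-frontend, if any.
func wrapWithWallTime(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		s := querier_stats.FromContext(r.Context())
		if s == nil {
			// Stats tracking is disabled for this request: nothing to record.
			next.ServeHTTP(w, r)
			return
		}

		start := time.Now()
		next.ServeHTTP(w, r)
		s.AddWallTime(time.Since(start)) // assumed accumulator, see lead-in
	})
}
```

The actual middleware in this PR lives in `pkg/querier/stats` (see the new `stats.proto` entry in the Makefile above); the sketch only illustrates why wrapping the router is enough to capture per-query execution time.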
3 changes: 2 additions & 1 deletion pkg/cortex/modules.go
@@ -306,6 +306,7 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) {
}

t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent
t.Cfg.Worker.QueryStatsEnabled = t.Cfg.Frontend.Handler.QueryStatsEnabled
return querier_worker.NewQuerierWorker(t.Cfg.Worker, httpgrpc_server.NewServer(internalQuerierRouter), util.Logger, prometheus.DefaultRegisterer)
}

@@ -520,7 +521,7 @@ func (t *Cortex) initQueryFrontend() (serv services.Service, err error) {
// Wrap roundtripper into Tripperware.
roundTripper = t.QueryFrontendTripperware(roundTripper)

handler := transport.NewHandler(t.Cfg.Frontend.Handler, roundTripper, util.Logger)
handler := transport.NewHandler(t.Cfg.Frontend.Handler, roundTripper, util.Logger, prometheus.DefaultRegisterer)
if t.Cfg.Frontend.CompressResponses {
handler = gziphandler.GzipHandler(handler)
}
2 changes: 1 addition & 1 deletion pkg/frontend/frontend_test.go
@@ -248,7 +248,7 @@ func testFrontend(t *testing.T, config CombinedFrontendConfig, handler http.Hand
r.PathPrefix("/").Handler(middleware.Merge(
middleware.AuthenticateUser,
middleware.Tracer{},
).Wrap(transport.NewHandler(config.Handler, rt, logger)))
).Wrap(transport.NewHandler(config.Handler, rt, logger, nil)))

httpServer := http.Server{
Handler: r,
99 changes: 83 additions & 16 deletions pkg/frontend/transport/handler.go
@@ -8,14 +8,19 @@ import (
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/httpgrpc/server"

querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
)

@@ -34,11 +39,13 @@
type HandlerConfig struct {
LogQueriesLongerThan time.Duration `yaml:"log_queries_longer_than"`
MaxBodySize int64 `yaml:"max_body_size"`
QueryStatsEnabled bool `yaml:"query_stats_enabled"`
}

func (cfg *HandlerConfig) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.LogQueriesLongerThan, "frontend.log-queries-longer-than", 0, "Log queries that are slower than the specified duration. Set to 0 to disable. Set to < 0 to enable on all queries.")
f.Int64Var(&cfg.MaxBodySize, "frontend.max-body-size", 10*1024*1024, "Max body size for downstream prometheus.")
f.BoolVar(&cfg.QueryStatsEnabled, "frontend.query-stats-enabled", false, "True to enable query statistics tracking. When enabled, a message with some statistics is logged for every query. This configuration option must be set on both the query-frontend and the querier.")
Review comment (Contributor): Why did this get the "frontend" name when it goes on both?

Reply (Contributor): It's now only set on frontend: #3595

}

// Handler accepts queries and forwards them to RoundTripper. It can log slow queries,
@@ -47,18 +54,43 @@ type Handler struct {
cfg HandlerConfig
log log.Logger
roundTripper http.RoundTripper

// Metrics.
querySeconds *prometheus.CounterVec
}

// New creates a new frontend handler.
func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logger) http.Handler {
return &Handler{
func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logger, reg prometheus.Registerer) http.Handler {
h := &Handler{
cfg: cfg,
log: log,
roundTripper: roundTripper,
}

if cfg.QueryStatsEnabled {
h.querySeconds = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_seconds_total",
Help: "Total amount of wall clock time spent processing queries.",
}, []string{"user"})
}

return h
}

func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var (
stats *querier_stats.Stats
queryString url.Values
)

// Initialise the stats in the context and make sure it's propagated
// down the request chain.
if f.cfg.QueryStatsEnabled {
var ctx context.Context
stats, ctx = querier_stats.ContextWithEmptyStats(r.Context())
r = r.WithContext(ctx)
}

defer func() {
_ = r.Body.Close()
}()
@@ -86,38 +118,73 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// we don't check for copy error as there is not much we can do at this point
_, _ = io.Copy(w, resp.Body)

f.reportSlowQuery(queryResponseTime, r, buf)
}
// Check whether we should parse the query string.
shouldReportSlowQuery := f.cfg.LogQueriesLongerThan > 0 && queryResponseTime > f.cfg.LogQueriesLongerThan
if shouldReportSlowQuery || f.cfg.QueryStatsEnabled {
queryString = f.parseRequestQueryString(r, buf)
}

// reportSlowQuery reports slow queries if LogQueriesLongerThan is set to <0, where 0 disables logging
func (f *Handler) reportSlowQuery(queryResponseTime time.Duration, r *http.Request, bodyBuf bytes.Buffer) {
if f.cfg.LogQueriesLongerThan == 0 || queryResponseTime <= f.cfg.LogQueriesLongerThan {
return
if shouldReportSlowQuery {
f.reportSlowQuery(r, queryString, queryResponseTime)
}
if f.cfg.QueryStatsEnabled {
f.reportQueryStats(r, queryString, queryResponseTime, stats)
}
}

logMessage := []interface{}{
// reportSlowQuery reports slow queries.
func (f *Handler) reportSlowQuery(r *http.Request, queryString url.Values, queryResponseTime time.Duration) {
logMessage := append([]interface{}{
"msg", "slow query detected",
"method", r.Method,
"host", r.Host,
"path", r.URL.Path,
"time_taken", queryResponseTime.String(),
}, formatQueryString(queryString)...)

level.Info(util.WithContext(r.Context(), f.log)).Log(logMessage...)
}

func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, queryResponseTime time.Duration, stats *querier_stats.Stats) {
userID, err := tenant.TenantID(r.Context())
if err != nil {
return
}

// use previously buffered body
// Track stats.
f.querySeconds.WithLabelValues(userID).Add(float64(stats.LoadWallTime()))

// Log stats.
logMessage := append([]interface{}{
"msg", "query stats",
"method", r.Method,
"path", r.URL.Path,
"response_time", queryResponseTime,
"query_wall_time", stats.LoadWallTime(),
}, formatQueryString(queryString)...)

level.Info(util.WithContext(r.Context(), f.log)).Log(logMessage...)
}

func (f *Handler) parseRequestQueryString(r *http.Request, bodyBuf bytes.Buffer) url.Values {
// Use previously buffered body.
r.Body = ioutil.NopCloser(&bodyBuf)

// Ensure the form has been parsed so all the parameters are present
err := r.ParseForm()
if err != nil {
level.Warn(util.WithContext(r.Context(), f.log)).Log("msg", "unable to parse form for request", "err", err)
level.Warn(util.WithContext(r.Context(), f.log)).Log("msg", "unable to parse request form", "err", err)
return nil
}

// Attempt to iterate through the Form to log any filled in values
for k, v := range r.Form {
logMessage = append(logMessage, fmt.Sprintf("param_%s", k), strings.Join(v, ","))
}
return r.Form
}

level.Info(util.WithContext(r.Context(), f.log)).Log(logMessage...)
func formatQueryString(queryString url.Values) (fields []interface{}) {
for k, v := range queryString {
fields = append(fields, fmt.Sprintf("param_%s", k), strings.Join(v, ","))
}
return fields
}

func writeError(w http.ResponseWriter, err error) {
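With stats enabled, `reportQueryStats` above emits one "query stats" log line per query in addition to incrementing the per-tenant counter. An illustrative (not captured) example with made-up values and query parameters; the field names follow the code above and go-kit's logfmt output, and fields added by the logger itself (timestamp, caller) are omitted:

```
level=info msg="query stats" method=POST path=/prometheus/api/v1/query_range response_time=1.532s query_wall_time=4.203s param_query=rate(test_metric[1m]) param_start=1607500000 param_end=1607503600 param_step=15
```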
14 changes: 10 additions & 4 deletions pkg/frontend/v1/frontend.go
@@ -16,6 +16,7 @@ import (
"github.com/weaveworks/common/httpgrpc"

"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/scheduler/queue"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util/grpcutil"
@@ -185,7 +186,7 @@ func (f *Frontend) Process(server frontendv1pb.Frontend_ProcessServer) error {

// Handle the stream sending & receiving on a goroutine so we can
// monitor the contexts in a select and cancel things appropriately.
resps := make(chan *httpgrpc.HTTPResponse, 1)
resps := make(chan *frontendv1pb.ClientToFrontend, 1)
errs := make(chan error, 1)
go func() {
err = server.Send(&frontendv1pb.FrontendToClient{
@@ -203,7 +204,7 @@ func (f *Frontend) Process(server frontendv1pb.Frontend_ProcessServer) error {
return
}

resps <- resp.HttpResponse
resps <- resp
}()

select {
@@ -219,9 +220,14 @@ func (f *Frontend) Process(server frontendv1pb.Frontend_ProcessServer) error {
req.err <- err
return err

// Happy path: propagate the response.
// Happy path: merge the stats and propagate the response.
case resp := <-resps:
req.response <- resp
if stats.ShouldTrackHTTPGRPCResponse(resp.HttpResponse) {
stats := stats.FromContext(req.originalCtx)
stats.Merge(resp.Stats) // Safe if stats is nil.
}

req.response <- resp.HttpResponse
}
}
}
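The merge on the happy path above is where the querier-reported wall time reaches the frontend. The real `Stats` type is generated from the new `stats.proto`, so the following is only a hand-written stand-in sketching what merging amounts to at this stage of the PR, where wall time is the only tracked field (`AddWallTime` and the internal representation are assumptions):

```go
package statssketch

import (
	"sync/atomic"
	"time"
)

// Stats mirrors the minimal shape assumed here: a single wall-time counter,
// stored in nanoseconds so it can be updated atomically from many goroutines.
type Stats struct {
	wallTimeNanos int64
}

// AddWallTime accumulates wall-clock time spent executing a query.
func (s *Stats) AddWallTime(d time.Duration) {
	if s == nil {
		return
	}
	atomic.AddInt64(&s.wallTimeNanos, int64(d))
}

// LoadWallTime returns the wall-clock time accumulated so far.
func (s *Stats) LoadWallTime() time.Duration {
	if s == nil {
		return 0
	}
	return time.Duration(atomic.LoadInt64(&s.wallTimeNanos))
}

// Merge folds the stats reported by a querier into the stats the
// query-frontend stored in the request context. Nil receivers and arguments
// are tolerated so that disabled stats tracking stays a no-op.
func (s *Stats) Merge(other *Stats) {
	if s == nil || other == nil {
		return
	}
	s.AddWallTime(other.LoadWallTime())
}
```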