metrics.go support for metadata queries (labels and series) #5971

Merged: 22 commits, May 4, 2022

Changes from 4 commits

Commits (22)
be7214d
Add different functions to record labels and series queries
kavirajk Apr 20, 2022
8d66ae3
Add tests for logging series and labels queries
kavirajk Apr 20, 2022
9a5f304
Add statistics to the Series and Labels proto response types
kavirajk Apr 20, 2022
20ff3ae
Plug in the statistics recorder on the query handler
kavirajk Apr 20, 2022
9445f4b
Remove duplicate imports and cleanup comments
kavirajk Apr 21, 2022
7813cb5
Remove the need to add usage statistics for metadata queries.
kavirajk Apr 22, 2022
4187298
Add status based on returned error from the api
kavirajk Apr 22, 2022
8c74491
Fixing lint error by fixing return values order
kavirajk Apr 22, 2022
4b4fe47
Use explicit logger
kavirajk Apr 22, 2022
9bfab8e
Remove unused recorders
kavirajk Apr 22, 2022
c2ed438
Add `total_entries` representing length of total result returned
kavirajk Apr 22, 2022
3b49532
Inject statscollector middleware for metadata queries
kavirajk Apr 22, 2022
66767cb
Fix query frontend stats middleware
kavirajk Apr 28, 2022
a5cb772
Fix tests with change in extra `totalEntriesReturned` field
kavirajk Apr 29, 2022
722a306
merge with main
kavirajk Apr 29, 2022
3ae9afd
`make check-generated-files`
kavirajk Apr 29, 2022
f33a24e
Fix typo
kavirajk Apr 29, 2022
1d81e7e
Fix tripperware middleware for label values endpoint
kavirajk May 2, 2022
1a03fc2
Merge branch 'main' into kavirajk/metrics.go-support-for-metadata-que…
kavirajk May 3, 2022
3d20cb6
Fix bug with label extraction
kavirajk May 3, 2022
e2a37a4
Add changelog entry
kavirajk May 3, 2022
783d952
Fix `total_entries` value for range query
kavirajk May 3, 2022
pkg/logql/engine.go (2 changes: 1 addition & 1 deletion)
@@ -202,7 +202,7 @@ func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) {
}

if q.record {
-RecordMetrics(ctx, q.logger, q.params, status, statResult, data)
+RecordRangeAndInstantQueryMetrics(ctx, q.logger, q.params, status, statResult, data)
}

return logqlmodel.Result{
pkg/logql/metrics.go (95 changes: 94 additions & 1 deletion)
@@ -3,6 +3,7 @@ package logql
import (
"context"
"strings"
"time"

"github.com/dustin/go-humanize"
"github.com/go-kit/log"
@@ -13,6 +14,7 @@ import (

"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/logqlmodel/stats"
logql_stats "github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/usagestats"
"github.com/grafana/loki/pkg/util/httpreq"
@@ -23,6 +25,8 @@ const (
QueryTypeMetric = "metric"
QueryTypeFilter = "filter"
QueryTypeLimited = "limited"
QueryTypeLabels = "labels"
QueryTypeSeries = "series"

latencyTypeSlow = "slow"
latencyTypeFast = "fast"
@@ -74,7 +78,7 @@ var (
linePerSecondLogUsage = usagestats.NewStatistics("query_log_lines_per_second")
)

-func RecordMetrics(ctx context.Context, log log.Logger, p Params, status string, stats logql_stats.Result, result promql_parser.Value) {
+func RecordRangeAndInstantQueryMetrics(ctx context.Context, log log.Logger, p Params, status string, stats logql_stats.Result, result promql_parser.Value) {
var (
logger = util_log.WithContext(ctx, log)
rt = string(GetRangeType(p))
@@ -138,6 +142,95 @@ func RecordMetrics(ctx context.Context, log log.Logger, p Params, status string,
recordUsageStats(queryType, stats)
}

func RecordLabelQueryMetrics(
ctx context.Context,
start, end time.Time,
label, status string,
stats stats.Result,
) {
var (
logger = util_log.WithContext(ctx, util_log.Logger)
latencyType = latencyTypeFast
queryType = QueryTypeLabels
)

// Tag throughput metric by latency type based on a threshold.
// Latency below the threshold is fast, above is slow.
if stats.Summary.ExecTime > slowQueryThresholdSecond {
latencyType = latencyTypeSlow
}

// we also log queries, useful for troubleshooting slow queries.
level.Info(logger).Log(
"latency", latencyType,
"query_type", queryType,
"length", end.Sub(start),
"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))),
"status", status,
"label", label,
"throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1),
"total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1),
)

bytesPerSecond.WithLabelValues(status, queryType, "", latencyType).
Observe(float64(stats.Summary.BytesProcessedPerSecond))
execLatency.WithLabelValues(status, queryType, "").
Observe(stats.Summary.ExecTime)
chunkDownloadLatency.WithLabelValues(status, queryType, "").
Observe(stats.ChunksDownloadTime().Seconds())
duplicatesTotal.Add(float64(stats.TotalDuplicates()))
chunkDownloadedTotal.WithLabelValues(status, queryType, "").
Add(float64(stats.TotalChunksDownloaded()))
ingesterLineTotal.Add(float64(stats.Ingester.TotalLinesSent))

// TODO(kavi): Do we need recordUsageStats for labels query?
}

func RecordSeriesQueryMetrics(
ctx context.Context,
start, end time.Time,
match []string,
status string,
stats stats.Result,
) {
var (
logger = util_log.WithContext(ctx, util_log.Logger)
latencyType = latencyTypeFast
queryType = QueryTypeSeries
)

// Tag throughput metric by latency type based on a threshold.
// Latency below the threshold is fast, above is slow.
if stats.Summary.ExecTime > slowQueryThresholdSecond {
latencyType = latencyTypeSlow
}

// we also log queries, useful for troubleshooting slow queries.
level.Info(logger).Log(
"latency", latencyType,
"query_type", queryType,
"length", end.Sub(start),
"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))),
"status", status,
"match", strings.Join(match, ":"), // not using comma (,) as separator as matcher may already have comma (e.g: `{a="b", c="d"}`)
"throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1),
"total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1),
)

bytesPerSecond.WithLabelValues(status, queryType, "", latencyType).
Observe(float64(stats.Summary.BytesProcessedPerSecond))
execLatency.WithLabelValues(status, queryType, "").
Observe(stats.Summary.ExecTime)
chunkDownloadLatency.WithLabelValues(status, queryType, "").
Observe(stats.ChunksDownloadTime().Seconds())
duplicatesTotal.Add(float64(stats.TotalDuplicates()))
chunkDownloadedTotal.WithLabelValues(status, queryType, "").
Add(float64(stats.TotalChunksDownloaded()))
ingesterLineTotal.Add(float64(stats.Ingester.TotalLinesSent))

// TODO(kavi): Do we need recordUsageStats for series query?
}

func recordUsageStats(queryType string, stats logql_stats.Result) {
if queryType == QueryTypeMetric {
bytePerSecondMetricUsage.Record(float64(stats.Summary.BytesProcessedPerSecond))
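For illustration, here is a minimal, self-contained sketch of calling the new label recorder directly. The standalone caller is hypothetical (the real integration points are the handlers in pkg/querier/http.go further below); the stats values mirror those used in TestLogLabelsQuery.

package main

import (
	"context"
	"time"

	"github.com/grafana/loki/pkg/logql"
	"github.com/grafana/loki/pkg/logqlmodel/stats"
)

func main() {
	now := time.Now()
	// Hypothetical stats for a label query that scanned ~100 kB in 25.25s.
	result := stats.Result{
		Summary: stats.Summary{
			BytesProcessedPerSecond: 100000,
			ExecTime:                25.25,
			TotalBytesProcessed:     100000,
		},
	}
	// Since 25.25s exceeds the slow-query threshold, this emits a logfmt line
	// similar to:
	// level=info latency=slow query_type=labels length=1h0m0s duration=25.25s status=200 label=foo throughput=100kB total_bytes=100kB
	logql.RecordLabelQueryMetrics(context.Background(), now.Add(-1*time.Hour), now, "foo", "200", result)
}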
pkg/logql/metrics_test.go (52 changes: 51 additions & 1 deletion)
@@ -66,7 +66,7 @@ func TestLogSlowQuery(t *testing.T) {

ctx = context.WithValue(ctx, httpreq.QueryTagsHTTPHeader, "Source=logvolhist,Feature=Beta")

-RecordMetrics(ctx, util_log.Logger, LiteralParams{
+RecordRangeAndInstantQueryMetrics(ctx, util_log.Logger, LiteralParams{
qs: `{foo="bar"} |= "buzz"`,
direction: logproto.BACKWARD,
end: now,
@@ -90,6 +90,56 @@
util_log.Logger = log.NewNopLogger()
}

func TestLogLabelsQuery(t *testing.T) {
buf := bytes.NewBufferString("")
util_log.Logger = log.NewLogfmtLogger(buf)
tr, c := jaeger.NewTracer("foo", jaeger.NewConstSampler(true), jaeger.NewInMemoryReporter())
defer c.Close()
opentracing.SetGlobalTracer(tr)
sp := opentracing.StartSpan("")
ctx := opentracing.ContextWithSpan(user.InjectOrgID(context.Background(), "foo"), sp)
now := time.Now()
RecordLabelQueryMetrics(ctx, now.Add(-1*time.Hour), now, "foo", "200", stats.Result{
Summary: stats.Summary{
BytesProcessedPerSecond: 100000,
ExecTime: 25.25,
TotalBytesProcessed: 100000,
},
})
require.Equal(t,
fmt.Sprintf(
"level=info org_id=foo traceID=%s latency=slow query_type=labels length=1h0m0s duration=25.25s status=200 label=foo throughput=100kB total_bytes=100kB\n",
sp.Context().(jaeger.SpanContext).SpanID().String(),
),
buf.String())
util_log.Logger = log.NewNopLogger()
}

func TestLogSeriesQuery(t *testing.T) {
buf := bytes.NewBufferString("")
util_log.Logger = log.NewLogfmtLogger(buf)
tr, c := jaeger.NewTracer("foo", jaeger.NewConstSampler(true), jaeger.NewInMemoryReporter())
defer c.Close()
opentracing.SetGlobalTracer(tr)
sp := opentracing.StartSpan("")
ctx := opentracing.ContextWithSpan(user.InjectOrgID(context.Background(), "foo"), sp)
now := time.Now()
RecordSeriesQueryMetrics(ctx, now.Add(-1*time.Hour), now, []string{`{container_name=~"prometheus.*", component="server"}`, `{app="loki"}`}, "200", stats.Result{
Summary: stats.Summary{
BytesProcessedPerSecond: 100000,
ExecTime: 25.25,
TotalBytesProcessed: 100000,
},
})
require.Equal(t,
fmt.Sprintf(
"level=info org_id=foo traceID=%s latency=slow query_type=series length=1h0m0s duration=25.25s status=200 match=\"{container_name=~\\\"prometheus.*\\\", component=\\\"server\\\"}:{app=\\\"loki\\\"}\" throughput=100kB total_bytes=100kB\n",
sp.Context().(jaeger.SpanContext).SpanID().String(),
),
buf.String())
util_log.Logger = log.NewNopLogger()
}

func Test_testToKeyValues(t *testing.T) {
cases := []struct {
name string
pkg/loki/modules.go (4 changes: 2 additions & 2 deletions)
@@ -553,7 +553,7 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
httpreq.ExtractQueryTagsMiddleware(),
serverutil.RecoveryHTTPMiddleware,
t.HTTPAuthMiddleware,
-queryrange.StatsHTTPMiddleware,
+queryrange.StatsRangeQueryHTTPMiddleware,
serverutil.NewPrepopulateMiddleware(),
serverutil.ResponseJSONMiddleware(),
).Wrap(frontendHandler)
@@ -564,7 +564,7 @@
httpMiddleware := middleware.Merge(
httpreq.ExtractQueryTagsMiddleware(),
t.HTTPAuthMiddleware,
-queryrange.StatsHTTPMiddleware,
+queryrange.StatsRangeQueryHTTPMiddleware,
)
tailURL, err := url.Parse(t.Cfg.Frontend.TailProxyURL)
if err != nil {
pkg/querier/http.go (40 changes: 40 additions & 0 deletions)
@@ -19,10 +19,13 @@ import (
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/util/httpreq"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/marshal"
marshal_legacy "github.com/grafana/loki/pkg/util/marshal/legacy"
serverutil "github.com/grafana/loki/pkg/util/server"
"github.com/grafana/loki/pkg/util/spanlogger"
util_validation "github.com/grafana/loki/pkg/util/validation"
"github.com/grafana/loki/pkg/validation"
)
@@ -200,7 +203,26 @@ func (q *QuerierAPI) LabelHandler(w http.ResponseWriter, r *http.Request) {
return
}

log, ctx := spanlogger.New(r.Context(), "query.Label")

start := time.Now()
statsCtx, ctx := stats.NewContext(ctx)

resp, err := q.querier.Label(r.Context(), req)
queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration)

// record stats about the label query
statResult := statsCtx.Result(time.Since(start), queueTime)
statResult.Log(level.Debug(log))

status := "200"
if err != nil {
status = "500"
// TODO(kavi): adjust status based on all possible errors from `q.Label`
}

logql.RecordLabelQueryMetrics(ctx, *req.Start, *req.End, req.Name, status, statResult)

if err != nil {
serverutil.WriteError(err, w)
return
@@ -350,7 +372,25 @@ func (q *QuerierAPI) SeriesHandler(w http.ResponseWriter, r *http.Request) {
return
}

log, ctx := spanlogger.New(r.Context(), "query.Series")

start := time.Now()
statsCtx, ctx := stats.NewContext(ctx)

resp, err := q.querier.Series(r.Context(), req)
queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration)

// record stats about the series query
statResult := statsCtx.Result(time.Since(start), queueTime)
statResult.Log(level.Debug(log))

status := "200"
if err != nil {
status = "500"
// TODO(kavi): adjust status based on all possible errors from `q.Series`
}

logql.RecordSeriesQueryMetrics(ctx, req.Start, req.End, req.Groups, status, statResult)
if err != nil {
serverutil.WriteError(err, w)
return
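The TODOs above leave the recorded status hard-coded to "500" whenever q.Label or q.Series returns an error. A hypothetical helper (not part of this PR, standard library only) sketches one way that mapping could be refined:

package querier

import (
	"context"
	"errors"
)

// statusForError is a hypothetical sketch of the TODOs above: derive the
// status label passed to RecordLabelQueryMetrics / RecordSeriesQueryMetrics
// from the returned error instead of always reporting "500".
func statusForError(err error) string {
	switch {
	case err == nil:
		return "200"
	case errors.Is(err, context.Canceled):
		return "499" // client cancelled the request
	case errors.Is(err, context.DeadlineExceeded):
		return "504" // query timed out
	default:
		return "500"
	}
}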