metrics.go support for metadata queries (labels and series) #5971

Merged
22 commits merged on May 4, 2022

Changes from 13 commits

Commits
be7214d
Add different functions to record labels and series queries
kavirajk Apr 20, 2022
8d66ae3
Add tests for logging series and labels queries
kavirajk Apr 20, 2022
9a5f304
Add statistics to the Series and Labels proto response types
kavirajk Apr 20, 2022
20ff3ae
Plug in the statistics recorder on the query handler
kavirajk Apr 20, 2022
9445f4b
Remove duplicate imports and cleanup comments
kavirajk Apr 21, 2022
7813cb5
Remove the need to add usage statistics for metadata queries.
kavirajk Apr 22, 2022
4187298
Add status based on returned error from the api
kavirajk Apr 22, 2022
8c74491
Fixing lint error by fixing return values order
kavirajk Apr 22, 2022
4b4fe47
Use explicit logger
kavirajk Apr 22, 2022
9bfab8e
Remove unused recorders
kavirajk Apr 22, 2022
c2ed438
Add `total_entries` representing length of total result returned
kavirajk Apr 22, 2022
3b49532
Inject statscollector middleware for metadata queries
kavirajk Apr 22, 2022
66767cb
Fix query frontend stats middleware
kavirajk Apr 28, 2022
a5cb772
Fix tests with change in extra `totalEntriesReturned` field
kavirajk Apr 29, 2022
722a306
merge with main
kavirajk Apr 29, 2022
3ae9afd
`make check-generated-files`
kavirajk Apr 29, 2022
f33a24e
Fix typo
kavirajk Apr 29, 2022
1d81e7e
Fix tripperware middleware for label values endpoint
kavirajk May 2, 2022
1a03fc2
Merge branch 'main' into kavirajk/metrics.go-support-for-metadata-que…
kavirajk May 3, 2022
3d20cb6
Fix bug with label extraction
kavirajk May 3, 2022
e2a37a4
Add changelog entry
kavirajk May 3, 2022
783d952
Fix `total_entries` value for range query
kavirajk May 3, 2022
16 changes: 14 additions & 2 deletions pkg/logql/engine.go
@@ -170,6 +170,18 @@ type query struct {
record bool
}

func (q *query) resultLength(res promql_parser.Value) int {
switch r := res.(type) {
case promql.Matrix:
return r.TotalSamples()
case logqlmodel.Streams:
return int(r.Lines())
default:
level.Error(q.logger).Log("msg", "unknown query result type", "err", fmt.Sprintf("expected promql.Matrix or logqlmodel.Streams but got %T", r))
return 0
}
}

// Exec Implements `Query`. It handles instrumentation & defers to Eval.
func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) {
log, ctx := spanlogger.New(ctx, "query.Exec")
@@ -187,7 +199,7 @@ func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) {

queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration)

statResult := statsCtx.Result(time.Since(start), queueTime)
statResult := statsCtx.Result(time.Since(start), queueTime, q.resultLength(data))
statResult.Log(level.Debug(log))

status := "200"
@@ -202,7 +214,7 @@ func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) {
}

if q.record {
RecordMetrics(ctx, q.logger, q.params, status, statResult, data)
RecordRangeAndInstantQueryMetrics(ctx, q.logger, q.params, status, statResult, data)
}

return logqlmodel.Result{
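The new resultLength helper above decides what total_entries counts for each result type: total samples for a metric (promql.Matrix) result, total log lines for a streams result. Below is a minimal sketch of the streams case, mirroring the fixture used in the updated pkg/logql/metrics_test.go further down; the import paths are assumptions based on the repository layout, not part of this diff.

package main

import (
	"fmt"

	// Assumed import paths for the grafana/loki repository at the time of this PR.
	"github.com/grafana/loki/pkg/logproto"
	"github.com/grafana/loki/pkg/logqlmodel"
)

func main() {
	// One stream holding 10 entries: a log query returning this reports total_entries=10.
	streams := logqlmodel.Streams{logproto.Stream{Entries: make([]logproto.Entry, 10)}}
	fmt.Println(streams.Lines()) // 10

	// For metric queries, resultLength counts samples instead, via promql.Matrix.TotalSamples().
}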
102 changes: 100 additions & 2 deletions pkg/logql/metrics.go
@@ -3,6 +3,7 @@ package logql
import (
"context"
"strings"
"time"

"github.com/dustin/go-humanize"
"github.com/go-kit/log"
@@ -23,6 +24,8 @@ const (
QueryTypeMetric = "metric"
QueryTypeFilter = "filter"
QueryTypeLimited = "limited"
QueryTypeLabels = "labels"
QueryTypeSeries = "series"

latencyTypeSlow = "slow"
latencyTypeFast = "fast"
@@ -74,7 +77,14 @@ var (
linePerSecondLogUsage = usagestats.NewStatistics("query_log_lines_per_second")
)

func RecordMetrics(ctx context.Context, log log.Logger, p Params, status string, stats logql_stats.Result, result promql_parser.Value) {
func RecordRangeAndInstantQueryMetrics(
ctx context.Context,
log log.Logger,
p Params,
status string,
stats logql_stats.Result,
result promql_parser.Value,
) {
var (
logger = util_log.WithContext(ctx, log)
rt = string(GetRangeType(p))
@@ -113,13 +123,13 @@ func RecordMetrics(ctx context.Context, log log.Logger, p Params, status string,
"returned_lines", returnedLines,
"throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1),
"total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1),
"total_entries", stats.Summary.TotalEntriesReturned,
"queue_time", logql_stats.ConvertSecondsToNanoseconds(stats.Summary.QueueTime),
"subqueries", stats.Summary.Subqueries,
}...)

logValues = append(logValues, tagsToKeyValues(queryTags)...)

// we also log queries, useful for troubleshooting slow queries.
level.Info(logger).Log(
logValues...,
)
@@ -138,6 +148,94 @@ func RecordMetrics(ctx context.Context, log log.Logger, p Params, status string,
recordUsageStats(queryType, stats)
}

func RecordLabelQueryMetrics(
ctx context.Context,
log log.Logger,
start, end time.Time,
label, status string,
stats logql_stats.Result,
) {
var (
logger = util_log.WithContext(ctx, log)
latencyType = latencyTypeFast
queryType = QueryTypeLabels
)

// Tag throughput metric by latency type based on a threshold.
// Latency below the threshold is fast, above is slow.
if stats.Summary.ExecTime > slowQueryThresholdSecond {
latencyType = latencyTypeSlow
}

level.Info(logger).Log(
"latency", latencyType,
"query_type", queryType,
"length", end.Sub(start),
"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))),
"status", status,
"label", label,
"throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1),
"total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1),
"total_entries", stats.Summary.TotalEntriesReturned,
)

bytesPerSecond.WithLabelValues(status, queryType, "", latencyType).
Observe(float64(stats.Summary.BytesProcessedPerSecond))
execLatency.WithLabelValues(status, queryType, "").
Observe(stats.Summary.ExecTime)
chunkDownloadLatency.WithLabelValues(status, queryType, "").
Observe(stats.ChunksDownloadTime().Seconds())
duplicatesTotal.Add(float64(stats.TotalDuplicates()))
chunkDownloadedTotal.WithLabelValues(status, queryType, "").
Add(float64(stats.TotalChunksDownloaded()))
ingesterLineTotal.Add(float64(stats.Ingester.TotalLinesSent))
}

func RecordSeriesQueryMetrics(
ctx context.Context,
log log.Logger,
start, end time.Time,
match []string,
status string,
stats logql_stats.Result,
) {
var (
logger = util_log.WithContext(ctx, log)
latencyType = latencyTypeFast
queryType = QueryTypeSeries
)

// Tag throughput metric by latency type based on a threshold.
// Latency below the threshold is fast, above is slow.
if stats.Summary.ExecTime > slowQueryThresholdSecond {
latencyType = latencyTypeSlow
}

// we also log queries, useful for troubleshooting slow queries.
level.Info(logger).Log(
"latency", latencyType,
"query_type", queryType,
"length", end.Sub(start),
"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))),
"status", status,
"match", strings.Join(match, ":"), // not using comma (,) as separator as matcher may already have comma (e.g: `{a="b", c="d"}`)
"throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1),
"total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1),
"total_entries", stats.Summary.TotalEntriesReturned,
)

bytesPerSecond.WithLabelValues(status, queryType, "", latencyType).
Observe(float64(stats.Summary.BytesProcessedPerSecond))
execLatency.WithLabelValues(status, queryType, "").
Observe(stats.Summary.ExecTime)
chunkDownloadLatency.WithLabelValues(status, queryType, "").
Observe(stats.ChunksDownloadTime().Seconds())
duplicatesTotal.Add(float64(stats.TotalDuplicates()))
chunkDownloadedTotal.WithLabelValues(status, queryType, "").
Add(float64(stats.TotalChunksDownloaded()))
ingesterLineTotal.Add(float64(stats.Ingester.TotalLinesSent))
}

func recordUsageStats(queryType string, stats logql_stats.Result) {
if queryType == QueryTypeMetric {
bytePerSecondMetricUsage.Record(float64(stats.Summary.BytesProcessedPerSecond))
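To see the new entry points in use: the following is a hypothetical caller of the two recorders added above, mirroring the calls in the new pkg/logql/metrics_test.go tests below. The import paths and the org-ID injection via weaveworks/common are assumptions based on the repository layout, not part of this diff.

package main

import (
	"context"
	"os"
	"time"

	"github.com/go-kit/log"
	"github.com/weaveworks/common/user" // assumed: used only to inject the org ID, as in the tests

	"github.com/grafana/loki/pkg/logql"
	"github.com/grafana/loki/pkg/logqlmodel/stats"
)

func main() {
	logger := log.NewLogfmtLogger(os.Stdout)
	ctx := user.InjectOrgID(context.Background(), "fake")
	now := time.Now()
	statResult := stats.Result{Summary: stats.Summary{
		BytesProcessedPerSecond: 100000,
		ExecTime:                25.25,
		TotalBytesProcessed:     100000,
		TotalEntriesReturned:    12,
	}}

	// Label-values query: the queried label name is logged next to the shared stats fields.
	logql.RecordLabelQueryMetrics(ctx, logger, now.Add(-time.Hour), now, "foo", "200", statResult)

	// Series query: matchers are joined with ':' in the log line because a matcher may itself contain commas.
	logql.RecordSeriesQueryMetrics(ctx, logger, now.Add(-time.Hour), now,
		[]string{`{app="loki"}`}, "200", statResult)
}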
57 changes: 55 additions & 2 deletions pkg/logql/metrics_test.go
@@ -66,7 +66,7 @@ func TestLogSlowQuery(t *testing.T) {

ctx = context.WithValue(ctx, httpreq.QueryTagsHTTPHeader, "Source=logvolhist,Feature=Beta")

RecordMetrics(ctx, util_log.Logger, LiteralParams{
RecordRangeAndInstantQueryMetrics(ctx, util_log.Logger, LiteralParams{
qs: `{foo="bar"} |= "buzz"`,
direction: logproto.BACKWARD,
end: now,
@@ -79,11 +79,64 @@ func TestLogSlowQuery(t *testing.T) {
QueueTime: 0.000000002,
ExecTime: 25.25,
TotalBytesProcessed: 100000,
TotalEntriesReturned: 10,
},
}, logqlmodel.Streams{logproto.Stream{Entries: make([]logproto.Entry, 10)}})
require.Equal(t,
fmt.Sprintf(
"level=info org_id=foo traceID=%s latency=slow query=\"{foo=\\\"bar\\\"} |= \\\"buzz\\\"\" query_type=filter range_type=range length=1h0m0s step=1m0s duration=25.25s status=200 limit=1000 returned_lines=10 throughput=100kB total_bytes=100kB queue_time=2ns subqueries=0 source=logvolhist feature=beta\n",
"level=info org_id=foo traceID=%s latency=slow query=\"{foo=\\\"bar\\\"} |= \\\"buzz\\\"\" query_type=filter range_type=range length=1h0m0s step=1m0s duration=25.25s status=200 limit=1000 returned_lines=10 throughput=100kB total_bytes=100kB total_entries=10 queue_time=2ns subqueries=0 source=logvolhist feature=beta\n",
sp.Context().(jaeger.SpanContext).SpanID().String(),
),
buf.String())
util_log.Logger = log.NewNopLogger()
}

func TestLogLabelsQuery(t *testing.T) {
buf := bytes.NewBufferString("")
logger := log.NewLogfmtLogger(buf)
tr, c := jaeger.NewTracer("foo", jaeger.NewConstSampler(true), jaeger.NewInMemoryReporter())
defer c.Close()
opentracing.SetGlobalTracer(tr)
sp := opentracing.StartSpan("")
ctx := opentracing.ContextWithSpan(user.InjectOrgID(context.Background(), "foo"), sp)
now := time.Now()
RecordLabelQueryMetrics(ctx, logger, now.Add(-1*time.Hour), now, "foo", "200", stats.Result{
Summary: stats.Summary{
BytesProcessedPerSecond: 100000,
ExecTime: 25.25,
TotalBytesProcessed: 100000,
TotalEntriesReturned: 12,
},
})
require.Equal(t,
fmt.Sprintf(
"level=info org_id=foo traceID=%s latency=slow query_type=labels length=1h0m0s duration=25.25s status=200 label=foo throughput=100kB total_bytes=100kB total_entries=12\n",
sp.Context().(jaeger.SpanContext).SpanID().String(),
),
buf.String())
util_log.Logger = log.NewNopLogger()
}

func TestLogSeriesQuery(t *testing.T) {
buf := bytes.NewBufferString("")
logger := log.NewLogfmtLogger(buf)
tr, c := jaeger.NewTracer("foo", jaeger.NewConstSampler(true), jaeger.NewInMemoryReporter())
defer c.Close()
opentracing.SetGlobalTracer(tr)
sp := opentracing.StartSpan("")
ctx := opentracing.ContextWithSpan(user.InjectOrgID(context.Background(), "foo"), sp)
now := time.Now()
RecordSeriesQueryMetrics(ctx, logger, now.Add(-1*time.Hour), now, []string{`{container_name=~"prometheus.*", component="server"}`, `{app="loki"}`}, "200", stats.Result{
Summary: stats.Summary{
BytesProcessedPerSecond: 100000,
ExecTime: 25.25,
TotalBytesProcessed: 100000,
TotalEntriesReturned: 10,
},
})
require.Equal(t,
fmt.Sprintf(
"level=info org_id=foo traceID=%s latency=slow query_type=series length=1h0m0s duration=25.25s status=200 match=\"{container_name=~\\\"prometheus.*\\\", component=\\\"server\\\"}:{app=\\\"loki\\\"}\" throughput=100kB total_bytes=100kB total_entries=10\n",
sp.Context().(jaeger.SpanContext).SpanID().String(),
),
buf.String())
10 changes: 6 additions & 4 deletions pkg/logqlmodel/stats/context.go
@@ -91,7 +91,7 @@ func (c *Context) Reset() {
}

// Result calculates the summary based on store and ingester data.
func (c *Context) Result(execTime time.Duration, queueTime time.Duration) Result {
func (c *Context) Result(execTime time.Duration, queueTime time.Duration, totalEntriesReturned int) Result {
r := c.result

r.Merge(Result{
@@ -101,7 +101,7 @@ func (c *Context) Result(execTime time.Duration, queueTime time.Duration) Result
Ingester: c.ingester,
})

r.ComputeSummary(execTime, queueTime)
r.ComputeSummary(execTime, queueTime, totalEntriesReturned)

return r
}
@@ -125,7 +125,7 @@ func JoinIngesters(ctx context.Context, inc Ingester) {
}

// ComputeSummary compute the summary of the statistics.
func (r *Result) ComputeSummary(execTime time.Duration, queueTime time.Duration) {
func (r *Result) ComputeSummary(execTime time.Duration, queueTime time.Duration, totalEntriesReturned int) {
r.Summary.TotalBytesProcessed = r.Querier.Store.Chunk.DecompressedBytes + r.Querier.Store.Chunk.HeadChunkBytes +
r.Ingester.Store.Chunk.DecompressedBytes + r.Ingester.Store.Chunk.HeadChunkBytes
r.Summary.TotalLinesProcessed = r.Querier.Store.Chunk.DecompressedLines + r.Querier.Store.Chunk.HeadChunkLines +
@@ -140,6 +140,8 @@ func (r *Result) ComputeSummary(execTime time.Duration, queueTime time.Duration)
if queueTime != 0 {
r.Summary.QueueTime = queueTime.Seconds()
}

r.Summary.TotalEntriesReturned = int64(totalEntriesReturned)
}

func (s *Store) Merge(m Store) {
@@ -173,7 +175,7 @@ func (r *Result) Merge(m Result) {
r.Querier.Merge(m.Querier)
r.Ingester.Merge(m.Ingester)
r.ComputeSummary(ConvertSecondsToNanoseconds(r.Summary.ExecTime+m.Summary.ExecTime),
ConvertSecondsToNanoseconds(r.Summary.QueueTime+m.Summary.QueueTime))
ConvertSecondsToNanoseconds(r.Summary.QueueTime+m.Summary.QueueTime), int(r.Summary.TotalEntriesReturned))
}

// ConvertSecondsToNanoseconds converts time.Duration representation of seconds (float64)
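Taken together, the stats change is small: callers now pass the number of returned entries into Result/ComputeSummary, and it surfaces as Summary.TotalEntriesReturned (and as total_entries in the query logs above). Here is a minimal sketch; the import path is an assumption based on the repository layout.

package main

import (
	"fmt"
	"time"

	// Assumed import path for the stats package shown in this diff.
	"github.com/grafana/loki/pkg/logqlmodel/stats"
)

func main() {
	var r stats.Result
	// The third argument is new in this PR: the number of entries returned to the client.
	r.ComputeSummary(25*time.Second, 2*time.Nanosecond, 10)
	fmt.Println(r.Summary.TotalEntriesReturned) // 10
}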