Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix bugs in logs results caching and its tests #7925

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
* [7906](https://github.com/grafana/loki/pull/7906) **kavirajk**: Add API endpoint that formats LogQL expressions and support new `fmt` subcommand in `logcli` to format LogQL query.
* [7966](https://github.com/grafana/loki/pull/7966) **sandeepsukhani**: Fix query-frontend request load balancing when using k8s service.
* [7988](https://github.com/grafana/loki/pull/7988) **ashwanthgoli** store: write overlapping chunks to multiple stores.
* [7925](https://github.com/grafana/loki/pull/7925) **sandeepsukhani**: Fix bugs in logs results caching causing query-frontend to return logs outside of query window.

##### Changes

Expand Down
46 changes: 42 additions & 4 deletions pkg/querier/queryrange/log_result_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,7 @@ func (l *logResultCache) handleHit(ctx context.Context, cacheKey string, cachedR
result := emptyResponse(cachedRequest)
// if the request is the same and cover the whole time range,
// we can just return the cached result.
if !lokiReq.GetStartTs().After(cachedRequest.GetStartTs()) && lokiReq.GetStartTs().Equal(cachedRequest.GetStartTs()) &&
!lokiReq.GetEndTs().Before(cachedRequest.GetEndTs()) && lokiReq.GetEndTs().Equal(cachedRequest.GetEndTs()) {
Comment on lines -183 to -184
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this check looked wrong to me, so I fixed it as well.

if cachedRequest.StartTs.UnixNano() <= lokiReq.StartTs.UnixNano() && cachedRequest.EndTs.UnixNano() >= lokiReq.EndTs.UnixNano() {
return result, nil
}
// we could be missing data at the start and the end.
Expand Down Expand Up @@ -240,7 +239,7 @@ func (l *logResultCache) handleHit(ctx context.Context, cacheKey string, cachedR
if startResp.Status != loghttp.QueryStatusSuccess {
return startResp, nil
}
result = mergeLokiResponse(startResp, result)
result = mergeLokiResponse(extractLokiResponse(lokiReq.GetStartTs(), lokiReq.GetEndTs(), startResp), result)
}
}

Expand All @@ -254,7 +253,7 @@ func (l *logResultCache) handleHit(ctx context.Context, cacheKey string, cachedR
if endResp.Status != loghttp.QueryStatusSuccess {
return endResp, nil
}
result = mergeLokiResponse(endResp, result)
result = mergeLokiResponse(extractLokiResponse(lokiReq.GetStartTs(), lokiReq.GetEndTs(), endResp), result)
}
}

Expand All @@ -274,6 +273,45 @@ func (l *logResultCache) handleHit(ctx context.Context, cacheKey string, cachedR
return result, nil
}

// extractLokiResponse extracts response with interval [start, end)
func extractLokiResponse(start, end time.Time, r *LokiResponse) *LokiResponse {
extractedResp := LokiResponse{
Status: r.Status,
Direction: r.Direction,
Limit: r.Limit,
Version: r.Version,
ErrorType: r.ErrorType,
Error: r.Error,
Statistics: r.Statistics,
Data: LokiData{
ResultType: r.Data.ResultType,
Result: []logproto.Stream{},
},
}
for _, stream := range r.Data.Result {
if stream.Entries[0].Timestamp.After(end) || stream.Entries[len(stream.Entries)-1].Timestamp.Before(start) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The condition one above uses ts.Nano() where as here we use time.After for comparison. Can we make both same?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went with different ways to compare the time for convenience and readability.
It is hard to do <= or >= with time.After and time.Before while keeping it simple to read.
If I do the same here, then it would make the code a little inefficient due to having to do the conversion for each log line. I think it would be good to keep it as is, but please feel free to push back if you feel otherwise.

continue
}

extractedStream := logproto.Stream{
Labels: stream.Labels,
Entries: []logproto.Entry{},
Hash: stream.Hash,
}
for _, entry := range stream.Entries {
if entry.Timestamp.Before(start) || entry.Timestamp.After(end) || entry.Timestamp.Equal(end) {
continue
}

extractedStream.Entries = append(extractedStream.Entries, entry)
}

extractedResp.Data.Result = append(extractedResp.Data.Result, extractedStream)
}

return &extractedResp
}

func isEmpty(lokiRes *LokiResponse) bool {
return lokiRes.Status == loghttp.QueryStatusSuccess && len(lokiRes.Data.Result) == 0
}
Expand Down
Loading