Skip to content

Commit

Permalink
Loki: Remove the bypass for "limited" queries (grafana#7510)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:

Limited queries are queries which don't have a filter expression. 

Now all log queries will be handled by the LogFilterTripperware which
will result in them being split by time (they previously were not)

Signed-off-by: Edward Welch <edward.welch@grafana.com>


**Which issue(s) this PR fixes**:

We've found that very large timeframe `limited` queries will be sent to
queriers and then ingesters and can stall out the read path because of
the large time ranges.

Splitting these by time avoids this problem by keeping any subquery
limited in length to the `split_by_interval`.

It's important to note that this will likely increase some extra work
done by limited queries as more work for these will be parallelized and
can result in extra data being processed. This will be the same as how
filter queries are handled now, so it will be no worse than that.

In fact this was an optimization based on the premise that it's
advantageous not to split/shard limited queries but we are seeing this
not be the case when limited queries are run over very large time ranges
matching streams with huge volumes and combined with parsers like `|
json`

**Special notes for your reviewer**:

**Checklist**
- [ ] Reviewed the `CONTRIBUTING.md` guide
- [ ] Documentation added
- [x] Tests updated
- [x] `CHANGELOG.md` updated
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/upgrading/_index.md`

Signed-off-by: Edward Welch <edward.welch@grafana.com>
  • Loading branch information
slim-bean authored Oct 25, 2022
1 parent f83988a commit 45caba4
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 43 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
* [6835](https://github.com/grafana/loki/pull/6835) **DylanGuedes**: Add new per-tenant query timeout configuration and remove engine query timeout.
* [7212](https://github.com/grafana/loki/pull/7212) **Juneezee**: Replaces deprecated `io/ioutil` with `io` and `os`.
* [7361](https://github.com/grafana/loki/pull/7361) **szczepad**: Renames metric `loki_log_messages_total` to `loki_internal_log_messages_total`
* [7510](https://github.com/grafana/loki/pull/7510) **slim-bean**: Limited queries (queries without filter expressions) will now be split and sharded.

#### Promtail

Expand Down
9 changes: 3 additions & 6 deletions pkg/querier/queryrange/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,17 +139,14 @@ func (r roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
case syntax.SampleExpr:
return r.metric.RoundTrip(req)
case syntax.LogSelectorExpr:
expr, err := transformRegexQuery(req, e)
// Note, this function can mutate the request
_, err := transformRegexQuery(req, e)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
if err := validateLimits(req, rangeQuery.Limit, r.limits); err != nil {
return nil, err
}
// Only filter expressions are query sharded
if !expr.HasFilter() {
return r.next.RoundTrip(req)
}
return r.log.RoundTrip(req)

default:
Expand Down Expand Up @@ -246,7 +243,7 @@ func getOperation(path string) string {
}
}

// NewLogFilterTripperware creates a new frontend tripperware responsible for handling log requests with regex.
// NewLogFilterTripperware creates a new frontend tripperware responsible for handling log requests.
func NewLogFilterTripperware(
cfg Config,
log log.Logger,
Expand Down
44 changes: 7 additions & 37 deletions pkg/querier/queryrange/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func TestLabelsTripperware(t *testing.T) {
require.NoError(t, err)
}

func TestLogNoRegex(t *testing.T) {
func TestLogNoFilter(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, config.SchemaConfig{}, nil, nil)
if stopper != nil {
defer stopper.Stop()
Expand All @@ -355,7 +355,7 @@ func TestLogNoRegex(t *testing.T) {
defer rt.Close()

lreq := &LokiRequest{
Query: `{app="foo"}`, // no regex so it should go to the querier
Query: `{app="foo"}`,
Limit: 1000,
StartTs: testTime.Add(-6 * time.Hour),
EndTs: testTime,
Expand All @@ -374,8 +374,9 @@ func TestLogNoRegex(t *testing.T) {
count, h := promqlResult(streams)
rt.setHandler(h)
_, err = tpw(rt).RoundTrip(req)
require.Equal(t, 1, *count)
require.NoError(t, err)
// fake round tripper is not called because we send "limited" queries to log tripperware
require.Equal(t, 0, *count)
require.Error(t, err)
}

func TestRegexpParamsSupport(t *testing.T) {
Expand All @@ -390,7 +391,7 @@ func TestRegexpParamsSupport(t *testing.T) {
defer rt.Close()

lreq := &LokiRequest{
Query: `{app="foo"}`, // no regex so it should go to the querier
Query: `{app="foo"}`,
Limit: 1000,
StartTs: testTime.Add(-6 * time.Hour),
EndTs: testTime,
Expand Down Expand Up @@ -473,7 +474,7 @@ func TestEntriesLimitsTripperware(t *testing.T) {
defer rt.Close()

lreq := &LokiRequest{
Query: `{app="foo"}`, // no regex so it should go to the querier
Query: `{app="foo"}`,
Limit: 10000,
StartTs: testTime.Add(-6 * time.Hour),
EndTs: testTime,
Expand All @@ -493,37 +494,6 @@ func TestEntriesLimitsTripperware(t *testing.T) {
require.Equal(t, httpgrpc.Errorf(http.StatusBadRequest, "max entries limit per query exceeded, limit > max_entries_limit (10000 > 5000)"), err)
}

func TestEntriesLimitWithZeroTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, config.SchemaConfig{}, nil, nil)
if stopper != nil {
defer stopper.Stop()
}
require.NoError(t, err)
rt, err := newfakeRoundTripper()
require.NoError(t, err)
defer rt.Close()

lreq := &LokiRequest{
Query: `{app="foo"}`, // no regex so it should go to the querier
Limit: 10000,
StartTs: testTime.Add(-6 * time.Hour),
EndTs: testTime,
Direction: logproto.FORWARD,
Path: "/loki/api/v1/query_range",
}

ctx := user.InjectOrgID(context.Background(), "1")
req, err := LokiCodec.EncodeRequest(ctx, lreq)
require.NoError(t, err)

req = req.WithContext(ctx)
err = user.InjectOrgIDIntoHTTPRequest(ctx, req)
require.NoError(t, err)

_, err = tpw(rt).RoundTrip(req)
require.NoError(t, err)
}

func Test_getOperation(t *testing.T) {
cases := []struct {
name string
Expand Down

0 comments on commit 45caba4

Please sign in to comment.