Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(blooms): compute chunks once #12664

Merged
merged 27 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
4807f48
remove unnecessary fallback
owen-d Mar 27, 2024
48402d2
[wip] wiring up support to pass store chunks to queriers
owen-d Mar 28, 2024
bb283f1
Merge remote-tracking branch 'upstream/main' into blooms/compute-chun…
owen-d Mar 29, 2024
7a05032
[wip] threading through store overrides for chunkrefs
owen-d Mar 29, 2024
97bc139
multi-tenant querier partitions store overrides by tenant id
owen-d Mar 29, 2024
3858893
metrics & ifc alignment
owen-d Mar 31, 2024
ea68788
Merge remote-tracking branch 'upstream/main' into blooms/compute-chun…
owen-d Apr 17, 2024
46d85f9
remove unused fn
owen-d Apr 17, 2024
db62dd8
send chunks in shards resp
owen-d Apr 17, 2024
ad855e6
type alignment
owen-d Apr 17, 2024
93a9ce4
Merge remote-tracking branch 'upstream/main' into blooms/compute-chun…
owen-d Apr 17, 2024
6ce86b4
type alignment
owen-d Apr 17, 2024
294261d
ShardsResponse.Merge extension
owen-d Apr 17, 2024
d7f2af9
fix unrelated codec test err msg
owen-d Apr 17, 2024
e8f58f5
tidy
owen-d Apr 17, 2024
4639cfd
binding shard to chunk refs
owen-d Apr 18, 2024
a764c11
simplify+pointer for shard chunks
owen-d Apr 18, 2024
3ba9330
fix signature
owen-d Apr 18, 2024
b2990bf
precomputed chunk logging
owen-d Apr 19, 2024
141c4f7
log matchers & always use mutex while accumulating chunks to shards
owen-d Apr 19, 2024
8db855d
more logging
owen-d Apr 19, 2024
8bdb823
better logging for gateway.go
owen-d Apr 22, 2024
56eabcc
independent handling for precomputed chunks vs bloom enablement optio…
owen-d Apr 23, 2024
33c8e82
Merge remote-tracking branch 'upstream/main' into blooms/compute-chun…
owen-d Apr 23, 2024
a3bd99c
make doc
owen-d Apr 23, 2024
82923e7
pr feedback
owen-d Apr 29, 2024
7b2f72e
pr feedback: only dispatch to bloom querier when line filters exist
owen-d Apr 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
pr feedback
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
  • Loading branch information
owen-d committed Apr 29, 2024
commit 82923e7c6eff3926946858aeb6ae534528c3f982
9 changes: 3 additions & 6 deletions pkg/logql/shards.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,10 @@ func (s Shard) Ptr() *Shard {
}

func (s Shard) Bind(chunks *logproto.ChunkRefGroup) *ShardWithChunkRefs {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function replaces Prt(), right? Can Ptr() be removed then?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be, but is still used in a bunch of tests I'm not inclined to update here

res := &ShardWithChunkRefs{
Shard: s,
return &ShardWithChunkRefs{
Shard: s,
chunks: chunks,
}
if chunks != nil {
res.chunks = chunks
}
return res
}

func NewBoundedShard(shard logproto.Shard) Shard {
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/multi_tenant_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (q *MultiTenantQuerier) SelectLogs(ctx context.Context, params logql.Select
// in case of multiple tenants, we need to filter the store chunks by tenant if they are provided
storeOverridesByTenant := make(map[string][]*logproto.ChunkRef)
if overrides := params.GetStoreChunks(); overrides != nil {
storeOverridesByTenant = partitionChunkRefsByTenant(params.GetStoreChunks().Refs)
storeOverridesByTenant = partitionChunkRefsByTenant(overrides.Refs)
}

iters := make([]iter.EntryIterator, len(matchedTenants))
Expand Down
12 changes: 6 additions & 6 deletions pkg/querier/queryrange/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,14 +331,14 @@ func (Codec) DecodeRequest(_ context.Context, r *http.Request, _ []string) (quer

switch op := getOperation(r.URL.Path); op {
case QueryRangeOp:
req, err := parseRangeQueryWithStoreChunksExtension(r)
req, err := parseRangeQuery(r)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

return req, nil
case InstantQueryOp:
req, err := parseInstantQueryWithStoreChunksExtension(r)
req, err := parseInstantQuery(r)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
Expand Down Expand Up @@ -513,14 +513,14 @@ func (Codec) DecodeHTTPGrpcRequest(ctx context.Context, r *httpgrpc.HTTPRequest)

switch op := getOperation(httpReq.URL.Path); op {
case QueryRangeOp:
req, err := parseRangeQueryWithStoreChunksExtension(httpReq)
req, err := parseRangeQuery(httpReq)
if err != nil {
return nil, ctx, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

return req, ctx, nil
case InstantQueryOp:
req, err := parseInstantQueryWithStoreChunksExtension(httpReq)
req, err := parseInstantQuery(httpReq)
if err != nil {
return nil, ctx, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
Expand Down Expand Up @@ -2086,7 +2086,7 @@ func mergeLokiResponse(responses ...queryrangebase.Response) *LokiResponse {
}
}

func parseRangeQueryWithStoreChunksExtension(r *http.Request) (*LokiRequest, error) {
func parseRangeQuery(r *http.Request) (*LokiRequest, error) {
rangeQuery, err := loghttp.ParseRangeQuery(r)
if err != nil {
return nil, err
Expand Down Expand Up @@ -2119,7 +2119,7 @@ func parseRangeQueryWithStoreChunksExtension(r *http.Request) (*LokiRequest, err
}, nil
}

func parseInstantQueryWithStoreChunksExtension(r *http.Request) (*LokiInstantRequest, error) {
func parseInstantQuery(r *http.Request) (*LokiInstantRequest, error) {
req, err := loghttp.ParseInstantQuery(r)
if err != nil {
return nil, err
Expand Down
19 changes: 0 additions & 19 deletions pkg/querier/queryrange/shard_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package queryrange
import (
"context"
"fmt"
"net/http"
strings "strings"
"time"

Expand All @@ -12,7 +11,6 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/tenant"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/common/model"
Expand Down Expand Up @@ -264,24 +262,7 @@ func (r *dynamicShardResolver) ShardingRanges(expr syntax.Expr, targetBytesPerSh
})

if err != nil {
// check unimplemented to fallback
// TODO(owen-d): fix if this isn't right
if resp, ok := httpgrpc.HTTPResponseFromError(err); ok && (resp.Code == http.StatusNotFound) {
n, bytesPerShard, err := r.Shards(expr)
if err != nil {
return nil, nil, errors.Wrap(err, "falling back to building linear shards from stats")
}
level.Debug(log).Log(
"msg", "falling back to building linear shards from stats",
"bytes_per_shard", bytesPerShard,
"shards", n,
"query", exprStr,
)
return sharding.LinearShards(n, uint64(n)*bytesPerShard), nil, nil
}

return nil, nil, errors.Wrapf(err, "failed to get shards for expression, got %T: %+v", err, err)

}

casted, ok := resp.(*ShardsResponse)
Expand Down
Loading