feat(blooms): compute chunks once #12664

Merged · 27 commits · Apr 29, 2024
Changes from 1 commit
Commits (27)
4807f48
remove unnecessary fallback
owen-d Mar 27, 2024
48402d2
[wip] wiring up support to pass store chunks to queriers
owen-d Mar 28, 2024
bb283f1
Merge remote-tracking branch 'upstream/main' into blooms/compute-chun…
owen-d Mar 29, 2024
7a05032
[wip] threading through store overrides for chunkrefs
owen-d Mar 29, 2024
97bc139
multi-tenant querier partitions store overrides by tenant id
owen-d Mar 29, 2024
3858893
metrics & ifc alignment
owen-d Mar 31, 2024
ea68788
Merge remote-tracking branch 'upstream/main' into blooms/compute-chun…
owen-d Apr 17, 2024
46d85f9
remove unused fn
owen-d Apr 17, 2024
db62dd8
send chunks in shards resp
owen-d Apr 17, 2024
ad855e6
type alignment
owen-d Apr 17, 2024
93a9ce4
Merge remote-tracking branch 'upstream/main' into blooms/compute-chun…
owen-d Apr 17, 2024
6ce86b4
type alignment
owen-d Apr 17, 2024
294261d
ShardsResponse.Merge extension
owen-d Apr 17, 2024
d7f2af9
fix unrelated codec test err msg
owen-d Apr 17, 2024
e8f58f5
tidy
owen-d Apr 17, 2024
4639cfd
binding shard to chunk refs
owen-d Apr 18, 2024
a764c11
simplify+pointer for shard chunks
owen-d Apr 18, 2024
3ba9330
fix signature
owen-d Apr 18, 2024
b2990bf
precomputed chunk logging
owen-d Apr 19, 2024
141c4f7
log matchers & always use mutex while accumulating chunks to shards
owen-d Apr 19, 2024
8db855d
more logging
owen-d Apr 19, 2024
8bdb823
better logging for gateway.go
owen-d Apr 22, 2024
56eabcc
independent handling for precomputed chunks vs bloom enablement optio…
owen-d Apr 23, 2024
33c8e82
Merge remote-tracking branch 'upstream/main' into blooms/compute-chun…
owen-d Apr 23, 2024
a3bd99c
make doc
owen-d Apr 23, 2024
82923e7
pr feedback
owen-d Apr 29, 2024
7b2f72e
pr feedback: only dispatch to bloom querier when line filters exist
owen-d Apr 29, 2024
metrics & ifc alignment
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
owen-d committed Mar 31, 2024
commit 385889388d60c0f632120bb870e8845e90bb19e8
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
@@ -1031,7 +1031,7 @@ func (i *Ingester) GetChunkIDs(ctx context.Context, req *logproto.GetChunkIDsReq
}

// get chunk references
- chunksGroups, _, err := i.store.GetChunks(ctx, orgID, start, end, chunk.NewPredicate(matchers, nil))
+ chunksGroups, _, err := i.store.GetChunks(ctx, orgID, start, end, chunk.NewPredicate(matchers, nil), nil)
if err != nil {
return nil, err
}
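The trailing `nil` here corresponds to the new parameter this commit threads through the store interface: a set of pre-computed chunk refs that, when present, lets the store skip index lookups. A rough sketch of the aligned interface shape, using placeholder types (the real Loki signature uses `model.Time`, `chunk.Predicate`, etc., and the override type name below is an assumption, not taken from this hunk):

```go
package storage

import (
	"context"
	"time"
)

// Placeholder types for illustration only; they stand in for Loki's
// chunk.Predicate, chunk.Chunk, fetcher.Fetcher, and the override type.
type (
	Predicate     struct{}
	Chunk         struct{}
	Fetcher       struct{}
	ChunkRefGroup struct{ Refs []string }
)

// Store sketches the aligned interface: the final parameter carries chunk
// refs computed during query planning; nil keeps the old behaviour of
// resolving refs from the index.
type Store interface {
	GetChunks(
		ctx context.Context,
		userID string,
		from, through time.Time,
		predicate Predicate,
		storeChunksOverride *ChunkRefGroup,
	) ([][]Chunk, []*Fetcher, error)
}
```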
4 changes: 2 additions & 2 deletions pkg/storage/async_store_test.go
@@ -233,7 +233,7 @@ func TestAsyncStore_mergeIngesterAndStoreChunks(t *testing.T) {
} {
t.Run(tc.name, func(t *testing.T) {
store := newStoreMock()
store.On("GetChunks", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.storeChunks, tc.storeFetcher, nil)
store.On("GetChunks", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.storeChunks, tc.storeFetcher, nil)
store.On("GetChunkFetcher", mock.Anything).Return(tc.ingesterFetcher)

ingesterQuerier := newIngesterQuerierMock()
@@ -293,7 +293,7 @@ func TestAsyncStore_QueryIngestersWithin(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {

store := newStoreMock()
store.On("GetChunks", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([][]chunk.Chunk{}, []*fetcher.Fetcher{}, nil)
store.On("GetChunks", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([][]chunk.Chunk{}, []*fetcher.Fetcher{}, nil)

ingesterQuerier := newIngesterQuerierMock()
ingesterQuerier.On("GetChunkIDs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil)
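The mock updates above simply add one more `mock.Anything`, because testify matches expectation arguments positionally: an expectation listing five matchers will not match a six-argument call. A small, self-contained illustration of the pattern (the mock below is illustrative, not Loki's `storeMock`):

```go
package storage

import (
	"testing"

	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"
)

type fakeStore struct{ mock.Mock }

// After an interface change adds a parameter, every expectation on the
// method must list one extra matcher to keep matching.
func (s *fakeStore) GetChunks(orgID string, from, through int64, predicate, override interface{}) error {
	args := s.Called(orgID, from, through, predicate, override)
	return args.Error(0)
}

func TestExpectationArity(t *testing.T) {
	s := &fakeStore{}
	// One matcher per parameter; dropping one makes the call fail to match.
	s.On("GetChunks", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
		Return(nil)

	require.NoError(t, s.GetChunks("fake", 0, 10, struct{}{}, nil))
	s.AssertExpectations(t)
}
```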
15 changes: 11 additions & 4 deletions pkg/storage/batch.go
@@ -28,10 +28,11 @@ import (
)

type ChunkMetrics struct {
- refs    *prometheus.CounterVec
- series  *prometheus.CounterVec
- chunks  *prometheus.CounterVec
- batches *prometheus.HistogramVec
+ refs         *prometheus.CounterVec
+ refsBypassed prometheus.Counter
+ series       *prometheus.CounterVec
+ chunks       *prometheus.CounterVec
+ batches      *prometheus.HistogramVec
}

const (
@@ -52,6 +53,12 @@ func NewChunkMetrics(r prometheus.Registerer, maxBatchSize int) *ChunkMetrics {
Name: "chunk_refs_total",
Help: "Number of chunks refs downloaded, partitioned by whether they intersect the query bounds.",
}, []string{"status"}),
+ refsBypassed: promauto.With(r).NewCounter(prometheus.CounterOpts{
+ Namespace: constants.Loki,
+ Subsystem: "store",
+ Name: "chunk_ref_lookups_bypassed_total",
+ Help: "Number of chunk refs that were bypassed due to store overrides: computed during planning to avoid lookups",
+ }),
series: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Subsystem: "store",
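Note that the new field is a plain `prometheus.Counter` rather than another `*prometheus.CounterVec`: the bypass count has no label dimensions, so it stays a single series. A minimal usage sketch of the two styles on a throwaway registry (names shortened, namespace and subsystem omitted):

```go
package storage

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func exampleCounters() {
	reg := prometheus.NewRegistry()

	// Labelled counter: one series per status value.
	refs := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "chunk_refs_total",
		Help: "Chunk refs downloaded, partitioned by status.",
	}, []string{"status"})

	// Unlabelled counter: a single series, incremented directly.
	bypassed := promauto.With(reg).NewCounter(prometheus.CounterOpts{
		Name: "chunk_ref_lookups_bypassed_total",
		Help: "Chunk ref lookups skipped because refs were precomputed.",
	})

	refs.WithLabelValues("matched").Add(10)
	bypassed.Add(3)
}
```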
3 changes: 3 additions & 0 deletions pkg/storage/store.go
@@ -433,6 +433,9 @@ func (s *LokiStore) lazyChunks(
filtered += len(chks[i])
}

+ if storeChunksOverride != nil {
+ s.chunkMetrics.refsBypassed.Add(float64(len(storeChunksOverride.Refs)))
+ }
s.chunkMetrics.refs.WithLabelValues(statusDiscarded).Add(float64(prefiltered - filtered))
s.chunkMetrics.refs.WithLabelValues(statusMatched).Add(float64(filtered))

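So whenever a chunk-ref override is supplied, every ref in it is counted as a bypassed index lookup, alongside the existing matched/discarded accounting. A hedged sketch of how that increment could be checked in isolation with client_golang's testutil (not a test from this PR):

```go
package storage

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/stretchr/testify/require"
)

func TestBypassAccounting(t *testing.T) {
	reg := prometheus.NewRegistry()
	bypassed := promauto.With(reg).NewCounter(prometheus.CounterOpts{
		Name: "chunk_ref_lookups_bypassed_total",
		Help: "Chunk ref lookups skipped because refs were precomputed.",
	})

	// Simulate an override carrying three precomputed chunk refs.
	overrideRefs := []string{"a", "b", "c"}
	bypassed.Add(float64(len(overrideRefs)))

	require.Equal(t, 3.0, testutil.ToFloat64(bypassed))
}
```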
8 changes: 4 additions & 4 deletions pkg/storage/store_test.go
@@ -1336,7 +1336,7 @@ func TestStore_indexPrefixChange(t *testing.T) {

// get all the chunks from the first period
predicate := chunk.NewPredicate(newMatchers(fooLabelsWithName.String()), nil)
- chunks, _, err := store.GetChunks(ctx, "fake", timeToModelTime(firstPeriodDate), timeToModelTime(secondPeriodDate), predicate)
+ chunks, _, err := store.GetChunks(ctx, "fake", timeToModelTime(firstPeriodDate), timeToModelTime(secondPeriodDate), predicate, nil)
require.NoError(t, err)
var totalChunks int
for _, chks := range chunks {
@@ -1407,7 +1407,7 @@ func TestStore_indexPrefixChange(t *testing.T) {

// get all the chunks from both the stores
predicate = chunk.NewPredicate(newMatchers(fooLabelsWithName.String()), nil)
- chunks, _, err = store.GetChunks(ctx, "fake", timeToModelTime(firstPeriodDate), timeToModelTime(secondPeriodDate.Add(24*time.Hour)), predicate)
+ chunks, _, err = store.GetChunks(ctx, "fake", timeToModelTime(firstPeriodDate), timeToModelTime(secondPeriodDate.Add(24*time.Hour)), predicate, nil)
require.NoError(t, err)

totalChunks = 0
@@ -1541,7 +1541,7 @@ func TestStore_MultiPeriod(t *testing.T) {

// get all the chunks from both the stores
predicate := chunk.NewPredicate(newMatchers(fooLabelsWithName.String()), nil)
- chunks, _, err := store.GetChunks(ctx, "fake", timeToModelTime(firstStoreDate), timeToModelTime(secondStoreDate.Add(24*time.Hour)), predicate)
+ chunks, _, err := store.GetChunks(ctx, "fake", timeToModelTime(firstStoreDate), timeToModelTime(secondStoreDate.Add(24*time.Hour)), predicate, nil)
require.NoError(t, err)
var totalChunks int
for _, chks := range chunks {
@@ -1912,7 +1912,7 @@ func TestStore_BoltdbTsdbSameIndexPrefix(t *testing.T) {

// get all the chunks from both the stores
predicate := chunk.NewPredicate(newMatchers(fooLabelsWithName.String()), nil)
- chunks, _, err := store.GetChunks(ctx, "fake", timeToModelTime(boltdbShipperStartDate), timeToModelTime(tsdbStartDate.Add(24*time.Hour)), predicate)
+ chunks, _, err := store.GetChunks(ctx, "fake", timeToModelTime(boltdbShipperStartDate), timeToModelTime(tsdbStartDate.Add(24*time.Hour)), predicate, nil)
require.NoError(t, err)
var totalChunks int
for _, chks := range chunks {