Skip to content

Commit

Permalink
chore(bloomgateway): apply diagnostic hints
Browse files Browse the repository at this point in the history
  • Loading branch information
rfratto committed Oct 18, 2024
1 parent a7d02ab commit 407cf3c
Show file tree
Hide file tree
Showing 6 changed files with 10 additions and 10 deletions.
2 changes: 1 addition & 1 deletion pkg/bloomgateway/bloomgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func filterChunkRefs(req *logproto.FilterChunkRefRequest, responses []v1.Output)

// dedupe outputs, merging the same series.
// This returns an Iterator[v1.Output]
dedupedResps := iter.NewDedupingIter[v1.Output, v1.Output](
dedupedResps := iter.NewDedupingIter(
// eq
func(o1, o2 v1.Output) bool {
return o1.Fp == o2.Fp
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloomgateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,12 +301,12 @@ func mergeSeries(input [][]*logproto.GroupedChunkRefs, buf []*logproto.GroupedCh
iters = append(iters, iter.NewPeekIter(iter.NewSliceIter(inp)))
}

heapIter := v1.NewHeapIterator[*logproto.GroupedChunkRefs](
heapIter := v1.NewHeapIterator(
func(a, b *logproto.GroupedChunkRefs) bool { return a.Fingerprint < b.Fingerprint },
iters...,
)

dedupeIter := iter.NewDedupingIter[*logproto.GroupedChunkRefs, *logproto.GroupedChunkRefs](
dedupeIter := iter.NewDedupingIter(
// eq
func(a, b *logproto.GroupedChunkRefs) bool { return a.Fingerprint == b.Fingerprint },
// from
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomgateway/multiplexing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestTask_RequestIterator(t *testing.T) {
}

// merge the request iterators using the heap sort iterator
it := v1.NewHeapIterator[v1.Request](func(r1, r2 v1.Request) bool { return r1.Fp < r2.Fp }, iters...)
it := v1.NewHeapIterator(func(r1, r2 v1.Request) bool { return r1.Fp < r2.Fp }, iters...)

// first item
require.True(t, it.Next())
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomgateway/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (p *processor) processBlock(_ context.Context, bq *bloomshipper.CloseableBl
iters = append(iters, it)
}

logger := log.With(p.logger, "block", bq.BlockRef.String())
logger := log.With(p.logger, "block", bq.String())
fq := blockQuerier.Fuse(iters, logger)

start := time.Now()
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomgateway/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
// We can perform requests sequentially, because most of the time the request
// only covers a single day, and if not, it's at most two days.
for _, s := range partitionSeriesByDay(from, through, grouped) {
day := bloomshipper.NewInterval(s.day.Time, s.day.Time.Add(Day))
day := bloomshipper.NewInterval(s.day.Time, s.day.Add(Day))
blocks, skipped, err := bq.blockResolver.Resolve(ctx, tenant, day, s.series)
if err != nil {
return nil, err
Expand Down
8 changes: 4 additions & 4 deletions pkg/storage/chunk/cache/resultscache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,10 +289,10 @@ func merge(extents []Extent, acc *accumulator) ([]Extent, error) {
return nil, err
}
return append(extents, Extent{
Start: acc.Extent.Start,
End: acc.Extent.End,
Start: acc.Start,
End: acc.End,
Response: anyResp,
TraceId: acc.Extent.TraceId,
TraceId: acc.TraceId,
}), nil
}

Expand Down Expand Up @@ -386,7 +386,7 @@ func (s ResultsCache) partition(req Request, extents []Extent) ([]Request, []Res

// If start and end are the same (valid in promql), start == req.GetEnd() and we won't do the query.
// But we should only do the request if we don't have a valid cached response for it.
if req.GetStart() == req.GetEnd() && len(cachedResponses) == 0 {
if req.GetStart().Equal(req.GetEnd()) && len(cachedResponses) == 0 {
requests = append(requests, req)
}

Expand Down

0 comments on commit 407cf3c

Please sign in to comment.