chore(bloom): remove unused code from blooms #14539

Merged: 6 commits, Oct 21, 2024
6 changes: 3 additions & 3 deletions pkg/bloombuild/builder/batch.go
@@ -248,7 +248,7 @@ func (i *blockLoadingIter) loadNext() bool {
blockRefs := i.overlapping.At()

loader := newBatchedBlockLoader(i.ctx, i.fetcher, blockRefs, i.batchSize)
- filtered := iter.NewFilterIter[*bloomshipper.CloseableBlockQuerier](loader, i.filter)
+ filtered := iter.NewFilterIter(loader, i.filter)

iters := make([]iter.PeekIterator[*v1.SeriesWithBlooms], 0, len(blockRefs))
for filtered.Next() {
@@ -279,7 +279,7 @@ func (i *blockLoadingIter) loadNext() bool {
// two overlapping blocks can conceivably have the same series, so we need to dedupe,
// preferring the one with the most chunks already indexed since we'll have
// to add fewer chunks to the bloom
- i.iter = iter.NewDedupingIter[*v1.SeriesWithBlooms, *v1.SeriesWithBlooms](
+ i.iter = iter.NewDedupingIter(
func(a, b *v1.SeriesWithBlooms) bool {
return a.Series.Fingerprint == b.Series.Fingerprint
},
@@ -346,7 +346,7 @@ func overlappingBlocksIter(inputs []bloomshipper.BlockRef) iter.Iterator[[]bloom
// can we assume sorted blocks?
peekIter := iter.NewPeekIter(iter.NewSliceIter(inputs))

- return iter.NewDedupingIter[bloomshipper.BlockRef, []bloomshipper.BlockRef](
+ return iter.NewDedupingIter(
func(a bloomshipper.BlockRef, b []bloomshipper.BlockRef) bool {
minFp := b[0].Bounds.Min
maxFp := slices.MaxFunc(b, func(a, b bloomshipper.BlockRef) int { return int(a.Bounds.Max - b.Bounds.Max) }).Bounds.Max
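The pattern above repeats throughout this PR: Go can infer the type parameters of generic iterator helpers such as iter.NewFilterIter and iter.NewDedupingIter from their arguments, so the explicit instantiations are redundant. A minimal standalone sketch of the idea (the Filter helper below is illustrative, not Loki's iter package):

```go
package main

import "fmt"

// Filter returns the elements of xs for which keep returns true. The type
// parameter T is inferable from the arguments, so callers may omit it.
func Filter[T any](xs []T, keep func(T) bool) []T {
	var out []T
	for _, x := range xs {
		if keep(x) {
			out = append(out, x)
		}
	}
	return out
}

func main() {
	nums := []int{1, 2, 3, 4}

	// Explicit instantiation: works, but redundant.
	a := Filter[int](nums, func(n int) bool { return n%2 == 0 })

	// Inferred instantiation: identical behavior, less noise.
	b := Filter(nums, func(n int) bool { return n%2 == 0 })

	fmt.Println(a, b) // [2 4] [2 4]
}
```

Both calls compile to the same instantiation; dropping the brackets only removes noise.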
4 changes: 2 additions & 2 deletions pkg/bloombuild/builder/batch_test.go
@@ -120,15 +120,15 @@ func TestBatchedLoader(t *testing.T) {
)
}

- loader := newBatchedLoader[int, int, int](
+ loader := newBatchedLoader(
tc.ctx,
fetchers,
tc.inputs,
tc.mapper,
tc.batchSize,
)

- got, err := v2.Collect[int](loader)
+ got, err := v2.Collect(loader)
if tc.err {
require.Error(t, err)
return
15 changes: 4 additions & 11 deletions pkg/bloombuild/builder/builder.go
@@ -46,7 +46,6 @@ type Builder struct {
metrics *Metrics
logger log.Logger

- tsdbStore common.TSDBStore
bloomStore bloomshipper.Store
chunkLoader ChunkLoader

@@ -74,18 +73,12 @@ func New(
builderID := uuid.NewString()
logger = log.With(logger, "builder_id", builderID)

- tsdbStore, err := common.NewTSDBStores("bloom-builder", schemaCfg, storeCfg, storageMetrics, logger)
- if err != nil {
- return nil, fmt.Errorf("error creating TSDB store: %w", err)
- }

metrics := NewMetrics(r)
b := &Builder{
ID: builderID,
cfg: cfg,
limits: limits,
metrics: metrics,
- tsdbStore: tsdbStore,
bloomStore: bloomStore,
chunkLoader: NewStoreChunkLoader(fetcherProvider, metrics),
logger: logger,
@@ -386,7 +379,7 @@ func (b *Builder) processTask(
// Blocks are built consuming the series iterator. For observability, we wrap the series iterator
// with a counter iterator to count the number of times Next() is called on it.
// This is used to observe the number of series that are being processed.
- seriesItrWithCounter := iter.NewCounterIter[*v1.Series](seriesItr)
+ seriesItrWithCounter := iter.NewCounterIter(seriesItr)

gen := NewSimpleBloomGenerator(
tenant,
@@ -416,7 +409,7 @@ func (b *Builder) processTask(
return nil, fmt.Errorf("failed to build block: %w", err)
}

logger := log.With(logger, "block", built.BlockRef.String())
logger := log.With(logger, "block", built.String())

if err := client.PutBlock(
ctx,
@@ -461,7 +454,7 @@ func (b *Builder) processTask(
}
meta.MetaRef = ref

logger = log.With(logger, "meta", meta.MetaRef.String())
logger = log.With(logger, "meta", meta.String())

if err := client.PutMeta(ctx, meta); err != nil {
level.Error(logger).Log("msg", "failed to write meta", "err", err)
@@ -490,7 +483,7 @@ func (b *Builder) loadWorkForGap(
table config.DayTable,
gap protos.Gap,
) (iter.Iterator[*v1.Series], iter.CloseResetIterator[*v1.SeriesWithBlooms], error) {
- seriesItr := iter.NewCancelableIter[*v1.Series](ctx, iter.NewSliceIter[*v1.Series](gap.Series))
+ seriesItr := iter.NewCancelableIter(ctx, iter.NewSliceIter(gap.Series))

// load a blocks iterator for the gap
fetcher, err := b.bloomStore.Fetcher(table.ModelTime())
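Regarding the CounterIter wrapper mentioned in processTask above: counting how many times Next() succeeds is a small decorator over the iterator. A rough sketch of that pattern, assuming a minimal pull-style Iterator interface (names here are illustrative, not Loki's actual types):

```go
package main

import "fmt"

// Iterator is a minimal pull-style iterator.
type Iterator[T any] interface {
	Next() bool
	At() T
}

// sliceIter iterates over a slice.
type sliceIter[T any] struct {
	xs []T
	i  int
}

func (s *sliceIter[T]) Next() bool { s.i++; return s.i <= len(s.xs) }
func (s *sliceIter[T]) At() T      { return s.xs[s.i-1] }

// counterIter wraps another Iterator and counts how many items were consumed,
// i.e. how many calls to Next returned true.
type counterIter[T any] struct {
	Iterator[T]
	count int
}

func (c *counterIter[T]) Next() bool {
	ok := c.Iterator.Next()
	if ok {
		c.count++
	}
	return ok
}

func main() {
	inner := &sliceIter[string]{xs: []string{"a", "b", "c"}}
	it := &counterIter[string]{Iterator: inner}
	for it.Next() {
		_ = it.At() // consume the series
	}
	fmt.Println("series processed:", it.count) // series processed: 3
}
```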
28 changes: 0 additions & 28 deletions pkg/bloombuild/builder/spec.go
@@ -3,7 +3,6 @@ package builder
import (
"context"
"fmt"
"io"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
@@ -17,29 +16,8 @@ import (
"github.com/grafana/loki/v3/pkg/storage/chunk/fetcher"
"github.com/grafana/loki/v3/pkg/storage/stores"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
)

- // inclusive range
- type Keyspace struct {
- min, max model.Fingerprint
- }
-
- func (k Keyspace) Cmp(other Keyspace) v1.BoundsCheck {
- if other.max < k.min {
- return v1.Before
- } else if other.min > k.max {
- return v1.After
- }
- return v1.Overlap
- }
-
- // Store is likely bound within. This allows specifying impls like ShardedStore<Store>
- // to only request the shard-range needed from the existing store.
- type BloomGenerator interface {
- Generate(ctx context.Context) (skippedBlocks []v1.BlockMetadata, toClose []io.Closer, results iter.Iterator[*v1.Block], err error)
- }

// Simple implementation of a BloomGenerator.
type SimpleBloomGenerator struct {
userID string
@@ -247,12 +225,6 @@ func (b *LazyBlockBuilderIterator) Err() error {
return b.err
}

- // IndexLoader loads an index. This helps us do things like
- // load TSDBs for a specific period excluding multitenant (pre-compacted) indices
- type indexLoader interface {
- Index() (tsdb.Index, error)
- }

// ChunkItersByFingerprint models the chunks belonging to a fingerprint
type ChunkItersByFingerprint struct {
fp model.Fingerprint
6 changes: 3 additions & 3 deletions pkg/bloombuild/builder/spec_test.go
@@ -48,7 +48,7 @@ func blocksFromSchemaWithRange(t *testing.T, n int, options v1.BlockOptions, fro

minIdx, maxIdx := i*seriesPerBlock, (i+1)*seriesPerBlock

- itr := v2.NewSliceIter[v1.SeriesWithBlooms](data[minIdx:maxIdx])
+ itr := v2.NewSliceIter(data[minIdx:maxIdx])
_, err = builder.BuildFrom(itr)
require.Nil(t, err)

@@ -134,8 +134,8 @@ func TestSimpleBloomGenerator(t *testing.T) {
} {
t.Run(fmt.Sprintf("%s/%s", tc.desc, enc), func(t *testing.T) {
sourceBlocks, data, refs := blocksFromSchemaWithRange(t, 2, tc.fromSchema, 0x00000, 0x6ffff)
- storeItr := v2.NewMapIter[v1.SeriesWithBlooms, *v1.Series](
- v2.NewSliceIter[v1.SeriesWithBlooms](data),
+ storeItr := v2.NewMapIter(
+ v2.NewSliceIter(data),
func(swb v1.SeriesWithBlooms) *v1.Series {
return &swb.Series.Series
},
2 changes: 1 addition & 1 deletion pkg/bloombuild/common/tsdb.go
@@ -163,7 +163,7 @@ func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, b
case <-ctx.Done():
return iter.NewEmptyIter[*v1.Series](), ctx.Err()
default:
- return iter.NewCancelableIter[*v1.Series](ctx, iter.NewSliceIter[*v1.Series](series)), nil
+ return iter.NewCancelableIter(ctx, iter.NewSliceIter(series)), nil
}
}

2 changes: 1 addition & 1 deletion pkg/bloombuild/common/tsdb_test.go
@@ -66,7 +66,7 @@ func TestTSDBSeriesIter(t *testing.T) {
itr, err := NewTSDBSeriesIter(context.Background(), "", forSeriesTestImpl(input), v1.NewBounds(0, math.MaxUint64))
require.NoError(t, err)

- v1.EqualIterators[*v1.Series](
+ v1.EqualIterators(
t,
func(a, b *v1.Series) {
require.Equal(t, a, b)
6 changes: 0 additions & 6 deletions pkg/bloombuild/planner/metrics.go
@@ -5,8 +5,6 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/grafana/loki/v3/pkg/queue"
)

const (
@@ -211,7 +209,3 @@ func NewMetrics(
}),
}
}

- func NewQueueMetrics(r prometheus.Registerer) *queue.Metrics {
- return queue.NewMetrics(r, metricsNamespace, metricsSubsystem)
- }
8 changes: 4 additions & 4 deletions pkg/bloombuild/planner/planner.go
@@ -82,7 +82,7 @@ func New(
}

// Queue to manage tasks
- queueMetrics := NewQueueMetrics(r)
+ queueMetrics := queue.NewMetrics(r, metricsNamespace, metricsSubsystem)
tasksQueue := queue.NewRequestQueue(cfg.MaxQueuedTasksPerTenant, 0, NewQueueLimits(limits), queueMetrics)

// Clean metrics for inactive users: do not have added tasks to the queue in the last 1 hour
@@ -591,14 +591,14 @@ func (p *Planner) deleteOutdatedMetasAndBlocks(
err = client.DeleteMetas(ctx, []bloomshipper.MetaRef{meta.MetaRef})
if err != nil {
if client.IsObjectNotFoundErr(err) {
level.Debug(logger).Log("msg", "meta not found while attempting delete, continuing", "meta", meta.MetaRef.String())
level.Debug(logger).Log("msg", "meta not found while attempting delete, continuing", "meta", meta.String())
} else {
level.Error(logger).Log("msg", "failed to delete meta", "err", err, "meta", meta.MetaRef.String())
level.Error(logger).Log("msg", "failed to delete meta", "err", err, "meta", meta.String())
return nil, errors.Wrap(err, "failed to delete meta")
}
}
deletedMetas++
level.Debug(logger).Log("msg", "removed outdated meta", "meta", meta.MetaRef.String())
level.Debug(logger).Log("msg", "removed outdated meta", "meta", meta.String())
}

level.Debug(logger).Log(
8 changes: 4 additions & 4 deletions pkg/bloombuild/planner/strategies/splitkeyspace.go
@@ -207,13 +207,13 @@ func blockPlansForGaps(
return planGap.Blocks[i].Bounds.Less(planGap.Blocks[j].Bounds)
})

- peekingBlocks := iter.NewPeekIter[bloomshipper.BlockRef](
- iter.NewSliceIter[bloomshipper.BlockRef](
+ peekingBlocks := iter.NewPeekIter(
+ iter.NewSliceIter(
planGap.Blocks,
),
)
// dedupe blocks which could be in multiple metas
- itr := iter.NewDedupingIter[bloomshipper.BlockRef, bloomshipper.BlockRef](
+ itr := iter.NewDedupingIter(
func(a, b bloomshipper.BlockRef) bool {
return a == b
},
@@ -224,7 +224,7 @@
peekingBlocks,
)

- deduped, err := iter.Collect[bloomshipper.BlockRef](itr)
+ deduped, err := iter.Collect(itr)
if err != nil {
return nil, fmt.Errorf("failed to dedupe blocks: %w", err)
}
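As in batch.go, the planner dedupes with iter.NewDedupingIter: given an equality predicate and a merge function, adjacent duplicates in an already-ordered stream are collapsed. A simplified slice-based sketch of that behavior (the dedupe helper and series type below are illustrative, not the real iterator API):

```go
package main

import "fmt"

type series struct {
	fingerprint uint64
	chunks      int
}

// dedupe collapses adjacent elements that eq considers equal, using merge to
// pick which of the two survives. Input is assumed sorted so duplicates are adjacent.
func dedupe[T any](xs []T, eq func(a, b T) bool, merge func(a, b T) T) []T {
	var out []T
	for _, x := range xs {
		if n := len(out); n > 0 && eq(out[n-1], x) {
			out[n-1] = merge(out[n-1], x)
			continue
		}
		out = append(out, x)
	}
	return out
}

func main() {
	in := []series{
		{fingerprint: 1, chunks: 3},
		{fingerprint: 1, chunks: 7}, // same series seen in an overlapping block
		{fingerprint: 2, chunks: 1},
	}

	out := dedupe(in,
		func(a, b series) bool { return a.fingerprint == b.fingerprint },
		// prefer the entry with more chunks already indexed, mirroring the
		// comment in batch.go about having to add fewer chunks to the bloom
		func(a, b series) series {
			if a.chunks >= b.chunks {
				return a
			}
			return b
		},
	)

	fmt.Println(out) // [{1 7} {2 1}]
}
```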
125 changes: 0 additions & 125 deletions pkg/bloombuild/planner/util.go

This file was deleted.
