diff --git a/pkg/storage/bloom/v1/bloom_builder.go b/pkg/storage/bloom/v1/bloom_builder.go index 4b0e69cc3197..c327f5d6bfd9 100644 --- a/pkg/storage/bloom/v1/bloom_builder.go +++ b/pkg/storage/bloom/v1/bloom_builder.go @@ -28,6 +28,10 @@ func NewBloomBlockBuilder(opts BlockOptions, writer io.WriteCloser) *BloomBlockB } } +func (b *BloomBlockBuilder) UnflushedSize() int { + return b.scratch.Len() + b.page.UnflushedSize() +} + func (b *BloomBlockBuilder) Append(bloom *Bloom) (BloomOffset, error) { if !b.writtenSchema { if err := b.writeSchema(); err != nil { diff --git a/pkg/storage/bloom/v1/builder.go b/pkg/storage/bloom/v1/builder.go index a25422af774b..466687aa44b9 100644 --- a/pkg/storage/bloom/v1/builder.go +++ b/pkg/storage/bloom/v1/builder.go @@ -114,6 +114,10 @@ func (w *PageWriter) Reset() { w.n = 0 } +func (w *PageWriter) UnflushedSize() int { + return w.enc.Len() +} + func (w *PageWriter) SpaceFor(numBytes int) bool { // if a single bloom exceeds the target size, still accept it // otherwise only accept it if adding it would not exceed the target size diff --git a/pkg/storage/bloom/v1/index_builder.go b/pkg/storage/bloom/v1/index_builder.go index 067a79ad03f4..9703177f1200 100644 --- a/pkg/storage/bloom/v1/index_builder.go +++ b/pkg/storage/bloom/v1/index_builder.go @@ -35,6 +35,10 @@ func NewIndexBuilder(opts BlockOptions, writer io.WriteCloser) *IndexBuilder { } } +func (b *IndexBuilder) UnflushedSize() int { + return b.scratch.Len() + b.page.UnflushedSize() +} + func (b *IndexBuilder) WriteOpts() error { b.scratch.Reset() b.opts.Encode(b.scratch) diff --git a/pkg/storage/bloom/v1/test_util.go b/pkg/storage/bloom/v1/test_util.go index c2f1f3e8f8d3..f040ab429728 100644 --- a/pkg/storage/bloom/v1/test_util.go +++ b/pkg/storage/bloom/v1/test_util.go @@ -132,9 +132,11 @@ func CompareIterators[A, B any]( a iter.Iterator[A], b iter.Iterator[B], ) { + var i int for a.Next() { - require.True(t, b.Next()) + require.Truef(t, b.Next(), "'a' has %dth element but 'b' does not'", i) f(t, a.At(), b.At()) + i++ } require.False(t, b.Next()) require.NoError(t, a.Err()) diff --git a/pkg/storage/bloom/v1/versioned_builder.go b/pkg/storage/bloom/v1/versioned_builder.go index 8844ddf43eb1..960c6bcdde92 100644 --- a/pkg/storage/bloom/v1/versioned_builder.go +++ b/pkg/storage/bloom/v1/versioned_builder.go @@ -125,10 +125,35 @@ func (b *V3Builder) AddSeries(series Series, offsets []BloomOffset, fields Set[F return false, errors.Wrapf(err, "writing index for series %v", series.Fingerprint) } - full, _, err := b.writer.Full(b.opts.BlockSize) + full, err := b.full() if err != nil { return false, errors.Wrap(err, "checking if block is full") } return full, nil } + +func (b *V3Builder) full() (bool, error) { + if b.opts.BlockSize == 0 { + // Unlimited block size + return false, nil + } + + full, writtenSize, err := b.writer.Full(b.opts.BlockSize) + if err != nil { + return false, errors.Wrap(err, "checking if block writer is full") + } + if full { + return true, nil + } + + // Even if the block writer is not full, we may have unflushed data in the bloom builders. + // Check if by flushing these, we would exceed the block size. + unflushedIndexSize := b.index.UnflushedSize() + unflushedBloomSize := b.blooms.UnflushedSize() + if uint64(writtenSize+unflushedIndexSize+unflushedBloomSize) > b.opts.BlockSize { + return true, nil + } + + return false, nil +} diff --git a/pkg/storage/bloom/v1/versioned_builder_test.go b/pkg/storage/bloom/v1/versioned_builder_test.go index 1e4a0f5a93b2..65a47caa9671 100644 --- a/pkg/storage/bloom/v1/versioned_builder_test.go +++ b/pkg/storage/bloom/v1/versioned_builder_test.go @@ -4,6 +4,7 @@ import ( "bytes" "testing" + "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/compression" @@ -17,7 +18,7 @@ import ( func smallBlockOpts(v Version, enc compression.Codec) BlockOptions { return BlockOptions{ Schema: NewSchema(v, enc), - SeriesPageSize: 100, + SeriesPageSize: 4 << 10, BloomPageSize: 2 << 10, BlockSize: 0, // unlimited } @@ -78,3 +79,103 @@ func TestV3Roundtrip(t *testing.T) { querier, ) } + +func seriesWithBlooms(nSeries int, fromFp, throughFp model.Fingerprint) []SeriesWithBlooms { + series, _ := MkBasicSeriesWithBlooms(nSeries, fromFp, throughFp, 0, 10000) + return series +} + +func seriesWithoutBlooms(nSeries int, fromFp, throughFp model.Fingerprint) []SeriesWithBlooms { + series := seriesWithBlooms(nSeries, fromFp, throughFp) + + // remove blooms from series + for i := range series { + series[i].Blooms = v2.NewSliceIter([]*Bloom{}) + } + + return series +} +func TestFullBlock(t *testing.T) { + opts := smallBlockOpts(V3, compression.None) + minBlockSize := opts.SeriesPageSize // 1 index page, 4KB + const maxEmptySeriesPerBlock = 47 + for _, tc := range []struct { + name string + maxBlockSize uint64 + series []SeriesWithBlooms + expected []SeriesWithBlooms + }{ + { + name: "only series without blooms", + maxBlockSize: minBlockSize, + // +1 so we test adding the last series that fills the block + series: seriesWithoutBlooms(maxEmptySeriesPerBlock+1, 0, 0xffff), + expected: seriesWithoutBlooms(maxEmptySeriesPerBlock+1, 0, 0xffff), + }, + { + name: "series without blooms and one with blooms", + maxBlockSize: minBlockSize, + series: append( + seriesWithoutBlooms(maxEmptySeriesPerBlock, 0, 0x7fff), + seriesWithBlooms(50, 0x8000, 0xffff)..., + ), + expected: append( + seriesWithoutBlooms(maxEmptySeriesPerBlock, 0, 0x7fff), + seriesWithBlooms(1, 0x8000, 0x8001)..., + ), + }, + { + name: "only one series with bloom", + maxBlockSize: minBlockSize, + series: seriesWithBlooms(10, 0, 0xffff), + expected: seriesWithBlooms(1, 0, 1), + }, + { + name: "one huge series with bloom and then series without", + maxBlockSize: minBlockSize, + series: append( + seriesWithBlooms(1, 0, 1), + seriesWithoutBlooms(100, 1, 0xffff)..., + ), + expected: seriesWithBlooms(1, 0, 1), + }, + { + name: "big block", + maxBlockSize: 1 << 20, // 1MB + series: seriesWithBlooms(100, 0, 0xffff), + expected: seriesWithBlooms(100, 0, 0xffff), + }, + } { + t.Run(tc.name, func(t *testing.T) { + indexBuf := bytes.NewBuffer(nil) + bloomsBuf := bytes.NewBuffer(nil) + writer := NewMemoryBlockWriter(indexBuf, bloomsBuf) + reader := NewByteReader(indexBuf, bloomsBuf) + opts.BlockSize = tc.maxBlockSize + + b, err := NewBlockBuilderV3(opts, writer) + require.NoError(t, err) + + _, err = b.BuildFrom(v2.NewSliceIter(tc.series)) + require.NoError(t, err) + + block := NewBlock(reader, NewMetrics(nil)) + querier := NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize).Iter() + + CompareIterators( + t, + func(t *testing.T, a SeriesWithBlooms, b *SeriesWithBlooms) { + require.Equal(t, a.Series.Fingerprint, b.Series.Fingerprint) + require.ElementsMatch(t, a.Series.Chunks, b.Series.Chunks) + bloomsA, err := v2.Collect(a.Blooms) + require.NoError(t, err) + bloomsB, err := v2.Collect(b.Blooms) + require.NoError(t, err) + require.Equal(t, len(bloomsB), len(bloomsA)) + }, + v2.NewSliceIter(tc.expected), + querier, + ) + }) + } +}