Skip to content

Commit

Permalink
Fix max block size
Browse files Browse the repository at this point in the history
  • Loading branch information
salvacorts committed Oct 25, 2024
1 parent aa2635d commit aa75e5f
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 3 deletions.
4 changes: 4 additions & 0 deletions pkg/storage/bloom/v1/bloom_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ func NewBloomBlockBuilder(opts BlockOptions, writer io.WriteCloser) *BloomBlockB
}
}

func (b *BloomBlockBuilder) UnflushedSize() int {
return b.scratch.Len() + b.page.UnflushedSize()
}

func (b *BloomBlockBuilder) Append(bloom *Bloom) (BloomOffset, error) {
if !b.writtenSchema {
if err := b.writeSchema(); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/bloom/v1/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ func (w *PageWriter) Reset() {
w.n = 0
}

func (w *PageWriter) UnflushedSize() int {
return w.enc.Len()
}

func (w *PageWriter) SpaceFor(numBytes int) bool {
// if a single bloom exceeds the target size, still accept it
// otherwise only accept it if adding it would not exceed the target size
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/bloom/v1/index_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func NewIndexBuilder(opts BlockOptions, writer io.WriteCloser) *IndexBuilder {
}
}

func (b *IndexBuilder) UnflushedSize() int {
return b.scratch.Len() + b.page.UnflushedSize()
}

func (b *IndexBuilder) WriteOpts() error {
b.scratch.Reset()
b.opts.Encode(b.scratch)
Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/bloom/v1/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,11 @@ func CompareIterators[A, B any](
a iter.Iterator[A],
b iter.Iterator[B],
) {
var i int
for a.Next() {
require.True(t, b.Next())
require.Truef(t, b.Next(), "'a' has %dth element but 'b' does not'", i)
f(t, a.At(), b.At())
i++
}
require.False(t, b.Next())
require.NoError(t, a.Err())
Expand Down
27 changes: 26 additions & 1 deletion pkg/storage/bloom/v1/versioned_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,35 @@ func (b *V3Builder) AddSeries(series Series, offsets []BloomOffset, fields Set[F
return false, errors.Wrapf(err, "writing index for series %v", series.Fingerprint)
}

full, _, err := b.writer.Full(b.opts.BlockSize)
full, err := b.full()
if err != nil {
return false, errors.Wrap(err, "checking if block is full")
}

return full, nil
}

func (b *V3Builder) full() (bool, error) {
if b.opts.BlockSize == 0 {
// Unlimited block size
return false, nil
}

full, writtenSize, err := b.writer.Full(b.opts.BlockSize)
if err != nil {
return false, errors.Wrap(err, "checking if block writer is full")
}
if full {
return true, nil
}

// Even if the block writer is not full, we may have unflushed data in the bloom builders.
// Check if by flushing these, we would exceed the block size.
unflushedIndexSize := b.index.UnflushedSize()
unflushedBloomSize := b.blooms.UnflushedSize()
if uint64(writtenSize+unflushedIndexSize+unflushedBloomSize) > b.opts.BlockSize {
return true, nil
}

return false, nil
}
103 changes: 102 additions & 1 deletion pkg/storage/bloom/v1/versioned_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"testing"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/compression"
Expand All @@ -17,7 +18,7 @@ import (
func smallBlockOpts(v Version, enc compression.Codec) BlockOptions {
return BlockOptions{
Schema: NewSchema(v, enc),
SeriesPageSize: 100,
SeriesPageSize: 4 << 10,
BloomPageSize: 2 << 10,
BlockSize: 0, // unlimited
}
Expand Down Expand Up @@ -78,3 +79,103 @@ func TestV3Roundtrip(t *testing.T) {
querier,
)
}

func seriesWithBlooms(nSeries int, fromFp, throughFp model.Fingerprint) []SeriesWithBlooms {
series, _ := MkBasicSeriesWithBlooms(nSeries, fromFp, throughFp, 0, 10000)
return series
}

func seriesWithoutBlooms(nSeries int, fromFp, throughFp model.Fingerprint) []SeriesWithBlooms {
series := seriesWithBlooms(nSeries, fromFp, throughFp)

// remove blooms from series
for i := range series {
series[i].Blooms = v2.NewSliceIter([]*Bloom{})
}

return series
}
func TestFullBlock(t *testing.T) {
opts := smallBlockOpts(V3, compression.None)
minBlockSize := opts.SeriesPageSize // 1 index page, 4KB
const maxEmptySeriesPerBlock = 47
for _, tc := range []struct {
name string
maxBlockSize uint64
series []SeriesWithBlooms
expected []SeriesWithBlooms
}{
{
name: "only series without blooms",
maxBlockSize: minBlockSize,
// +1 so we test adding the last series that fills the block
series: seriesWithoutBlooms(maxEmptySeriesPerBlock+1, 0, 0xffff),
expected: seriesWithoutBlooms(maxEmptySeriesPerBlock+1, 0, 0xffff),
},
{
name: "series without blooms and one with blooms",
maxBlockSize: minBlockSize,
series: append(
seriesWithoutBlooms(maxEmptySeriesPerBlock, 0, 0x7fff),
seriesWithBlooms(50, 0x8000, 0xffff)...,
),
expected: append(
seriesWithoutBlooms(maxEmptySeriesPerBlock, 0, 0x7fff),
seriesWithBlooms(1, 0x8000, 0x8001)...,
),
},
{
name: "only one series with bloom",
maxBlockSize: minBlockSize,
series: seriesWithBlooms(10, 0, 0xffff),
expected: seriesWithBlooms(1, 0, 1),
},
{
name: "one huge series with bloom and then series without",
maxBlockSize: minBlockSize,
series: append(
seriesWithBlooms(1, 0, 1),
seriesWithoutBlooms(100, 1, 0xffff)...,
),
expected: seriesWithBlooms(1, 0, 1),
},
{
name: "big block",
maxBlockSize: 1 << 20, // 1MB
series: seriesWithBlooms(100, 0, 0xffff),
expected: seriesWithBlooms(100, 0, 0xffff),
},
} {
t.Run(tc.name, func(t *testing.T) {
indexBuf := bytes.NewBuffer(nil)
bloomsBuf := bytes.NewBuffer(nil)
writer := NewMemoryBlockWriter(indexBuf, bloomsBuf)
reader := NewByteReader(indexBuf, bloomsBuf)
opts.BlockSize = tc.maxBlockSize

b, err := NewBlockBuilderV3(opts, writer)
require.NoError(t, err)

_, err = b.BuildFrom(v2.NewSliceIter(tc.series))
require.NoError(t, err)

block := NewBlock(reader, NewMetrics(nil))
querier := NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize).Iter()

CompareIterators(
t,
func(t *testing.T, a SeriesWithBlooms, b *SeriesWithBlooms) {
require.Equal(t, a.Series.Fingerprint, b.Series.Fingerprint)
require.ElementsMatch(t, a.Series.Chunks, b.Series.Chunks)
bloomsA, err := v2.Collect(a.Blooms)
require.NoError(t, err)
bloomsB, err := v2.Collect(b.Blooms)
require.NoError(t, err)
require.Equal(t, len(bloomsB), len(bloomsA))
},
v2.NewSliceIter(tc.expected),
querier,
)
})
}
}

0 comments on commit aa75e5f

Please sign in to comment.