This repository was archived by the owner on Jul 30, 2022. It is now read-only.

Commit 4287398

Implement writing bloom filters in tables.
1 parent 3435554 commit 4287398

File tree

3 files changed: +176, -31 lines

table/reader.go

Lines changed: 8 additions & 8 deletions
@@ -172,7 +172,7 @@ var _ db.Iterator = (*tableIter)(nil)
 // opposed to iterating over a range of keys (where the minimum of that range
 // isn't necessarily in the table). In that case, i.err will be set to
 // db.ErrNotFound if f does not contain the key.
-func (i *tableIter) nextBlock(key []byte, f *filter) bool {
+func (i *tableIter) nextBlock(key []byte, f *filterReader) bool {
 	if !i.index.Next() {
 		i.err = i.index.err
 		return false
@@ -246,18 +246,18 @@ func (i *tableIter) Close() error {
 	return i.err
 }
 
-type filter struct {
+type filterReader struct {
 	data    []byte
 	offsets []byte // len(offsets) must be a multiple of 4.
 	policy  db.FilterPolicy
 	shift   uint32
 }
 
-func (f *filter) valid() bool {
+func (f *filterReader) valid() bool {
 	return f.data != nil
 }
 
-func (f *filter) init(data []byte, policy db.FilterPolicy) (ok bool) {
+func (f *filterReader) init(data []byte, policy db.FilterPolicy) (ok bool) {
 	if len(data) < 5 {
 		return false
 	}
@@ -276,7 +276,7 @@ func (f *filter) init(data []byte, policy db.FilterPolicy) (ok bool) {
 	return true
 }
 
-func (f *filter) mayContain(blockOffset uint64, key []byte) bool {
+func (f *filterReader) mayContain(blockOffset uint64, key []byte) bool {
 	index := blockOffset >> f.shift
 	if index >= uint64(len(f.offsets)/4-1) {
 		return true
@@ -296,7 +296,7 @@ type Reader struct {
 	err             error
 	index           block
 	comparer        db.Comparer
-	filter          filter
+	filter          filterReader
 	verifyChecksums bool
 	// TODO: add a (goroutine-safe) LRU block cache.
 }
@@ -330,7 +330,7 @@ func (r *Reader) Get(key []byte, o *db.ReadOptions) (value []byte, err error) {
 	if r.err != nil {
 		return nil, r.err
 	}
-	f := (*filter)(nil)
+	f := (*filterReader)(nil)
 	if r.filter.valid() {
 		f = &r.filter
 	}
@@ -362,7 +362,7 @@ func (r *Reader) Find(key []byte, o *db.ReadOptions) db.Iterator {
 	return r.find(key, o, nil)
 }
 
-func (r *Reader) find(key []byte, o *db.ReadOptions, f *filter) db.Iterator {
+func (r *Reader) find(key []byte, o *db.ReadOptions, f *filterReader) db.Iterator {
 	if r.err != nil {
 		return &tableIter{err: r.err}
 	}
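The diff above elides the body of mayContain past the bounds check. For orientation, a minimal sketch of how the lookup plausibly completes, given the block layout that filterWriter.finish (in table/writer.go, below) produces: the offsets slice holds little-endian uint32s, with filter number i occupying data[offsets[i]:offsets[i+1]]. The slicing below is a reconstruction, not text from this commit, and assumes encoding/binary is imported in package table:

func (f *filterReader) mayContain(blockOffset uint64, key []byte) bool {
	index := blockOffset >> f.shift
	if index >= uint64(len(f.offsets)/4-1) {
		return true // No filter covers this block; don't rule the key out.
	}
	// offsets[i] and offsets[i+1] bracket the bytes of filter number i.
	i := binary.LittleEndian.Uint32(f.offsets[4*index:])
	j := binary.LittleEndian.Uint32(f.offsets[4*index+4:])
	return f.policy.MayContain(f.data[i:j], key)
}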

table/table_test.go

Lines changed: 25 additions & 10 deletions
@@ -197,7 +197,7 @@ var (
 	tmpFileCount int
 )
 
-func build(compression db.Compression) (db.File, error) {
+func build(compression db.Compression, fp db.FilterPolicy) (db.File, error) {
 	// Create a sorted list of wordCount's keys.
 	keys := make([]string, len(wordCount))
 	i := 0
@@ -216,7 +216,8 @@ func build(compression db.Compression) (db.File, error) {
 	defer f0.Close()
 	tmpFileCount++
 	w := NewWriter(f0, &db.Options{
-		Compression: compression,
+		Compression:  compression,
+		FilterPolicy: fp,
 	})
 	for _, k := range keys {
 		v := wordCount[k]
@@ -379,7 +380,7 @@ func (c *countingFilterPolicy) MayContain(filter, key []byte) bool {
 
 func TestWriter(t *testing.T) {
 	// Check that we can read a freshly made table.
-	f, err := build(db.DefaultCompression)
+	f, err := build(db.DefaultCompression, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -389,31 +390,45 @@ func TestWriter(t *testing.T) {
 	}
 }
 
-func TestNoCompressionOutput(t *testing.T) {
+func testNoCompressionOutput(t *testing.T, fp db.FilterPolicy) {
+	filename := "../testdata/h.no-compression.ldb"
+	if fp != nil {
+		filename = "../testdata/h.bloom.no-compression.ldb"
+	}
+
 	// Check that a freshly made NoCompression table is byte-for-byte equal
 	// to a pre-made table.
-	a, err := ioutil.ReadFile(filepath.FromSlash("../testdata/h.no-compression.ldb"))
+	want, err := ioutil.ReadFile(filepath.FromSlash(filename))
 	if err != nil {
 		t.Fatal(err)
 	}
-	f, err := build(db.NoCompression)
+
+	f, err := build(db.NoCompression, fp)
 	if err != nil {
 		t.Fatal(err)
 	}
 	stat, err := f.Stat()
 	if err != nil {
 		t.Fatal(err)
	}
-	b := make([]byte, stat.Size())
-	_, err = f.ReadAt(b, 0)
+	got := make([]byte, stat.Size())
+	_, err = f.ReadAt(got, 0)
 	if err != nil {
 		t.Fatal(err)
 	}
-	if !bytes.Equal(a, b) {
-		t.Fatal("built table does not match pre-made table")
+
+	if !bytes.Equal(got, want) {
+		i := 0
+		for ; i < len(got) && i < len(want) && got[i] == want[i]; i++ {
+		}
+		t.Fatalf("built table does not match pre-made table. From byte %d onwards,\ngot:\n% x\nwant:\n% x",
+			i, got[i:], want[i:])
 	}
 }
 
+func TestNoCompressionOutput(t *testing.T)      { testNoCompressionOutput(t, nil) }
+func TestBloomNoCompressionOutput(t *testing.T) { testNoCompressionOutput(t, bloom.FilterPolicy(10)) }
+
 func TestBlockIter(t *testing.T) {
 	// k is a block that maps three keys "apple", "apricot", "banana" to empty strings.
 	k := block([]byte("\x00\x05\x00apple\x02\x05\x00ricot\x00\x06\x00banana\x00\x00\x00\x00\x01\x00\x00\x00"))
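The fp argument threaded through these tests is a db.FilterPolicy. Its definition is not part of this diff; inferred from the call sites in this commit (policy.NewFilter in the writer, policy.Name for the metaindex key, and MayContain in the reader and in countingFilterPolicy), its shape is roughly the following sketch. See the leveldb/db package for the authoritative interface:

type FilterPolicy interface {
	// Name identifies the policy. The writer registers the filter block
	// under the metaindex key "filter." + Name().
	Name() string
	// NewFilter returns a serialized filter that summarizes keys.
	NewFilter(keys [][]byte) []byte
	// MayContain reports whether the filter may contain key. False
	// positives are allowed; false negatives are not.
	MayContain(filter, key []byte) bool
}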

table/writer.go

Lines changed: 143 additions & 13 deletions
@@ -23,6 +23,97 @@ type indexEntry struct {
 	keyLen int
 }
 
+// filterBaseLog being 11 means that we generate a new filter for every 2KiB of
+// data.
+//
+// It's a little unfortunate that this is 11, whilst the default db.Options
+// BlockSize is 1<<12 or 4KiB, so that in practice, every second filter is
+// empty, but both values match the C++ code.
+const filterBaseLog = 11
+
+type filterWriter struct {
+	policy db.FilterPolicy
+	// block holds the keys for the current block. The buffers are re-used for
+	// each new block.
+	block struct {
+		data    []byte
+		lengths []int
+		keys    [][]byte
+	}
+	// data and offsets are the per-block filters for the overall table.
+	data    []byte
+	offsets []uint32
+}
+
+func (f *filterWriter) hasKeys() bool {
+	return len(f.block.lengths) != 0
+}
+
+func (f *filterWriter) appendKey(key []byte) {
+	f.block.data = append(f.block.data, key...)
+	f.block.lengths = append(f.block.lengths, len(key))
+}
+
+func (f *filterWriter) appendOffset() error {
+	o := len(f.data)
+	if uint64(o) > 1<<32-1 {
+		return errors.New("leveldb/table: filter data is too long")
+	}
+	f.offsets = append(f.offsets, uint32(o))
+	return nil
+}
+
+func (f *filterWriter) emit() error {
+	if err := f.appendOffset(); err != nil {
+		return err
+	}
+	if !f.hasKeys() {
+		return nil
+	}
+
+	i, j := 0, 0
+	for _, length := range f.block.lengths {
+		j += length
+		f.block.keys = append(f.block.keys, f.block.data[i:j])
+		i = j
+	}
+	f.data = append(f.data, f.policy.NewFilter(f.block.keys)...)
+
+	// Reset the per-block state.
+	f.block.data = f.block.data[:0]
+	f.block.lengths = f.block.lengths[:0]
+	f.block.keys = f.block.keys[:0]
+	return nil
+}
+
+func (f *filterWriter) finishBlock(blockOffset uint64) error {
+	for i := blockOffset >> filterBaseLog; i > uint64(len(f.offsets)); {
+		if err := f.emit(); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (f *filterWriter) finish() ([]byte, error) {
+	if f.hasKeys() {
+		if err := f.emit(); err != nil {
+			return nil, err
+		}
+	}
+	if err := f.appendOffset(); err != nil {
+		return nil, err
+	}
+
+	var b [4]byte
+	for _, x := range f.offsets {
+		binary.LittleEndian.PutUint32(b[:], x)
+		f.data = append(f.data, b[0], b[1], b[2], b[3])
+	}
+	f.data = append(f.data, filterBaseLog)
+	return f.data, nil
+}
+
 // Writer is a table writer. It implements the DB interface, as documented
 // in the leveldb/db package.
 type Writer struct {
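Taken together, emit and finish produce the leveldb filter block layout: all filter payloads concatenated, then one little-endian uint32 per filter giving its start offset, then a final uint32 giving the total payload length (which is also where the offset array starts), and finally the single byte filterBaseLog. The mapping from data-block offset to filter index is just a shift; a self-contained, illustrative example (the offsets are made up, not from the commit):

package main

import "fmt"

const filterBaseLog = 11 // one filter per 2KiB of table data

func main() {
	for _, blockOffset := range []uint64{0, 2047, 2048, 4096, 5000} {
		// Keys from a data block starting at blockOffset land in this filter:
		fmt.Printf("block offset %5d -> filter index %d\n",
			blockOffset, blockOffset>>filterBaseLog)
	}
	// Output:
	// block offset     0 -> filter index 0
	// block offset  2047 -> filter index 0
	// block offset  2048 -> filter index 1
	// block offset  4096 -> filter index 2
	// block offset  5000 -> filter index 2
}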
@@ -65,6 +156,8 @@ type Writer struct {
 	// re-used over the lifetime of the writer, avoiding the allocation of a
 	// temporary buffer for each block.
 	compressedBuf []byte
+	// filter accumulates the filter block.
+	filter filterWriter
 	// tmp is a scratch buffer, large enough to hold either footerLen bytes,
 	// blockTrailerLen bytes, or (5 * binary.MaxVarintLen64) bytes.
 	tmp [50]byte
@@ -103,6 +196,9 @@ func (w *Writer) Set(key, value []byte, o *db.WriteOptions) error {
 		w.err = fmt.Errorf("leveldb/table: Set called in non-increasing key order: %q, %q", w.prevKey, key)
 		return w.err
 	}
+	if w.filter.policy != nil {
+		w.filter.appendKey(key)
+	}
 	w.flushPendingBH(key)
 	w.append(key, value, w.nEntries%w.blockRestartInterval == 0)
 	// If the estimated block size is sufficiently large, finish the current block.
@@ -169,15 +265,32 @@ func (w *Writer) finishBlock() (blockHandle, error) {
 	// Compress the buffer, discarding the result if the improvement
 	// isn't at least 12.5%.
 	b := w.buf.Bytes()
-	w.tmp[0] = noCompressionBlockType
+	blockType := byte(noCompressionBlockType)
 	if w.compression == db.SnappyCompression {
 		compressed := snappy.Encode(w.compressedBuf, b)
 		w.compressedBuf = compressed[:cap(compressed)]
 		if len(compressed) < len(b)-len(b)/8 {
-			w.tmp[0] = snappyCompressionBlockType
+			blockType = snappyCompressionBlockType
 			b = compressed
 		}
 	}
+	bh, err := w.writeRawBlock(b, blockType)
+
+	// Calculate filters.
+	if w.filter.policy != nil {
+		w.filter.finishBlock(w.offset)
+	}
+
+	// Reset the per-block state.
+	w.buf.Reset()
+	w.nEntries = 0
+	w.restarts = w.restarts[:0]
+
+	return bh, err
+}
+
+func (w *Writer) writeRawBlock(b []byte, blockType byte) (blockHandle, error) {
+	w.tmp[0] = blockType
 
 	// Calculate the checksum.
 	checksum := crc.New(b).Update(w.tmp[:1]).Value()
@@ -192,11 +305,6 @@ func (w *Writer) finishBlock() (blockHandle, error) {
 	}
 	bh := blockHandle{w.offset, uint64(len(b))}
 	w.offset += uint64(len(b)) + blockTrailerLen
-
-	// Reset the per-block state.
-	w.buf.Reset()
-	w.nEntries = 0
-	w.restarts = w.restarts[:0]
 	return bh, nil
 }
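The middle of writeRawBlock (writing the block bytes and then the trailer) is unchanged by this commit, so the diff elides it. A hedged reconstruction of the whole function for readability; the w.writer field name and the exact checksum serialization are assumptions, and only the lines marked in the diff above are verbatim:

func (w *Writer) writeRawBlock(b []byte, blockType byte) (blockHandle, error) {
	w.tmp[0] = blockType

	// Calculate the checksum over the contents plus the 1 byte block type.
	checksum := crc.New(b).Update(w.tmp[:1]).Value()
	binary.LittleEndian.PutUint32(w.tmp[1:5], checksum)

	// Write the block contents, then the blockTrailerLen = 5 byte trailer:
	// 1 byte block type + 4 byte checksum.
	if _, err := w.writer.Write(b); err != nil {
		return blockHandle{}, err
	}
	if _, err := w.writer.Write(w.tmp[:blockTrailerLen]); err != nil {
		return blockHandle{}, err
	}
	bh := blockHandle{w.offset, uint64(len(b))}
	w.offset += uint64(len(b)) + blockTrailerLen
	return bh, nil
}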

@@ -229,16 +337,36 @@ func (w *Writer) Close() (err error) {
 		w.flushPendingBH(nil)
 	}
 
-	// Write the (empty) metaindex block.
+	// Writer.append uses w.tmp[:3*binary.MaxVarintLen64]. Let tmp be the other
+	// half of that slice.
+	tmp := w.tmp[3*binary.MaxVarintLen64 : 5*binary.MaxVarintLen64]
+
+	// Write the filter block.
+	if w.filter.policy != nil {
+		b, err := w.filter.finish()
+		if err != nil {
+			w.err = err
+			return w.err
+		}
+		bh, err := w.writeRawBlock(b, noCompressionBlockType)
+		if err != nil {
+			w.err = err
+			return w.err
+		}
+		n := encodeBlockHandle(tmp, bh)
+		w.append([]byte("filter."+w.filter.policy.Name()), tmp[:n], true)
+	}
+
+	// Write the metaindex block. It might be an empty block, if the filter
+	// policy is nil.
 	metaindexBlockHandle, err := w.finishBlock()
 	if err != nil {
 		w.err = err
 		return w.err
 	}
 
 	// Write the index block.
-	// writer.append uses w.tmp[:3*binary.MaxVarintLen64].
-	i0, tmp := 0, w.tmp[3*binary.MaxVarintLen64:5*binary.MaxVarintLen64]
+	i0 := 0
 	for _, ie := range w.indexEntries {
 		n := encodeBlockHandle(tmp, ie.bh)
 		i1 := i0 + ie.keyLen
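encodeBlockHandle is why tmp only needs 2*binary.MaxVarintLen64 bytes here: in the leveldb table format a block handle is just two uvarints, the block's offset and its length. A self-contained illustration of that encoding (the handle values are made up):

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	var tmp [2 * binary.MaxVarintLen64]byte
	offset, length := uint64(12345), uint64(678)
	n := binary.PutUvarint(tmp[:], offset)
	n += binary.PutUvarint(tmp[n:], length)
	fmt.Printf("encoded block handle: % x (%d bytes)\n", tmp[:n], n)
}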
@@ -280,15 +408,17 @@
 // NewWriter returns a new table writer for the file. Closing the writer will
 // close the file.
 func NewWriter(f db.File, o *db.Options) *Writer {
-	// TODO: honor o.GetFilterPolicy().
 	w := &Writer{
 		closer:               f,
 		blockRestartInterval: o.GetBlockRestartInterval(),
 		blockSize:            o.GetBlockSize(),
 		cmp:                  o.GetComparer(),
 		compression:          o.GetCompression(),
-		prevKey:              make([]byte, 0, 256),
-		restarts:             make([]uint32, 0, 256),
+		filter: filterWriter{
+			policy: o.GetFilterPolicy(),
+		},
+		prevKey:  make([]byte, 0, 256),
+		restarts: make([]uint32, 0, 256),
 	}
 	if f == nil {
 		w.err = errors.New("leveldb/table: nil file")
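Nothing in the diff shows the feature from a client's point of view, so here is a hedged usage sketch: building a table with a bloom filter enabled, mirroring what the updated tests do. The import paths are the repository's later github.com/golang/leveldb layout and the file handling is illustrative; only NewWriter, db.Options.FilterPolicy, bloom.FilterPolicy(10), Set, and Close come from this commit:

package main

import (
	"log"
	"os"

	"github.com/golang/leveldb/bloom"
	"github.com/golang/leveldb/db"
	"github.com/golang/leveldb/table"
)

func main() {
	f, err := os.Create("/tmp/example.ldb") // *os.File satisfies db.File.
	if err != nil {
		log.Fatal(err)
	}
	w := table.NewWriter(f, &db.Options{
		FilterPolicy: bloom.FilterPolicy(10), // roughly 10 bloom bits per key.
	})
	// Keys must be Set in increasing order.
	for _, key := range []string{"apple", "apricot", "banana"} {
		if err := w.Set([]byte(key), []byte("value"), nil); err != nil {
			log.Fatal(err)
		}
	}
	// Close writes the filter, metaindex, and index blocks, then the footer,
	// and closes the underlying file.
	if err := w.Close(); err != nil {
		log.Fatal(err)
	}
}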
