Buffer pool for decompression (#1308)
This commit uses a sync.Pool to hold decompression buffers. A buffer is
returned to the pool only if it was actually used for decompression. We
don't want to pool buffers that were not used for decompression, because
those buffers point into mmapped SST files, and any write to them would
cause a segfault.

Fixes #1239
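
The pattern is the standard sync.Pool discipline with one extra guard: a buffer goes back to the pool only when this process allocated it for decompression. A minimal sketch of that guard, where bufPool, release, and decompressed are illustrative names rather than identifiers from this commit:

package main

import "sync"

var bufPool = &sync.Pool{
	New: func() interface{} {
		b := make([]byte, 4<<10)
		return &b // pool a *[]byte so Put does not allocate
	},
}

// release hands buf back to the pool only when it was allocated for
// decompression. A buffer that aliases an mmapped SST file is never pooled:
// a later Get would hand it out as scratch space, and writing to read-only
// mapped memory segfaults.
func release(buf []byte, decompressed bool) {
	if decompressed {
		bufPool.Put(&buf)
	}
}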
Ibrahim Jarif authored May 13, 2020
1 parent af22dfd commit aadda9a
Showing 4 changed files with 78 additions and 20 deletions.
db.go: 3 additions & 0 deletions

@@ -299,6 +299,9 @@ func Open(opt Options) (db *DB, err error) {
 		MaxCost:     int64(float64(opt.MaxCacheSize) * 0.95),
 		BufferItems: 64,
 		Metrics:     true,
+		OnEvict: func(_, _ uint64, value interface{}, _ int64) {
+			table.BlockEvictHandler(value)
+		},
 	}
 	db.blockCache, err = ristretto.NewCache(&config)
 	if err != nil {
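
For context, OnEvict is ristretto's eviction callback: it fires when the cache drops an entry, which is exactly when the cache's reference on a block must be released. A hedged, self-contained sketch of the wiring, with a stand-in block type instead of the one from table/table.go below:

package main

import (
	"sync/atomic"

	"github.com/dgraph-io/ristretto"
)

type block struct{ ref int32 }

func (b *block) incrRef() { atomic.AddInt32(&b.ref, 1) }
func (b *block) decrRef() { atomic.AddInt32(&b.ref, -1) }

func main() {
	cache, err := ristretto.NewCache(&ristretto.Config{
		NumCounters: 1e4,
		MaxCost:     1 << 26,
		BufferItems: 64,
		OnEvict: func(_, _ uint64, value interface{}, _ int64) {
			// Drop the reference the cache held on the evicted block.
			if b, ok := value.(*block); ok {
				b.decrRef()
			}
		},
	})
	if err != nil {
		panic(err)
	}
	b := &block{}
	b.incrRef()         // the cache takes a reference...
	cache.Set(1, b, 64) // ...when the block is inserted
}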
table/builder.go: 8 additions & 7 deletions

@@ -118,10 +118,13 @@ func NewTableBuilder(opts Options) *Builder {
 	return b
 }
 
-var slicePool = sync.Pool{
+var blockPool = &sync.Pool{
 	New: func() interface{} {
-		// Make 4 KB blocks for reuse.
-		b := make([]byte, 0, 4<<10)
+		// Create 5 KB blocks even though the default block size is 4 KB. The
+		// ZSTD decompression library grows the buffer by 2X if it's not big
+		// enough. Using a 5 KB block instead of a 4 KB one avoids that
+		// unnecessary 2X allocation by the decompression library.
+		b := make([]byte, 5<<10)
 		return &b
 	},
 }
@@ -135,9 +138,7 @@ func (b *Builder) handleBlock() {
 	// Compress the block.
 	if b.opt.Compression != options.None {
 		var err error
-
-		dst = slicePool.Get().(*[]byte)
-		*dst = (*dst)[:0]
+		dst = blockPool.Get().(*[]byte)
 
 		blockBuf, err = b.compressData(*dst, blockBuf)
 		y.Check(err)
@@ -167,7 +168,7 @@ func (b *Builder) handleBlock() {
 		item.end = item.start + uint32(len(blockBuf))
 
 		if dst != nil {
-			slicePool.Put(dst)
+			blockPool.Put(dst)
 		}
 	}
 }
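
One detail worth calling out: the pool holds *[]byte rather than []byte. Storing a slice directly in sync.Pool's interface{} heap-allocates a copy of the slice header on every Put; storing a pointer to it does not. A minimal illustration, with names that are not from this commit:

package main

import "sync"

var pool = &sync.Pool{
	New: func() interface{} {
		b := make([]byte, 5<<10) // 5 KB, matching blockPool above
		return &b                // *[]byte: Put(&b) never copies the header
	},
}

func main() {
	buf := pool.Get().(*[]byte)
	copy(*buf, "scratch work goes here") // use the buffer as scratch space
	pool.Put(buf)                        // cheap: only a pointer is stored
}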
table/iterator.go: 12 additions & 1 deletion

@@ -33,13 +33,21 @@ type blockIterator struct {
 	key          []byte
 	val          []byte
 	entryOffsets []uint32
+	block        *block
 
 	// prevOverlap stores the overlap of the previous key with the base key.
 	// This avoids unnecessary copy of base key when the overlap is same for multiple keys.
 	prevOverlap uint16
 }
 
 func (itr *blockIterator) setBlock(b *block) {
+	// Decrement the ref for the old block. If the old block was compressed, we
+	// might be able to reuse it.
+	itr.block.decrRef()
+	// Increment the ref for the new block.
+	b.incrRef()
+
+	itr.block = b
 	itr.err = nil
 	itr.idx = 0
 	itr.baseKey = itr.baseKey[:0]
@@ -102,7 +110,9 @@ func (itr *blockIterator) Error() error {
 	return itr.err
 }
 
-func (itr *blockIterator) Close() {}
+func (itr *blockIterator) Close() {
+	itr.block.decrRef()
+}
 
 var (
 	origin = 0
@@ -172,6 +182,7 @@ func (t *Table) NewIterator(reversed bool) *Iterator {
 
 // Close closes the iterator (and it must be called).
 func (itr *Iterator) Close() error {
+	itr.bi.Close()
 	return itr.t.DecrRef()
 }
 
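With these changes, an Iterator owns a reference to whichever block its inner blockIterator currently points at, and Close is what finally lets that block's buffer return to the pool. A hedged usage sketch, assuming tbl is an open *table.Table:

it := tbl.NewIterator(false)
defer it.Close() // releases the inner block ref, then the table ref
for it.Rewind(); it.Valid(); it.Next() {
	_ = it.Key() // skipping Close would pin the current block and its buffer
}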
table/table.go: 55 additions & 12 deletions

@@ -169,15 +169,44 @@ func (t *Table) DecrRef() error {
 	return nil
 }
 
+// BlockEvictHandler is used to reuse the byte slice stored in the block on cache eviction.
+func BlockEvictHandler(value interface{}) {
+	if b, ok := value.(*block); ok {
+		b.decrRef()
+	}
+}
+
 type block struct {
 	offset            int
 	data              []byte
 	checksum          []byte
-	entriesIndexStart int // start index of entryOffsets list
-	entryOffsets      []uint32
-	chkLen            int // checksum length
+	entriesIndexStart int      // start index of entryOffsets list
+	entryOffsets      []uint32 // used to binary search an entry in the block.
+	chkLen            int      // checksum length.
+	isReusable        bool     // used to determine if the block should be reused.
+	ref               int32
 }
 
+func (b *block) incrRef() {
+	atomic.AddInt32(&b.ref, 1)
+}
+func (b *block) decrRef() {
+	if b == nil {
+		return
+	}
+
+	p := atomic.AddInt32(&b.ref, -1)
+	// Insert the []byte into the pool only if the block is reusable. When a
+	// block is reusable, a new []byte was used for decompression, and that
+	// []byte can be reused.
+	// In the case of an uncompressed block, the []byte is a reference to the
+	// table.mmap []byte slice. Any attempt to write to the mmap []byte
+	// will lead to a SEGFAULT.
+	if p == 0 && b.isReusable {
+		blockPool.Put(&b.data)
+	}
+	y.AssertTrue(p >= 0)
+}
 func (b *block) size() int64 {
 	return int64(3*intSize /* Size of the offset, entriesIndexStart and chkLen */ +
 		cap(b.data) + cap(b.checksum) + cap(b.entryOffsets)*4)
@@ -419,8 +448,7 @@ func (t *Table) block(idx int) (*block, error) {
 		}
 	}
 
-	blk.data, err = t.decompressData(blk.data)
-	if err != nil {
+	if err = t.decompress(blk); err != nil {
 		return nil, errors.Wrapf(err,
 			"failed to decode compressed data in file: %s at offset: %d, len: %d",
 			t.fd.Name(), blk.offset, ko.Len)
@@ -462,6 +490,7 @@ func (t *Table) block(idx int) (*block, error) {
 	}
 	if t.opt.Cache != nil {
 		key := t.blockCacheKey(idx)
+		blk.incrRef()
 		t.opt.Cache.Set(key, blk, blk.size())
 	}
 	return blk, nil
@@ -563,7 +592,8 @@ func (t *Table) VerifyChecksum() error {
 			return y.Wrapf(err, "checksum validation failed for table: %s, block: %d, offset:%d",
 				t.Filename(), i, os.Offset)
 		}
-
+		b.incrRef()
+		defer b.decrRef()
 		// OnBlockRead or OnTableAndBlockRead, we don't need to call verify checksum
 		// on block, verification would be done while reading block itself.
 		if !(t.opt.ChkMode == options.OnBlockRead || t.opt.ChkMode == options.OnTableAndBlockRead) {
@@ -629,15 +659,28 @@ func NewFilename(id uint64, dir string) string {
 	return filepath.Join(dir, IDToFilename(id))
 }
 
-// decompressData decompresses the given data.
-func (t *Table) decompressData(data []byte) ([]byte, error) {
+// decompress decompresses the data stored in a block.
+func (t *Table) decompress(b *block) error {
+	var err error
 	switch t.opt.Compression {
 	case options.None:
-		return data, nil
+		// Nothing to be done here.
 	case options.Snappy:
-		return snappy.Decode(nil, data)
+		dst := blockPool.Get().(*[]byte)
+		b.data, err = snappy.Decode(*dst, b.data)
+		if err != nil {
+			return errors.Wrap(err, "failed to decompress")
+		}
+		b.isReusable = true
 	case options.ZSTD:
-		return y.ZSTDDecompress(nil, data)
+		dst := blockPool.Get().(*[]byte)
+		b.data, err = y.ZSTDDecompress(*dst, b.data)
+		if err != nil {
+			return errors.Wrap(err, "failed to decompress")
+		}
+		b.isReusable = true
+	default:
+		return errors.New("Unsupported compression type")
 	}
-	return nil, errors.New("Unsupported compression type")
+	return nil
 }
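
The reason the decompressed buffer is safe to pool while the mmapped one is not: snappy.Decode(dst, src) and the ZSTD wrapper write their output into dst, allocating a bigger slice only when dst is too small, and either way return writable heap memory rather than a view of src. This also appears to be why blockPool's New returns a slice with length 5<<10 instead of just capacity: both decoders size their output against len(dst). A hedged sketch of that contract, with compressed as an assumed input:

dst := blockPool.Get().(*[]byte) // 5 KB scratch slice from the pool
out, err := snappy.Decode(*dst, compressed)
if err != nil {
	return err
}
// out is heap memory owned by this process, so b.data may point at it and
// decrRef can blockPool.Put(&out) once the refcount hits zero. The mmapped
// 'compressed' bytes are never written to and never pooled.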
