Skip to content

Commit

Permalink
fix: Fix race condition in block.incRef (#1337)
Browse files Browse the repository at this point in the history
Fixes hypermodeinc/dgraph#5456 .

This PR fixes the crash that could occur when a block was
read from the cache.
There was a logical race condition. The following sequence
of events could occur which would cause the crash.
1. An iterator makes `t.Block(idx)` call
2. The `t.Block` function finds the block in the cache.
   The newly found block has `ref=1` which means it was
   held only by the cache.
3. The `t.Block` function is holding the block and at the
   same time the block gets evicted from the cache. The
   existing ref of the block was `1` so the cache eviction
   would decrement the ref and make it `0`. When the ref
   becomes `0`, the block is added to the `sync.Pool` and
   is ready to be reused.
4. While the block got evicted from the cache, the iterator
   received the block and it incremented the ref from
   `0` to `1` and starts using this.

Since the block was inserted into the syncPool in the 3rd
event, it could be modified by anyone while the iterator is
using it.
  • Loading branch information
Ibrahim Jarif authored May 22, 2020
1 parent 86b1db9 commit 21735af
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 13 deletions.
29 changes: 26 additions & 3 deletions badger/cmd/bank.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func toSlice(bal uint64) []byte {
}

func getBalance(txn *badger.Txn, account int) (uint64, error) {
item, err := txn.Get(key(account))
item, err := get(txn, key(account))
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -197,14 +197,33 @@ func diff(a, b []account) string {

var errFailure = errors.New("test failed due to balance mismatch")

// get function will fetch the value for the key "k" either by using the
// txn.Get API or the iterator.Seek API.
func get(txn *badger.Txn, k []byte) (*badger.Item, error) {
if rand.Int()%2 == 0 {
return txn.Get(k)
}

iopt := badger.DefaultIteratorOptions
// PrefectValues is expensive. We don't need it here.
iopt.PrefetchValues = false
it := txn.NewIterator(iopt)
defer it.Close()
it.Seek(k)
if it.Valid() {
return it.Item(), nil
}
return nil, badger.ErrKeyNotFound
}

// seekTotal retrives the total of all accounts by seeking for each account key.
func seekTotal(txn *badger.Txn) ([]account, error) {
expected := uint64(numAccounts) * uint64(initialBal)
var accounts []account

var total uint64
for i := 0; i < numAccounts; i++ {
item, err := txn.Get(key(i))
item, err := get(txn, key(i))
if err != nil {
log.Printf("Error for account: %d. err=%v. key=%q\n", i, err, key(i))
return accounts, err
Expand Down Expand Up @@ -343,7 +362,11 @@ func runTest(cmd *cobra.Command, args []string) error {
WithNumMemtables(2).
// Do not GC any versions, because we need them for the disect..
WithNumVersionsToKeep(int(math.MaxInt32)).
WithValueThreshold(1) // Make all values go to value log
WithValueThreshold(1). // Make all values go to value log
WithCompression(options.ZSTD).
WithKeepL0InMemory(false).
WithMaxCacheSize(10 << 20)

if mmap {
opts = opts.WithTableLoadingMode(options.MemoryMap)
}
Expand Down
2 changes: 0 additions & 2 deletions table/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ func (itr *blockIterator) setBlock(b *block) {
// Decrement the ref for the old block. If the old block was compressed, we
// might be able to reuse it.
itr.block.decrRef()
// Increment the ref for the new block.
b.incrRef()

itr.block = b
itr.err = nil
Expand Down
52 changes: 44 additions & 8 deletions table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,25 +187,46 @@ type block struct {
ref int32
}

func (b *block) incrRef() {
atomic.AddInt32(&b.ref, 1)
// incrRef increments the ref of a block and return a bool indicating if the
// increment was successful. A true value indicates that the block can be used.
func (b *block) incrRef() bool {
for {
// We can't blindly add 1 to ref. We need to check whether it has
// reached zero first, because if it did, then we should absolutely not
// use this block.
ref := atomic.LoadInt32(&b.ref)
// The ref would not be equal to 0 unless the existing
// block get evicted before this line. If the ref is zero, it means that
// the block is already added the the blockPool and cannot be used
// anymore. The ref of a new block is 1 so the following condition will
// be true only if the block got reused before we could increment its
// ref.
if ref == 0 {
return false
}
// Increment the ref only if it is not zero and has not changed between
// the time we read it and we're updating it.
//
if atomic.CompareAndSwapInt32(&b.ref, ref, ref+1) {
return true
}
}
}
func (b *block) decrRef() {
if b == nil {
return
}

p := atomic.AddInt32(&b.ref, -1)
// Insert the []byte into pool only if the block is resuable. When a block
// is reusable a new []byte is used for decompression and this []byte can
// be reused.
// In case of an uncompressed block, the []byte is a reference to the
// table.mmap []byte slice. Any attempt to write data to the mmap []byte
// will lead to SEGFAULT.
if p == 0 && b.isReusable {
if atomic.AddInt32(&b.ref, -1) == 0 && b.isReusable {
blockPool.Put(&b.data)
}
y.AssertTrue(p >= 0)
y.AssertTrue(atomic.LoadInt32(&b.ref) >= 0)
}
func (b *block) size() int64 {
return int64(3*intSize /* Size of the offset, entriesIndexStart and chkLen */ +
Expand Down Expand Up @@ -419,6 +440,9 @@ func (t *Table) readIndex() error {
return nil
}

// block function return a new block. Each block holds a ref and the byte
// slice stored in the block will be reused when the ref becomes zero. The
// caller should release the block by calling block.decrRef() on it.
func (t *Table) block(idx int) (*block, error) {
y.AssertTruef(idx >= 0, "idx=%d", idx)
if idx >= len(t.blockIndex) {
Expand All @@ -428,12 +452,18 @@ func (t *Table) block(idx int) (*block, error) {
key := t.blockCacheKey(idx)
blk, ok := t.opt.Cache.Get(key)
if ok && blk != nil {
return blk.(*block), nil
// Use the block only if the increment was successful. The block
// could get evicted from the cache between the Get() call and the
// incrRef() call.
if b := blk.(*block); b.incrRef() {
return b, nil
}
}
}
ko := t.blockIndex[idx]
blk := &block{
offset: int(ko.Offset),
ref: 1,
}
var err error
if blk.data, err = t.read(blk.offset, int(ko.Len)); err != nil {
Expand Down Expand Up @@ -490,8 +520,14 @@ func (t *Table) block(idx int) (*block, error) {
}
if t.opt.Cache != nil {
key := t.blockCacheKey(idx)
blk.incrRef()
t.opt.Cache.Set(key, blk, blk.size())
// incrRef should never return false here because we're calling it on a
// new block with ref=1.
y.AssertTrue(blk.incrRef())

// Decrement the block ref if we could not insert it in the cache.
if !t.opt.Cache.Set(key, blk, blk.size()) {
blk.decrRef()
}
}
return blk, nil
}
Expand Down

0 comments on commit 21735af

Please sign in to comment.