Skip to content

Commit

Permalink
Support multiple iterators in read-write transactions. (#1286)
Browse files Browse the repository at this point in the history
This adds support for multiple iterators during a read-write transaction. The
iterators created in a read-write transaction will only be able to see writes
that were performed before the iterator was created. Any writes that occur
after the iterator is created will be invisible to the iterator.

Fixes #981
  • Loading branch information
elliotcourant authored May 13, 2020
1 parent 9459a24 commit af22dfd
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 20 deletions.
44 changes: 33 additions & 11 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,24 +812,46 @@ func TestIterateParallel(t *testing.T) {

wg.Wait()

// Check that a RW txn can't run multiple iterators.
// Check that a RW txn can run multiple iterators.
txn := db.NewTransaction(true)
itr := txn.NewIterator(DefaultIteratorOptions)
require.Panics(t, func() {
txn.NewIterator(DefaultIteratorOptions)
require.NotPanics(t, func() {
// Now that multiple iterators are supported in read-write
// transactions, make sure this does not panic anymore. Then just
// close the iterator.
txn.NewIterator(DefaultIteratorOptions).Close()
})
// The transaction should still panic since there is still one pending
// iterator that is open.
require.Panics(t, txn.Discard)
itr.Close()
txn.Discard()

// Run multiple iterators for a RO txn.
txn = db.NewTransaction(false)
defer txn.Discard()
wg.Add(3)
go iterate(txn, &wg)
go iterate(txn, &wg)
go iterate(txn, &wg)
wg.Wait()
// (Regression) Make sure that creating multiple concurrent iterators
// within a read only transaction continues to work.
t.Run("multiple read-only iterators", func(t *testing.T) {
// Run multiple iterators for a RO txn.
txn = db.NewTransaction(false)
defer txn.Discard()
wg.Add(3)
go iterate(txn, &wg)
go iterate(txn, &wg)
go iterate(txn, &wg)
wg.Wait()
})

// Make sure that when we create multiple concurrent iterators within a
// read-write transaction that it actually iterates successfully.
t.Run("multiple read-write iterators", func(t *testing.T) {
// Run multiple iterators for a RO txn.
txn = db.NewTransaction(true)
defer txn.Discard()
wg.Add(3)
go iterate(txn, &wg)
go iterate(txn, &wg)
go iterate(txn, &wg)
wg.Wait()
})
})
}

Expand Down
13 changes: 7 additions & 6 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,16 +446,17 @@ type Iterator struct {
//
// Multiple Iterators:
// For a read-only txn, multiple iterators can be running simultaneously. However, for a read-write
// txn, only one can be running at one time to avoid race conditions, because Txn is thread-unsafe.
// txn, iterators have the nuance of being a snapshot of the writes for the transaction at the time
// iterator was created. If writes are performed after an iterator is created, then that iterator
// will not be able to see those writes. Only writes performed before an iterator was created can be
// viewed.
func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator {
if txn.discarded {
panic("Transaction has already been discarded")
}
// Do not change the order of the next if. We must track the number of running iterators.
if atomic.AddInt32(&txn.numIterators, 1) > 1 && txn.update {
atomic.AddInt32(&txn.numIterators, -1)
panic("Only one iterator can be active at one time, for a RW txn.")
}

// Keep track of the number of active iterators.
atomic.AddInt32(&txn.numIterators, 1)

// TODO: If Prefix is set, only pick those memtables which have keys with
// the prefix.
Expand Down
14 changes: 11 additions & 3 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,10 @@ type Txn struct {
readTs uint64
commitTs uint64

update bool // update is used to conditionally keep track of reads.
reads []uint64 // contains fingerprints of keys read.
writes []uint64 // contains fingerprints of keys written.
update bool // update is used to conditionally keep track of reads.
readsLock sync.Mutex // guards the reads slice. See addReadKey.
reads []uint64 // contains fingerprints of keys read.
writes []uint64 // contains fingerprints of keys written.

pendingWrites map[string]*Entry // cache stores any writes done by txn.
duplicateWrites []*Entry // Used in managed mode to store duplicate entries.
Expand Down Expand Up @@ -438,7 +439,14 @@ func (txn *Txn) Get(key []byte) (item *Item, rerr error) {
func (txn *Txn) addReadKey(key []byte) {
if txn.update {
fp := z.MemHash(key)

// Because of the possibility of multiple iterators it is now possible
// for multiple threads within a read-write transaction to read keys at
// the same time. The reads slice is not currently thread-safe and
// needs to be locked whenever we mark a key as read.
txn.readsLock.Lock()
txn.reads = append(txn.reads, fp)
txn.readsLock.Unlock()
}
}

Expand Down

0 comments on commit af22dfd

Please sign in to comment.