Skip to content

Commit

Permalink
Merge pull request #87 from hashicorp/dnephin/iter-clone-txn
Browse files Browse the repository at this point in the history
Make ResultIteration safe for use after mutation (option 2)
  • Loading branch information
dnephin authored Jan 28, 2021
2 parents e86e9bb + c26541a commit 1cc0bb5
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 13 deletions.
11 changes: 8 additions & 3 deletions filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,22 @@ type FilterIterator struct {
iter ResultIterator
}

func NewFilterIterator(wrap ResultIterator, filter FilterFunc) *FilterIterator {
// NewFilterIterator wraps a ResultIterator. The filter function is applied
// to each value returned by a call to iter.Next.
//
// See the documentation for ResultIterator to understand the behaviour of the
// returned FilterIterator.
func NewFilterIterator(iter ResultIterator, filter FilterFunc) *FilterIterator {
return &FilterIterator{
filter: filter,
iter: wrap,
iter: iter,
}
}

// WatchCh returns the watch channel of the wrapped iterator.
func (f *FilterIterator) WatchCh() <-chan struct{} { return f.iter.WatchCh() }

// Next returns the next non-filtered result from the wrapped iterator
// Next returns the next non-filtered result from the wrapped iterator.
func (f *FilterIterator) Next() interface{} {
for {
if value := f.iter.Next(); value == nil || !f.filter(value) {
Expand Down
49 changes: 39 additions & 10 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,16 @@ func (txn *Txn) TrackChanges() {
}
}

// readableIndex returns a transaction usable for reading the given
// index in a table. If a write transaction is in progress, we may need
// to use an existing modified txn.
// readableIndex returns a transaction usable for reading the given index in a
// table. If the transaction is a write transaction with modifications, a clone of the
// modified index will be returned.
func (txn *Txn) readableIndex(table, index string) *iradix.Txn {
// Look for existing transaction
if txn.write && txn.modified != nil {
key := tableIndex{table, index}
exist, ok := txn.modified[key]
if ok {
return exist
return exist.Clone()
}
}

Expand Down Expand Up @@ -663,15 +663,35 @@ func (txn *Txn) getIndexValue(table, index string, args ...interface{}) (*IndexS
return indexSchema, val, err
}

// ResultIterator is used to iterate over a list of results
// from a Get query on a table.
// ResultIterator is used to iterate over a list of results from a query on a table.
//
// When a ResultIterator is created from a write transaction, the results from
// Next will reflect a snapshot of the table at the time the ResultIterator is
// created.
// This means that calling Insert or Delete on a transaction while iterating is
// allowed, but the changes made by Insert or Delete will not be observed in the
// results returned from subsequent calls to Next. For example if an item is deleted
// from the index used by the iterator it will still be returned by Next. If an
// item is inserted into the index used by the iterator, it will not be returned
// by Next. However, an iterator created after a call to Insert or Delete will
// reflect the modifications.
//
// When a ResultIterator is created from a write transaction, and there are already
// modifications to the index used by the iterator, the modification cache of the
// index will be invalidated. This may result in some additional allocations if
// the same node in the index is modified again.
type ResultIterator interface {
WatchCh() <-chan struct{}
// Next returns the next result from the iterator. If there are no more results
// nil is returned.
Next() interface{}
}

// Get is used to construct a ResultIterator over all the
// rows that match the given constraints of an index.
// Get is used to construct a ResultIterator over all the rows that match the
// given constraints of an index.
//
// See the documentation for ResultIterator to understand the behaviour of the
// returned ResultIterator.
func (txn *Txn) Get(table, index string, args ...interface{}) (ResultIterator, error) {
indexIter, val, err := txn.getIndexIterator(table, index, args...)
if err != nil {
Expand All @@ -691,7 +711,10 @@ func (txn *Txn) Get(table, index string, args ...interface{}) (ResultIterator, e

// GetReverse is used to construct a Reverse ResultIterator over all the
// rows that match the given constraints of an index.
// The returned ResultIterator's Next() will return the next Previous value
// The returned ResultIterator's Next() will return the next Previous value.
//
// See the documentation for ResultIterator to understand the behaviour of the
// returned ResultIterator.
func (txn *Txn) GetReverse(table, index string, args ...interface{}) (ResultIterator, error) {
indexIter, val, err := txn.getIndexIteratorReverse(table, index, args...)
if err != nil {
Expand All @@ -715,6 +738,9 @@ func (txn *Txn) GetReverse(table, index string, args ...interface{}) (ResultIter
// range scans within an index. It is not possible to watch the resulting
// iterator since the radix tree doesn't efficiently allow watching on lower
// bound changes. The WatchCh returned will be nill and so will block forever.
//
// See the documentation for ResultIterator to understand the behaviour of the
// returned ResultIterator.
func (txn *Txn) LowerBound(table, index string, args ...interface{}) (ResultIterator, error) {
indexIter, val, err := txn.getIndexIterator(table, index, args...)
if err != nil {
Expand All @@ -738,6 +764,9 @@ func (txn *Txn) LowerBound(table, index string, args ...interface{}) (ResultIter
// resulting iterator since the radix tree doesn't efficiently allow watching
// on lower bound changes. The WatchCh returned will be nill and so will block
// forever.
//
// See the documentation for ResultIterator to understand the behaviour of the
// returned ResultIterator.
func (txn *Txn) ReverseLowerBound(table, index string, args ...interface{}) (ResultIterator, error) {
indexIter, val, err := txn.getIndexIteratorReverse(table, index, args...)
if err != nil {
Expand Down Expand Up @@ -850,7 +879,7 @@ func (txn *Txn) getIndexIterator(table, index string, args ...interface{}) (*ira
indexTxn := txn.readableIndex(table, indexSchema.Name)
indexRoot := indexTxn.Root()

// Get an interator over the index
// Get an iterator over the index
indexIter := indexRoot.Iterator()
return indexIter, val, nil
}
Expand Down
50 changes: 50 additions & 0 deletions txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2178,3 +2178,53 @@ func TestTxn_Changes(t *testing.T) {
})
}
}

func TestTxn_GetIterAndDelete(t *testing.T) {
schema := &DBSchema{
Tables: map[string]*TableSchema{
"main": {
Name: "main",
Indexes: map[string]*IndexSchema{
"id": {
Name: "id",
Unique: true,
Indexer: &StringFieldIndex{Field: "ID"},
},
"foo": {
Name: "foo",
Indexer: &StringFieldIndex{Field: "Foo"},
},
},
},
},
}
db, err := NewMemDB(schema)
assertNilError(t, err)

key := "aaaa"
txn := db.Txn(true)
assertNilError(t, txn.Insert("main", &TestObject{ID: "1", Foo: key}))
assertNilError(t, txn.Insert("main", &TestObject{ID: "123", Foo: key}))
assertNilError(t, txn.Insert("main", &TestObject{ID: "2", Foo: key}))
txn.Commit()

txn = db.Txn(true)
// Delete something
assertNilError(t, txn.Delete("main", &TestObject{ID: "123", Foo: key}))

iter, err := txn.Get("main", "foo", key)
assertNilError(t, err)

for obj := iter.Next(); obj != nil; obj = iter.Next() {
assertNilError(t, txn.Delete("main", obj))
}

txn.Commit()
}

func assertNilError(t *testing.T, err error) {
t.Helper()
if err != nil {
t.Fatalf("expected nil error, got %v", err)
}
}

0 comments on commit 1cc0bb5

Please sign in to comment.