Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generic iterator #42

Merged
merged 1 commit into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions db/base_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"io"

"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/iterator"
ldbiterator "github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
)
Expand Down Expand Up @@ -40,13 +40,13 @@ type BaseDB interface {
// until a final write is called.
NewBatch() Batch

// NewIterator creates a binary-alphabetical iterator over a subset
// newIterator creates a binary-alphabetical iterator over a subset
// of database content with a particular key prefix, starting at a particular
// initial key (or after, if it does not exist).
//
// Note: This method assumes that the prefix is NOT part of the start, so there's
// no need for the caller to prepend the prefix to the start
NewIterator(prefix []byte, start []byte) iterator.Iterator
NewIterator(prefix []byte, start []byte) ldbiterator.Iterator

// Stat returns a particular internal stat of the database.
Stat(property string) (string, error)
Expand Down Expand Up @@ -130,9 +130,9 @@ func (db *baseDB) NewBatch() Batch {
return newBatch(db.backend)
}

// NewIterator returns iterator which iterates over values depending on the prefix.
// newIterator returns iterator which iterates over values depending on the prefix.
// Note: If prefix is nil, everything is iterated.
func (db *baseDB) NewIterator(prefix []byte, start []byte) iterator.Iterator {
func (db *baseDB) NewIterator(prefix []byte, start []byte) ldbiterator.Iterator {
r := util.BytesPrefix(prefix)
r.Start = append(r.Start, start...)
return db.backend.NewIterator(r, db.ro)
Expand Down
84 changes: 84 additions & 0 deletions db/iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package db

import (
"errors"
"sync"

ldbiterator "github.com/syndtr/goleveldb/leveldb/iterator"
)

// Iterator iterates over a database's key/value pairs in ascending key order.
//
// When it encounters an error any seek will return false and will yield no key/
// value pairs. The error can be queried by calling the Error method. Calling
// Release is still necessary.
//
// An iterator must be released after use, but it is not necessary to read an
// iterator until exhaustion. An iterator is not safe for concurrent use, but it
// is safe to use multiple iterators concurrently.
type Iterator[T comparable] interface {
// Next moves the iterator to the next key/value pair. It returns whether the
// iterator is exhausted.
Next() bool

// Error returns any accumulated error. Exhausting all the key/value pairs
// is not considered to be an error.
Error() error

// Start starts the iteration process.
start(numWorkers int)

// Value returns the current value of type T, or nil if done. The
// caller should not modify the contents of the returned slice, and its contents
// may change on the next call to Next.
Value() T

// Release releases associated resources. Release should always succeed and can
// be called multiple times without causing error.
Release()
}

type iterator[T comparable] struct {
err error
iter ldbiterator.Iterator
resultCh chan T
wg *sync.WaitGroup
cur T
}

func newIterator[T comparable](iter ldbiterator.Iterator) iterator[T] {
return iterator[T]{
iter: iter,
resultCh: make(chan T, 10),
wg: new(sync.WaitGroup),
}
}

// Next returns false if iterator is at its end. Otherwise, it returns true.
// Note: False does not stop the iterator. Release() should be called.
func (i *iterator[T]) Next() bool {
i.cur = <-i.resultCh
var zero T
return i.cur != zero
}

// Error returns iterators error if any.
func (i *iterator[T]) Error() error {
return errors.Join(i.err, i.iter.Error())
}

// Value returns current value hold by the iterator.
func (i *iterator[T]) Value() T {
return i.cur
}

// Release the iterator and wait until all threads are closed gracefully.
func (i *iterator[T]) Release() {
i.iter.Release()
i.wg.Wait()
}

func isNil[T comparable](arg T) bool {
var t T
return arg == t
}
86 changes: 3 additions & 83 deletions db/substate_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type SubstateDB interface {
// DeleteSubstate deletes Substate for given block and tx number.
DeleteSubstate(block uint64, tx int) error

NewSubstateIterator(start int, numWorkers int) SubstateIterator
NewSubstateIterator(start int, numWorkers int) Iterator[*Transaction]
}

// NewDefaultSubstateDB creates new instance of SubstateDB with default options.
Expand Down Expand Up @@ -114,92 +114,12 @@ func (db *substateDB) DeleteSubstate(block uint64, tx int) error {
}

// NewSubstateIterator returns iterator which iterates over Substates.
func (db *substateDB) NewSubstateIterator(start int, numWorkers int) SubstateIterator {
func (db *substateDB) NewSubstateIterator(start int, numWorkers int) Iterator[*Transaction] {
num := make([]byte, 4)
binary.BigEndian.PutUint32(num, uint32(start))
iter := newSubstateIterator(db, num)

// Create channels
errCh := make(chan error)
rawDataChs := make([]chan rawEntry, numWorkers)
resultChs := make([]chan *Transaction, numWorkers)

for i := 0; i < numWorkers; i++ {
rawDataChs[i] = make(chan rawEntry, 10)
resultChs[i] = make(chan *Transaction, 10)
}

// Start iter => raw data stage
iter.wg.Add(1)
go func() {
defer func() {
for _, c := range rawDataChs {
close(c)
}
iter.wg.Done()
}()
step := 0
for {
if !iter.iter.Next() {
return
}

key := make([]byte, len(iter.iter.Key()))
copy(key, iter.iter.Key())
value := make([]byte, len(iter.iter.Value()))
copy(value, iter.iter.Value())

res := rawEntry{key, value}

select {
case e := <-errCh:
iter.err = e
return
case rawDataChs[step] <- res: // fall-through
}
step = (step + 1) % numWorkers
}
}()

// Start raw data => parsed transaction stage (parallel)
for i := 0; i < numWorkers; i++ {
iter.wg.Add(1)
id := i

go func() {
defer func() {
close(resultChs[id])
iter.wg.Done()
}()
for raw := range rawDataChs[id] {
transaction, err := iter.toTransaction(raw)
if err != nil {
errCh <- err
return
}
resultChs[id] <- transaction
}
}()
}

// Start the go routine moving transactions from parsers to sink in order
iter.wg.Add(1)
go func() {
defer func() {
close(iter.resultCh)
iter.wg.Done()
}()
step := 0
for openProducers := numWorkers; openProducers > 0; {
next := <-resultChs[step%numWorkers]
if next != nil {
iter.resultCh <- next
} else {
openProducers--
}
step++
}
}()
iter.start(numWorkers)

return iter
}
Expand Down
Loading