Skip to content

Remove version db from merkle db #1534

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions x/merkledb/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ func newOnEvictCache[K comparable, V any](maxSize int, onEviction func(V) error)
}
}

// removeOldest removes the oldest element from this cache and returns its
// key and value. The returned bool reports whether such an element existed;
// when it is false nothing is deleted and the key/value are whatever
// [c.fifo.Oldest] returned alongside it.
// NOTE(review): this method does not acquire [c.lock] itself; presumably the
// caller is expected to hold it -- confirm against call sites.
func (c *onEvictCache[K, V]) removeOldest() (K, V, bool) {
	oldestKey, oldestVal, ok := c.fifo.Oldest()
	if !ok {
		// Nothing to remove.
		return oldestKey, oldestVal, false
	}
	c.fifo.Delete(oldestKey)
	return oldestKey, oldestVal, true
}

// Get an element from this cache.
func (c *onEvictCache[K, V]) Get(key K) (V, bool) {
c.lock.RLock()
Expand All @@ -44,14 +53,14 @@ func (c *onEvictCache[K, V]) Put(key K, value V) error {
c.fifo.Put(key, value) // Mark as MRU

if c.fifo.Len() > c.maxSize {
oldestKey, oldsetVal, _ := c.fifo.Oldest()
oldestKey, oldestVal, _ := c.fifo.Oldest()
c.fifo.Delete(oldestKey)
return c.onEviction(oldsetVal)
return c.onEviction(oldestVal)
}
return nil
}

// Removes all elements from the cache.
// Flush removes all elements from the cache.
// Returns the last non-nil error during [c.onEviction], if any.
// If [c.onEviction] errors, it will still be called for any
// subsequent elements and the cache will still be emptied.
Expand All @@ -65,8 +74,8 @@ func (c *onEvictCache[K, V]) Flush() error {
var errs wrappers.Errs
iter := c.fifo.NewIterator()
for iter.Next() {
val := iter.Value()
errs.Add(c.onEviction(val))
errs.Add(c.onEviction(iter.Value()))
}

return errs.Err
}
105 changes: 61 additions & 44 deletions x/merkledb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"github.com/ava-labs/avalanchego/database"
"github.com/ava-labs/avalanchego/database/prefixdb"
"github.com/ava-labs/avalanchego/database/versiondb"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/trace"
"github.com/ava-labs/avalanchego/utils"
Expand All @@ -30,8 +29,8 @@ import (
)

const (
RootPath = EmptyPath

RootPath = EmptyPath
evictionBatchSize = 100
// TODO: name better
rebuildViewSizeFractionOfCacheSize = 50
minRebuildViewSizePerCommit = 1000
Expand Down Expand Up @@ -132,9 +131,8 @@ type merkleDB struct {
// Should be held before taking [db.lock]
commitLock sync.RWMutex

// versiondb that the other dbs are built on.
// Allows the changes made to the snapshot and [nodeDB] to be atomic.
nodeDB *versiondb.Database
nodeDB database.Database
batch database.Batch

// Stores data about the database's current state.
metadataDB database.Database
Expand Down Expand Up @@ -169,13 +167,15 @@ func newDatabase(
) (*merkleDB, error) {
trieDB := &merkleDB{
metrics: metrics,
nodeDB: versiondb.New(prefixdb.New(nodePrefix, db)),
nodeDB: prefixdb.New(nodePrefix, db),
metadataDB: prefixdb.New(metadataPrefix, db),
history: newTrieHistory(config.HistoryLength),
tracer: config.Tracer,
childViews: make([]*trieView, 0, defaultPreallocationSize),
}

trieDB.batch = trieDB.nodeDB.NewBatch()

// Note: trieDB.onEviction is responsible for writing intermediary nodes to
// disk as they are evicted from the cache.
trieDB.nodeCache = newOnEvictCache[path](config.NodeCacheSize, trieDB.onEviction)
Expand Down Expand Up @@ -258,8 +258,7 @@ func (db *merkleDB) rebuild(ctx context.Context) error {
return err
}
currentViewSize++
}
if err := db.nodeDB.Delete(key); err != nil {
} else if err := db.nodeDB.Delete(key); err != nil {
return err
}
}
Expand Down Expand Up @@ -347,7 +346,7 @@ func (db *merkleDB) Close() error {
return err
}

if err := db.nodeDB.Commit(); err != nil {
if err := db.commit(); err != nil {
return err
}

Expand Down Expand Up @@ -742,29 +741,53 @@ func (db *merkleDB) NewIteratorWithStartAndPrefix(start, prefix []byte) database
// the movement of [node] from [db.nodeCache] to [db.nodeDB] is atomic.
// As soon as [db.nodeCache] no longer has [node], [db.nodeDB] does.
// Non-nil error is fatal -- causes [db] to close.
func (db *merkleDB) onEviction(node *node) error {
if node == nil || node.hasValue() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it makes sense to keep this check as an early return so eviction of non-intermediary nodes remains a no-op and doesn't cause a batch write to disk.

Not sure it really matters, but wanted to point this out.

// only persist intermediary nodes
return nil
func (db *merkleDB) onEviction(n *node) error {
batch := db.nodeDB.NewBatch()
if err := writeNodeToBatch(batch, n); err != nil {
return err
}

nodeBytes, err := node.marshal()
// Evict the oldest [evictionBatchSize] nodes from the cache
// and write them to disk. We write a batch of them, rather than
// just [n], so that we don't immediately evict and write another
// node, because each time this method is called we do a disk write.
var err error
for removedCount := 0; removedCount < evictionBatchSize; removedCount++ {
_, n, exists := db.nodeCache.removeOldest()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't love this solution to batching the evictions, but it works.

if !exists {
// The cache is empty.
break
}
// Note this must be = not := since we check
// [err] outside the loop.
if err = writeNodeToBatch(batch, n); err != nil {
break
}
}
if err == nil {
err = batch.Write()
}
if err != nil {
db.onEvictionErr.Set(err)
// Prevent reads/writes from/to [db.nodeDB] to avoid inconsistent state.
_ = db.nodeDB.Close()
// This is a fatal error.
go db.Close()
return err
}
return nil
}

if err := db.nodeDB.Put(node.key.Bytes(), nodeBytes); err != nil {
db.onEvictionErr.Set(err)
_ = db.nodeDB.Close()
go db.Close()
// Writes [n] to [batch]. If [n] is nil, this is a no-op.
func writeNodeToBatch(batch database.Batch, n *node) error {
if n == nil {
return nil
}

nodeBytes, err := n.marshal()
if err != nil {
return err
}
return nil

return batch.Put(n.key.Bytes(), nodeBytes)
}

// Put upserts the key/value pair into the db.
Expand Down Expand Up @@ -852,19 +875,12 @@ func (db *merkleDB) commitChanges(ctx context.Context, trieToCommit *trieView) e
return errNoNewRoot
}

// commit any outstanding cache evicted nodes.
// Note that we do this here because below we may Abort
// [db.nodeDB], which would cause us to lose these changes.
if err := db.nodeDB.Commit(); err != nil {
return err
}

_, nodesSpan := db.tracer.Start(ctx, "MerkleDB.commitChanges.writeNodes")
for key, nodeChange := range changes.nodes {
if nodeChange.after == nil {
db.metrics.IOKeyWrite()
if err := db.nodeDB.Delete(key.Bytes()); err != nil {
db.nodeDB.Abort()
if err := db.batch.Delete(key.Bytes()); err != nil {
db.batch.Reset()
nodesSpan.End()
return err
}
Expand All @@ -876,15 +892,8 @@ func (db *merkleDB) commitChanges(ctx context.Context, trieToCommit *trieView) e
// Otherwise, intermediary nodes are persisted on cache eviction or
// shutdown.
db.metrics.IOKeyWrite()
nodeBytes, err := nodeChange.after.marshal()
if err != nil {
db.nodeDB.Abort()
nodesSpan.End()
return err
}

if err := db.nodeDB.Put(key.Bytes(), nodeBytes); err != nil {
db.nodeDB.Abort()
if err := writeNodeToBatch(db.batch, nodeChange.after); err != nil {
db.batch.Reset()
nodesSpan.End()
return err
}
Expand All @@ -893,10 +902,10 @@ func (db *merkleDB) commitChanges(ctx context.Context, trieToCommit *trieView) e
nodesSpan.End()

_, commitSpan := db.tracer.Start(ctx, "MerkleDB.commitChanges.dbCommit")
err := db.nodeDB.Commit()
err := db.commit()
commitSpan.End()
if err != nil {
db.nodeDB.Abort()
db.batch.Reset()
return err
}

Expand Down Expand Up @@ -1115,11 +1124,19 @@ func (db *merkleDB) initializeRootIfNeeded() (ids.ID, error) {
if err != nil {
return ids.Empty, err
}
if err := db.nodeDB.Put(rootKey, rootBytes); err != nil {
if err := db.batch.Put(rootKey, rootBytes); err != nil {
return ids.Empty, err
}

return db.root.id, db.nodeDB.Commit()
return db.root.id, db.commit()
}

// commit calls Write on [db.batch] and, if that succeeds, resets the batch
// so it can be reused for subsequent operations.
// If Write fails, the error is returned and [db.batch] is not reset here;
// callers that want to discard the pending operations must reset it
// themselves.
func (db *merkleDB) commit() error {
	err := db.batch.Write()
	if err == nil {
		db.batch.Reset()
	}
	return err
}

// Returns a view of the trie as it was when it had root [rootID] for keys within range [start, end].
Expand Down