Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions x/merkledb/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ func newOnEvictCache[K comparable, V any](maxSize int, onEviction func(V) error)
}
}

// removeOldest returns and removes the oldest element from this cache.
func (c *onEvictCache[K, V]) removeOldest() (K, V, bool) {
k, v, exists := c.fifo.Oldest()
if exists {
c.fifo.Delete(k)
}
return k, v, exists
}

// Get an element from this cache.
func (c *onEvictCache[K, V]) Get(key K) (V, bool) {
c.lock.RLock()
Expand All @@ -44,14 +53,14 @@ func (c *onEvictCache[K, V]) Put(key K, value V) error {
c.fifo.Put(key, value) // Mark as MRU

if c.fifo.Len() > c.maxSize {
oldestKey, oldsetVal, _ := c.fifo.Oldest()
oldestKey, oldestVal, _ := c.fifo.Oldest()
c.fifo.Delete(oldestKey)
return c.onEviction(oldsetVal)
return c.onEviction(oldestVal)
}
return nil
}

// Removes all elements from the cache.
// Flush removes all elements from the cache.
// Returns the last non-nil error during [c.onEviction], if any.
// If [c.onEviction] errors, it will still be called for any
// subsequent elements and the cache will still be emptied.
Expand All @@ -65,8 +74,8 @@ func (c *onEvictCache[K, V]) Flush() error {
var errs wrappers.Errs
iter := c.fifo.NewIterator()
for iter.Next() {
val := iter.Value()
errs.Add(c.onEviction(val))
errs.Add(c.onEviction(iter.Value()))
}

return errs.Err
}
98 changes: 53 additions & 45 deletions x/merkledb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

"github.com/ava-labs/avalanchego/database"
"github.com/ava-labs/avalanchego/database/prefixdb"
"github.com/ava-labs/avalanchego/database/versiondb"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/trace"
"github.com/ava-labs/avalanchego/utils"
Expand All @@ -29,8 +28,8 @@ import (
)

const (
RootPath = EmptyPath

RootPath = EmptyPath
evictionBatchSize = 100
// TODO: name better
rebuildViewSizeFractionOfCacheSize = 50
minRebuildViewSizePerCommit = 1000
Expand Down Expand Up @@ -139,9 +138,7 @@ type merkleDB struct {
// Should be held before taking [db.lock]
commitLock sync.RWMutex

// versiondb that the other dbs are built on.
// Allows the changes made to the snapshot and [nodeDB] to be atomic.
nodeDB *versiondb.Database
nodeDB database.Database

// Stores data about the database's current state.
metadataDB database.Database
Expand Down Expand Up @@ -176,7 +173,7 @@ func newDatabase(
) (*merkleDB, error) {
trieDB := &merkleDB{
metrics: metrics,
nodeDB: versiondb.New(prefixdb.New(nodePrefix, db)),
nodeDB: prefixdb.New(nodePrefix, db),
metadataDB: prefixdb.New(metadataPrefix, db),
history: newTrieHistory(config.HistoryLength),
tracer: config.Tracer,
Expand Down Expand Up @@ -265,8 +262,7 @@ func (db *merkleDB) rebuild(ctx context.Context) error {
return err
}
currentViewSize++
}
if err := db.nodeDB.Delete(key); err != nil {
} else if err := db.nodeDB.Delete(key); err != nil {
return err
}
}
Expand Down Expand Up @@ -354,10 +350,6 @@ func (db *merkleDB) Close() error {
return err
}

if err := db.nodeDB.Commit(); err != nil {
return err
}

// Successfully wrote intermediate nodes.
return db.metadataDB.Put(cleanShutdownKey, hadCleanShutdown)
}
Expand Down Expand Up @@ -749,23 +741,42 @@ func (db *merkleDB) NewIteratorWithStartAndPrefix(start, prefix []byte) database
// the movement of [node] from [db.nodeCache] to [db.nodeDB] is atomic.
// As soon as [db.nodeCache] no longer has [node], [db.nodeDB] does.
// Non-nil error is fatal -- causes [db] to close.
func (db *merkleDB) onEviction(node *node) error {
if node == nil || node.hasValue() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it makes sense to keep this check as an early return so eviction of non-intermediary nodes remains a no-op and doesn't cause a batch write to disk.

Not sure it really matters, but wanted to point this out.

// only persist intermediary nodes
func (db *merkleDB) onEviction(n *node) error {
// the evicted node isn't an intermediary node, so skip writing.
if n == nil || n.hasValue() {
return nil
}

nodeBytes, err := node.marshal()
if err != nil {
db.onEvictionErr.Set(err)
// Prevent reads/writes from/to [db.nodeDB] to avoid inconsistent state.
_ = db.nodeDB.Close()
// This is a fatal error.
go db.Close()
batch := db.nodeDB.NewBatch()
if err := writeNodeToBatch(batch, n); err != nil {
return err
}

if err := db.nodeDB.Put(node.key.Bytes(), nodeBytes); err != nil {
// Evict the oldest [evictionBatchSize] nodes from the cache
// and write them to disk. We write a batch of them, rather than
// just [n], so that we don't immediately evict and write another
// node, because each time this method is called we do a disk write.
var err error
for removedCount := 0; removedCount < evictionBatchSize; removedCount++ {
_, n, exists := db.nodeCache.removeOldest()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't love this solution to batching the evictions, but it works.

if !exists {
// The cache is empty.
break
}
if n == nil || n.hasValue() {
// only persist intermediary nodes
continue
}
// Note this must be = not := since we check
// [err] outside the loop.
if err = writeNodeToBatch(batch, n); err != nil {
break
}
}
if err == nil {
err = batch.Write()
}
if err != nil {
db.onEvictionErr.Set(err)
_ = db.nodeDB.Close()
go db.Close()
Expand All @@ -774,6 +785,16 @@ func (db *merkleDB) onEviction(node *node) error {
return nil
}

// Writes [n] to [batch]. Assumes [n] is non-nil.
func writeNodeToBatch(batch database.Batch, n *node) error {
nodeBytes, err := n.marshal()
if err != nil {
return err
}

return batch.Put(n.key.Bytes(), nodeBytes)
}

// Put upserts the key/value pair into the db.
func (db *merkleDB) Put(k, v []byte) error {
return db.Insert(context.Background(), k, v)
Expand Down Expand Up @@ -859,19 +880,13 @@ func (db *merkleDB) commitChanges(ctx context.Context, trieToCommit *trieView) e
return errNoNewRoot
}

// commit any outstanding cache evicted nodes.
// Note that we do this here because below we may Abort
// [db.nodeDB], which would cause us to lose these changes.
if err := db.nodeDB.Commit(); err != nil {
return err
}
batch := db.nodeDB.NewBatch()

_, nodesSpan := db.tracer.Start(ctx, "MerkleDB.commitChanges.writeNodes")
for key, nodeChange := range changes.nodes {
if nodeChange.after == nil {
db.metrics.IOKeyWrite()
if err := db.nodeDB.Delete(key.Bytes()); err != nil {
db.nodeDB.Abort()
if err := batch.Delete(key.Bytes()); err != nil {
nodesSpan.End()
return err
}
Expand All @@ -883,15 +898,7 @@ func (db *merkleDB) commitChanges(ctx context.Context, trieToCommit *trieView) e
// Otherwise, intermediary nodes are persisted on cache eviction or
// shutdown.
db.metrics.IOKeyWrite()
nodeBytes, err := nodeChange.after.marshal()
if err != nil {
db.nodeDB.Abort()
nodesSpan.End()
return err
}

if err := db.nodeDB.Put(key.Bytes(), nodeBytes); err != nil {
db.nodeDB.Abort()
if err := writeNodeToBatch(batch, nodeChange.after); err != nil {
nodesSpan.End()
return err
}
Expand All @@ -900,10 +907,9 @@ func (db *merkleDB) commitChanges(ctx context.Context, trieToCommit *trieView) e
nodesSpan.End()

_, commitSpan := db.tracer.Start(ctx, "MerkleDB.commitChanges.dbCommit")
err := db.nodeDB.Commit()
err := batch.Write()
commitSpan.End()
if err != nil {
db.nodeDB.Abort()
return err
}

Expand Down Expand Up @@ -1122,11 +1128,13 @@ func (db *merkleDB) initializeRootIfNeeded() (ids.ID, error) {
if err != nil {
return ids.Empty, err
}
if err := db.nodeDB.Put(rootKey, rootBytes); err != nil {

batch := db.nodeDB.NewBatch()
if err := batch.Put(rootKey, rootBytes); err != nil {
return ids.Empty, err
}

return db.root.id, db.nodeDB.Commit()
return db.root.id, batch.Write()
}

// Returns a view of the trie as it was when it had root [rootID] for keys within range [start, end].
Expand Down