Skip to content

change synchronisation in hashdb commit to allow concurrent reads #465

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions triedb/hashdb/cleaner_arbitrum.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package hashdb

import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
)

// delayedCleaner buffers the hashes of committed trie nodes while only the
// database read lock is held, deferring their eviction from the dirty cache
// until Clean is invoked under the write lock. It wraps the regular cleaner,
// which performs the actual removal from dirties.
type delayedCleaner struct {
	db      *Database // trie database whose dirty cache is being drained
	cleaner *cleaner  // underlying cleaner used by Clean to evict nodes

	hashes []common.Hash // to-be-deleted hashes
}

// newDelayedCleaner constructs a delayedCleaner for the given database,
// wiring up the underlying cleaner that will perform the eventual
// dirty-cache eviction when Clean is called.
func newDelayedCleaner(db *Database) *delayedCleaner {
	dc := &delayedCleaner{db: db}
	dc.cleaner = &cleaner{db: db}
	return dc
}

// Put records the node identified by key as committed, scheduling it for
// later removal from the dirty cache. It is safe to call while holding
// only the read lock, since it never mutates dirties; the supplied value
// is ignored because the node content can be re-read from dirties when
// Clean eventually runs.
func (c *delayedCleaner) Put(key []byte, _ []byte) error {
	h := common.BytesToHash(key)
	if _, exists := c.db.dirties[h]; exists {
		// Defer the actual eviction until Clean runs under the write lock.
		c.hashes = append(c.hashes, h)
	}
	return nil
}

// Delete is unsupported: replaying committed nodes through rawdb only ever
// issues Put calls here, so reaching this path indicates a programming error.
func (c *delayedCleaner) Delete(key []byte) error {
	panic("not implemented")
}

// Clean evicts the buffered to-be-deleted nodes from the dirty cache by
// replaying them through the underlying cleaner. The database write lock
// must be held by the caller, as the eviction mutates dirties.
//
// After processing, the buffered hashes are dropped so that a repeated
// Clean is a cheap no-op and the backing array becomes collectable.
func (c *delayedCleaner) Clean() {
	for _, hash := range c.hashes {
		node, ok := c.db.dirties[hash]
		if !ok {
			// The node was dropped from dirties in the meantime (e.g. between
			// the read-lock release and write-lock acquisition in Commit);
			// nothing left to evict.
			continue
		}
		// Route the persisted node through the regular cleaner, which removes
		// it from the dirty cache and adjusts the size accounting.
		rawdb.WriteLegacyTrieNode(c.cleaner, hash, node.node)
	}
	// Reset the buffer: without this, a second Clean call would replay every
	// eviction again and the slice would pin its backing array for the
	// lifetime of the delayedCleaner.
	c.hashes = nil
}
30 changes: 24 additions & 6 deletions triedb/hashdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,15 @@ func (db *Database) Commit(node common.Hash, report bool) error {
// There's no data to commit in this node
return nil
}
db.lock.Lock()
defer db.lock.Unlock()
// acquire read lock only, as commit doesn't modify dirties
// that allows concurrent node reads e.g. performed by rpc handlers
db.lock.RLock()
rLocked := true
defer func() {
if rLocked {
db.lock.RUnlock()
}
}()

// Create a database batch to flush persistent data out. It is important that
// outside code doesn't see an inconsistent state (referenced data removed from
Expand All @@ -418,9 +425,7 @@ func (db *Database) Commit(node common.Hash, report bool) error {
batch := db.diskdb.NewBatch()

// Move the trie itself into the batch, flushing if enough data is accumulated
nodes, storage := len(db.dirties), db.dirtiesSize

uncacher := &cleaner{db}
uncacher := newDelayedCleaner(db)
if err := db.commit(node, batch, uncacher); err != nil {
log.Error("Failed to commit trie from trie database", "err", err)
return err
Expand All @@ -436,6 +441,18 @@ func (db *Database) Commit(node common.Hash, report bool) error {
}
batch.Reset()

// prevent RUnlock in defer and RUnlock the lock
rLocked = false
db.lock.RUnlock()

// acquire write lock for the uncacher to remove committed nodes from dirties
db.lock.Lock()
defer db.lock.Unlock()

nodes, storage := len(db.dirties), db.dirtiesSize

uncacher.Clean()

// Reset the storage counters and bumped metrics
memcacheCommitTimeTimer.Update(time.Since(start))
memcacheCommitBytesMeter.Mark(int64(storage - db.dirtiesSize))
Expand All @@ -456,7 +473,8 @@ func (db *Database) Commit(node common.Hash, report bool) error {
}

// commit is the private locked version of Commit.
func (db *Database) commit(hash common.Hash, batch ethdb.Batch, uncacher *cleaner) error {
// note: commit must not modify dirties as it should only require read lock held
func (db *Database) commit(hash common.Hash, batch ethdb.Batch, uncacher *delayedCleaner) error {
// If the node does not exist, it's a previously committed node
node, ok := db.dirties[hash]
if !ok {
Expand Down
Loading