Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions core/state/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,19 @@ type Trie interface {
// be returned.
GetAccount(address common.Address) (*types.StateAccount, error)

// PrefetchAccount attempts to resolve specific accounts from the database
// to accelerate subsequent trie operations.
PrefetchAccount([]common.Address) error

// GetStorage returns the value for key stored in the trie. The value bytes
// must not be modified by the caller. If a node was not found in the database,
// a trie.MissingNodeError is returned.
GetStorage(addr common.Address, key []byte) ([]byte, error)

// PrefetchStorage attempts to resolve specific storage slots from the database
// to accelerate subsequent trie operations.
PrefetchStorage(addr common.Address, keys [][]byte) error

// UpdateAccount abstracts an account write to the trie. It encodes the
// provided account object with associated algorithm and then updates it
// in the trie with provided address.
Expand Down
34 changes: 18 additions & 16 deletions core/state/trie_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,10 @@ func (sf *subfetcher) loop() {
sf.tasks = nil
sf.lock.Unlock()

var (
addresses []common.Address
slots [][]byte
)
for _, task := range tasks {
if task.addr != nil {
key := *task.addr
Expand All @@ -400,6 +404,7 @@ func (sf *subfetcher) loop() {
sf.dupsCross++
continue
}
sf.seenReadAddr[key] = struct{}{}
} else {
if _, ok := sf.seenReadAddr[key]; ok {
sf.dupsCross++
Expand All @@ -409,7 +414,9 @@ func (sf *subfetcher) loop() {
sf.dupsWrite++
continue
}
sf.seenWriteAddr[key] = struct{}{}
}
addresses = append(addresses, *task.addr)
} else {
key := *task.slot
if task.read {
Expand All @@ -421,6 +428,7 @@ func (sf *subfetcher) loop() {
sf.dupsCross++
continue
}
sf.seenReadSlot[key] = struct{}{}
} else {
if _, ok := sf.seenReadSlot[key]; ok {
sf.dupsCross++
Expand All @@ -430,25 +438,19 @@ func (sf *subfetcher) loop() {
sf.dupsWrite++
continue
}
sf.seenWriteSlot[key] = struct{}{}
}
slots = append(slots, key.Bytes())
}
if task.addr != nil {
sf.trie.GetAccount(*task.addr)
} else {
sf.trie.GetStorage(sf.addr, (*task.slot)[:])
}
if len(addresses) != 0 {
if err := sf.trie.PrefetchAccount(addresses); err != nil {
log.Error("Failed to prefetch accounts", "err", err)
}
if task.read {
if task.addr != nil {
sf.seenReadAddr[*task.addr] = struct{}{}
} else {
sf.seenReadSlot[*task.slot] = struct{}{}
}
} else {
if task.addr != nil {
sf.seenWriteAddr[*task.addr] = struct{}{}
} else {
sf.seenWriteSlot[*task.slot] = struct{}{}
}
}
if len(slots) != 0 {
if err := sf.trie.PrefetchStorage(sf.addr, slots); err != nil {
log.Error("Failed to prefetch storage", "err", err)
}
}

Expand Down
6 changes: 0 additions & 6 deletions core/state_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,6 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c
fails.Add(1)
return nil // Ugh, something went horribly wrong, bail out
}
// Pre-load trie nodes for the intermediate root.
//
// This operation incurs significant memory allocations due to
// trie hashing and node decoding. TODO(rjl493456442): investigate
// ways to mitigate this overhead.
stateCpy.IntermediateRoot(true)
return nil
})
}
Expand Down
46 changes: 33 additions & 13 deletions trie/secure_trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,19 +105,6 @@ func (t *StateTrie) MustGet(key []byte) []byte {
return t.trie.MustGet(crypto.Keccak256(key))
}

// GetStorage attempts to retrieve a storage slot with provided account address
// and slot key. The value bytes must not be modified by the caller.
// If the specified storage slot is not in the trie, nil will be returned.
// If a trie node is not found in the database, a MissingNodeError is returned.
func (t *StateTrie) GetStorage(_ common.Address, key []byte) ([]byte, error) {
enc, err := t.trie.Get(crypto.Keccak256(key))
if err != nil || len(enc) == 0 {
return nil, err
}
_, content, _, err := rlp.Split(enc)
return content, err
}

// GetAccount attempts to retrieve an account with provided account address.
// If the specified account is not in the trie, nil will be returned.
// If a trie node is not found in the database, a MissingNodeError is returned.
Expand All @@ -144,6 +131,39 @@ func (t *StateTrie) GetAccountByHash(addrHash common.Hash) (*types.StateAccount,
return ret, err
}

// PrefetchAccount attempts to resolve specific accounts from the database
// to accelerate subsequent trie operations.
func (t *StateTrie) PrefetchAccount(addresses []common.Address) error {
var keys [][]byte
for _, addr := range addresses {
keys = append(keys, crypto.Keccak256(addr.Bytes()))
}
return t.trie.Prefetch(keys)
}

// GetStorage attempts to retrieve a storage slot with provided account address
// and slot key. The value bytes must not be modified by the caller.
// If the specified storage slot is not in the trie, nil will be returned.
// If a trie node is not found in the database, a MissingNodeError is returned.
func (t *StateTrie) GetStorage(_ common.Address, key []byte) ([]byte, error) {
enc, err := t.trie.Get(crypto.Keccak256(key))
if err != nil || len(enc) == 0 {
return nil, err
}
_, content, _, err := rlp.Split(enc)
return content, err
}

// PrefetchStorage attempts to resolve specific storage slots from the database
// to accelerate subsequent trie operations.
func (t *StateTrie) PrefetchStorage(_ common.Address, keys [][]byte) error {
var keylist [][]byte
for _, key := range keys {
keylist = append(keylist, crypto.Keccak256(key))
}
return t.trie.Prefetch(keylist)
}

// GetNode attempts to retrieve a trie node by compact-encoded path. It is not
// possible to use keybyte-encoding as the path might contain odd nibbles.
// If the specified trie node is not in the trie, nil will be returned.
Expand Down
20 changes: 20 additions & 0 deletions trie/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package trie
import (
"maps"
"slices"
"sync"
)

// opTracer tracks the changes of trie nodes. During the trie operations,
Expand Down Expand Up @@ -102,6 +103,7 @@ func (t *opTracer) deletedList() [][]byte {
// handling the concurrency issues by themselves.
type prevalueTracer struct {
data map[string][]byte
lock sync.RWMutex
}

// newPrevalueTracer initializes the tracer for capturing resolved trie nodes.
Expand All @@ -115,18 +117,27 @@ func newPrevalueTracer() *prevalueTracer {
// blob internally. Do not modify the value outside this function,
// as it is not deep-copied.
func (t *prevalueTracer) put(path []byte, val []byte) {
t.lock.Lock()
defer t.lock.Unlock()

t.data[string(path)] = val
}

// get returns the cached trie node value. If the node is not found, nil will
// be returned.
func (t *prevalueTracer) get(path []byte) []byte {
t.lock.RLock()
defer t.lock.RUnlock()

return t.data[string(path)]
}

// hasList returns a list of flags indicating whether the corresponding trie nodes
// specified by the path exist in the trie.
func (t *prevalueTracer) hasList(list [][]byte) []bool {
t.lock.RLock()
defer t.lock.RUnlock()

exists := make([]bool, 0, len(list))
for _, path := range list {
_, ok := t.data[string(path)]
Expand All @@ -137,16 +148,25 @@ func (t *prevalueTracer) hasList(list [][]byte) []bool {

// values returns a list of values of the cached trie nodes.
func (t *prevalueTracer) values() [][]byte {
t.lock.RLock()
defer t.lock.RUnlock()

return slices.Collect(maps.Values(t.data))
}

// reset resets the cached content in the prevalueTracer.
func (t *prevalueTracer) reset() {
t.lock.Lock()
defer t.lock.Unlock()

clear(t.data)
}

// copy returns a copied prevalueTracer instance.
func (t *prevalueTracer) copy() *prevalueTracer {
t.lock.RLock()
defer t.lock.RUnlock()

// Shadow clone is used, as the cached trie node values are immutable
return &prevalueTracer{
data: maps.Clone(t.data),
Expand Down
1 change: 0 additions & 1 deletion trie/tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ func testTrieOpTracer(t *testing.T, vals []struct{ k, v string }) {
}
insertSet := copySet(trie.opTracer.inserts) // copy before commit
deleteSet := copySet(trie.opTracer.deletes) // copy before commit

root, nodes := trie.Commit(false)
db.Update(root, types.EmptyRootHash, trienode.NewWithNodeSet(nodes))

Expand Down
24 changes: 23 additions & 1 deletion trie/transition.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,17 @@ func (t *TransitionTrie) GetStorage(addr common.Address, key []byte) ([]byte, er
return t.base.GetStorage(addr, key)
}

// PrefetchStorage attempts to resolve specific storage slots from the database
// to accelerate subsequent trie operations.
func (t *TransitionTrie) PrefetchStorage(addr common.Address, keys [][]byte) error {
for _, key := range keys {
if _, err := t.GetStorage(addr, key); err != nil {
return err
}
}
return nil
}

// GetAccount abstract an account read from the trie.
func (t *TransitionTrie) GetAccount(address common.Address) (*types.StateAccount, error) {
data, err := t.overlay.GetAccount(address)
Expand All @@ -94,6 +105,17 @@ func (t *TransitionTrie) GetAccount(address common.Address) (*types.StateAccount
return t.base.GetAccount(address)
}

// PrefetchAccount attempts to resolve specific accounts from the database
// to accelerate subsequent trie operations.
func (t *TransitionTrie) PrefetchAccount(addresses []common.Address) error {
for _, addr := range addresses {
if _, err := t.GetAccount(addr); err != nil {
return err
}
}
return nil
}

// UpdateStorage associates key with value in the trie. If value has length zero, any
// existing value is deleted from the trie. The value bytes must not be modified
// by the caller while they are stored in the trie.
Expand Down Expand Up @@ -173,7 +195,7 @@ func (t *TransitionTrie) IsVerkle() bool {
return true
}

// UpdateStems updates a group of values, given the stem they are using. If
// UpdateStem updates a group of values, given the stem they are using. If
// a value already exists, it is overwritten.
func (t *TransitionTrie) UpdateStem(key []byte, values [][]byte) error {
trie := t.overlay
Expand Down
46 changes: 46 additions & 0 deletions trie/trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/trie/trienode"
"github.com/ethereum/go-ethereum/triedb/database"
"golang.org/x/sync/errgroup"
)

// Trie represents a Merkle Patricia Trie. Use New to create a trie that operates
Expand Down Expand Up @@ -194,6 +195,51 @@ func (t *Trie) get(origNode node, key []byte, pos int) (value []byte, newnode no
}
}

// Prefetch attempts to resolve the leaves and intermediate trie nodes
// specified by the key list in parallel. The results are silently
// discarded to simplify the function.
func (t *Trie) Prefetch(keylist [][]byte) error {
// Short circuit if the trie is already committed and not usable.
if t.committed {
return ErrCommitted
}
// Resolve the trie nodes sequentially if there are not too many
// trie nodes in the trie.
fn, ok := t.root.(*fullNode)
if !ok || len(keylist) < 16 {
for _, key := range keylist {
_, err := t.Get(key)
if err != nil {
return err
}
}
return nil
}
var (
keys = make(map[byte][][]byte)
eg errgroup.Group
)
for _, key := range keylist {
hkey := keybytesToHex(key)
keys[hkey[0]] = append(keys[hkey[0]], hkey)
}
for pos, ks := range keys {
eg.Go(func() error {
for _, k := range ks {
_, newnode, didResolve, err := t.get(fn.Children[pos], k, 1)
if err == nil && didResolve {
fn.Children[pos] = newnode
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to store the resolved node here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah so we don't resolve them again on the next iteration

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have to store the resolved nodes, otherwise the resolved descendants will be lost

}
if err != nil {
return err
}
}
return nil
})
}
return eg.Wait()
}

// MustGetNode is a wrapper of GetNode and will omit any encountered error but
// just print out an error message.
func (t *Trie) MustGetNode(path []byte) ([]byte, int) {
Expand Down
Loading