core/state: trie prefetcher change: calling trie() doesn't stop the associated subfetcher #29035

Closed
7 changes: 0 additions & 7 deletions core/state/statedb.go
@@ -765,13 +765,6 @@ func (s *StateDB) Copy() *StateDB {
// in the middle of a transaction.
state.accessList = s.accessList.Copy()
state.transientStorage = s.transientStorage.Copy()

// If there's a prefetcher running, make an inactive copy of it that can
// only access data but does not actively preload (since the user will not
// know that they need to explicitly terminate an active copy).
if s.prefetcher != nil {
state.prefetcher = s.prefetcher.copy()
}
return state
}

202 changes: 68 additions & 134 deletions core/state/trie_prefetcher.go
@@ -17,6 +17,7 @@
package state

import (
"errors"
"sync"

"github.com/ethereum/go-ethereum/common"
@@ -27,6 +28,9 @@ import (
var (
// triePrefetchMetricsPrefix is the prefix under which to publish the metrics.
triePrefetchMetricsPrefix = "trie/prefetch/"

// errTerminated is returned if any invocation is applied on a terminated fetcher.
errTerminated = errors.New("fetcher is already terminated")
)

// triePrefetcher is an active prefetcher, which receives accounts or storage
@@ -37,159 +41,112 @@ var (
type triePrefetcher struct {
db Database // Database to fetch trie nodes through
root common.Hash // Root hash of the account trie for metrics
fetches map[string]Trie // Partially or fully fetched tries. Only populated for inactive copies.
fetchers map[string]*subfetcher // Subfetchers for each trie
closed bool

deliveryMissMeter metrics.Meter
accountLoadMeter metrics.Meter
accountDupMeter metrics.Meter
accountSkipMeter metrics.Meter
accountWasteMeter metrics.Meter
storageLoadMeter metrics.Meter
storageDupMeter metrics.Meter
storageSkipMeter metrics.Meter
storageWasteMeter metrics.Meter
}

func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher {
prefix := triePrefetchMetricsPrefix + namespace
p := &triePrefetcher{
return &triePrefetcher{
db: db,
root: root,
fetchers: make(map[string]*subfetcher), // Active prefetchers use the fetchers map

deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil),
accountLoadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load", nil),
accountDupMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup", nil),
accountSkipMeter: metrics.GetOrRegisterMeter(prefix+"/account/skip", nil),
accountWasteMeter: metrics.GetOrRegisterMeter(prefix+"/account/waste", nil),
storageLoadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load", nil),
storageDupMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil),
storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil),
storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil),
}
return p
}

// close iterates over all the subfetchers, aborts any that were left spinning
// close iterates over all the subfetchers, waits on any that were left spinning
// and reports the stats to the metrics subsystem.
func (p *triePrefetcher) close() {
// Short circuit if the fetcher is already closed.
if p.closed {
return
}
for _, fetcher := range p.fetchers {
fetcher.abort() // safe to do multiple times
fetcher.close()

if metrics.Enabled {
if fetcher.root == p.root {
p.accountLoadMeter.Mark(int64(len(fetcher.seen)))
p.accountDupMeter.Mark(int64(fetcher.dups))
p.accountSkipMeter.Mark(int64(len(fetcher.tasks)))

for _, key := range fetcher.used {
delete(fetcher.seen, string(key))
}
p.accountWasteMeter.Mark(int64(len(fetcher.seen)))
} else {
p.storageLoadMeter.Mark(int64(len(fetcher.seen)))
p.storageDupMeter.Mark(int64(fetcher.dups))
p.storageSkipMeter.Mark(int64(len(fetcher.tasks)))

for _, key := range fetcher.used {
delete(fetcher.seen, string(key))
}
p.storageWasteMeter.Mark(int64(len(fetcher.seen)))
}
}
}
// Clear out all fetchers (the prefetcher cannot be reused after close)
p.closed = true
p.fetchers = nil
}

// copy creates a deep-but-inactive copy of the trie prefetcher. Any trie data
// already loaded will be copied over, but no goroutines will be started. This
// is mostly used in the miner which creates a copy of it's actively mutated
// state to be sealed while it may further mutate the state.
func (p *triePrefetcher) copy() *triePrefetcher {
copy := &triePrefetcher{
db: p.db,
root: p.root,
fetches: make(map[string]Trie), // Active prefetchers use the fetches map

deliveryMissMeter: p.deliveryMissMeter,
accountLoadMeter: p.accountLoadMeter,
accountDupMeter: p.accountDupMeter,
accountSkipMeter: p.accountSkipMeter,
accountWasteMeter: p.accountWasteMeter,
storageLoadMeter: p.storageLoadMeter,
storageDupMeter: p.storageDupMeter,
storageSkipMeter: p.storageSkipMeter,
storageWasteMeter: p.storageWasteMeter,
}
// If the prefetcher is already a copy, duplicate the data
if p.fetches != nil {
for root, fetch := range p.fetches {
if fetch == nil {
continue
}
copy.fetches[root] = p.db.CopyTrie(fetch)
}
return copy
}
// Otherwise we're copying an active fetcher, retrieve the current states
for id, fetcher := range p.fetchers {
copy.fetches[id] = fetcher.peek()
}
return copy
}

// prefetch schedules a batch of trie items to prefetch.
func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr common.Address, keys [][]byte) {
// If the prefetcher is an inactive one, bail out
if p.fetches != nil {
return
// prefetch schedules a batch of trie items to prefetch. After the prefetcher is
// closed, any subsequently scheduled tasks will not be executed and an error
// will be returned.
//
// prefetch is called from two locations:
//
// 1. Finalize of the state-objects storage roots. This happens at the end
// of every transaction, meaning that if several transactions touch
// upon the same contract, the parameters invoking this method may be
// repeated.
// 2. Finalize of the main account trie. This happens only once per block.
func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr common.Address, keys [][]byte) error {
if p.closed {
Contributor:

Is there ever multi-threaded access to prefetch / close? If so, it might be better to make closed an atomic.Bool. But if we don't access it that way, it's fine.

Member:

Prefetcher is manipulated in the statedb, which is not regarded as thread-safe. I believe we never access prefetch from multiple threads.
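
For illustration, here is a minimal standalone sketch of the atomic.Bool variant floated above. It is not part of this PR, and the type and method names (guardedPrefetcher, etc.) are hypothetical; it only shows how the closed flag could be guarded if concurrent prefetch/close calls ever became a concern:

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errTerminated = errors.New("fetcher is already terminated")

// guardedPrefetcher is a hypothetical stand-in for triePrefetcher that guards
// its lifecycle with an atomic.Bool instead of a plain bool.
type guardedPrefetcher struct {
	closed atomic.Bool
}

// close is idempotent even with concurrent callers: only the first
// CompareAndSwap wins and performs the shutdown work.
func (p *guardedPrefetcher) close() {
	if !p.closed.CompareAndSwap(false, true) {
		return
	}
	// ... wait for subfetchers and report metrics here ...
}

// prefetch checks the flag with an atomic load instead of a plain read.
func (p *guardedPrefetcher) prefetch() error {
	if p.closed.Load() {
		return errTerminated
	}
	// ... schedule retrievals here ...
	return nil
}

func main() {
	p := new(guardedPrefetcher)
	fmt.Println(p.prefetch()) // <nil>
	p.close()
	fmt.Println(p.prefetch()) // fetcher is already terminated
}
```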

return errTerminated
}
// Active fetcher, schedule the retrievals
id := p.trieID(owner, root)
fetcher := p.fetchers[id]
if fetcher == nil {
fetcher = newSubfetcher(p.db, p.root, owner, root, addr)
p.fetchers[id] = fetcher
}
fetcher.schedule(keys)
return fetcher.schedule(keys)
}

// trie returns the trie matching the root hash, or nil if the prefetcher doesn't
// have it.
// trie returns the trie matching the root hash, or nil if either the fetcher
// is terminated or the trie is not available.
func (p *triePrefetcher) trie(owner common.Hash, root common.Hash) Trie {
// If the prefetcher is inactive, return from existing deep copies
id := p.trieID(owner, root)
if p.fetches != nil {
trie := p.fetches[id]
if trie == nil {
p.deliveryMissMeter.Mark(1)
return nil
}
return p.db.CopyTrie(trie)
}
// Otherwise the prefetcher is active, bail if no trie was prefetched for this root
fetcher := p.fetchers[id]
if fetcher == nil {
p.deliveryMissMeter.Mark(1)
if p.closed {
return nil
}
// Interrupt the prefetcher if it's by any chance still running and return
// a copy of any pre-loaded trie.
fetcher.abort() // safe to do multiple times

trie := fetcher.peek()
if trie == nil {
// Bail if no trie was prefetched for this root
fetcher := p.fetchers[p.trieID(owner, root)]
if fetcher == nil {
p.deliveryMissMeter.Mark(1)
return nil
}
return trie
return fetcher.peek()
}

// used marks a batch of state items used to allow creating statistics as to
// how useful or wasteful the prefetcher is.
// how useful or wasteful the fetcher is.
func (p *triePrefetcher) used(owner common.Hash, root common.Hash, used [][]byte) {
if p.closed {
return
}
if fetcher := p.fetchers[p.trieID(owner, root)]; fetcher != nil {
fetcher.used = used
}
@@ -221,7 +178,7 @@ type subfetcher struct {
wake chan struct{} // Wake channel if a new task is scheduled
stop chan struct{} // Channel to interrupt processing
term chan struct{} // Channel to signal interruption
copy chan chan Trie // Channel to request a copy of the current trie
copy chan chan Trie // channel for retrieving copies of the subfetcher's trie

seen map[string]struct{} // Tracks the entries already loaded
dups int // Number of duplicate preload tasks
@@ -237,7 +194,7 @@ func newSubfetcher(db Database, state common.Hash, owner common.Hash, root commo
owner: owner,
root: root,
addr: addr,
wake: make(chan struct{}, 1),
wake: make(chan struct{}),
Member:

This change seems problematic. Whereas the original code allowed the caller to schedule the request and then fly off and continue execution, the new code (along with L217 below) will block until a previous request is done (actually, it blocks on the second request).

Any particular reason you made the async notifier synchronous?

Contributor:

Well spotted!
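
To make the point above concrete, here is a small standalone sketch (not the geth code, just the channel pattern): a capacity-1 wake channel combined with select/default lets the caller record a notification and keep going, whereas an unbuffered send blocks until the background loop is ready to receive it.

```go
package main

import (
	"fmt"
	"time"
)

// notifyBuffered is the fire-and-forget pattern from the original code:
// either the slot is free and the notification is recorded, or one is
// already pending and the two are coalesced.
func notifyBuffered(wake chan struct{}) {
	select {
	case wake <- struct{}{}:
	default:
	}
}

func main() {
	buffered := make(chan struct{}, 1)
	notifyBuffered(buffered) // returns immediately
	notifyBuffered(buffered) // still returns immediately (coalesced)
	fmt.Println("buffered notifications did not block")

	unbuffered := make(chan struct{})
	go func() {
		time.Sleep(100 * time.Millisecond) // simulate a busy loop iteration
		<-unbuffered
	}()
	start := time.Now()
	unbuffered <- struct{}{} // blocks until the receiver is ready
	fmt.Printf("unbuffered notification blocked for ~%v\n",
		time.Since(start).Round(10*time.Millisecond))
}
```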

stop: make(chan struct{}),
term: make(chan struct{}),
copy: make(chan chan Trie),
@@ -248,40 +205,36 @@ func newSubfetcher(db Database, state common.Hash, owner common.Hash, root commo
}

// schedule adds a batch of trie keys to the queue to prefetch.
func (sf *subfetcher) schedule(keys [][]byte) {
func (sf *subfetcher) schedule(keys [][]byte) error {
// Append the tasks to the current queue
sf.lock.Lock()
sf.tasks = append(sf.tasks, keys...)
sf.lock.Unlock()

// Notify the prefetcher, it's fine if it's already terminated
// Notify the background thread to execute scheduled tasks
select {
case sf.wake <- struct{}{}:
default:
return nil
case <-sf.term:
return errTerminated
}
}

// peek tries to retrieve a deep copy of the fetcher's trie in whatever form it
// is currently.
// peek tries to retrieve a deep copy of the fetcher's trie. Nil is returned
// if the fetcher has already terminated, or if the associated trie failed
// to open.
func (sf *subfetcher) peek() Trie {
ch := make(chan Trie)
select {
case sf.copy <- ch:
// Subfetcher still alive, return copy from it
return <-ch

case <-sf.term:
// Subfetcher already terminated, return a copy directly
if sf.trie == nil {
return nil
}
return sf.db.CopyTrie(sf.trie)
return nil
}
}

// abort interrupts the subfetcher immediately. It is safe to call abort multiple
// times but it is not thread safe.
func (sf *subfetcher) abort() {
// close waits for the subfetcher to finish its tasks. It cannot be called multiple times
func (sf *subfetcher) close() {
select {
case <-sf.stop:
default:
@@ -290,13 +243,13 @@ func (sf *subfetcher) abort() {
<-sf.term
}

// loop waits for new tasks to be scheduled and keeps loading them until it runs
// out of tasks or its underlying trie is retrieved for committing.
// loop waits for new trie tasks to be scheduled and loads them as they arrive, stopping
// when requested.
func (sf *subfetcher) loop() {
// No matter how the loop stops, signal anyone waiting that it's terminated
defer close(sf.term)

// Start by opening the trie and stop processing if it fails
// Start by opening the trie and stop processing if it fails.
if sf.owner == (common.Hash{}) {
trie, err := sf.db.OpenTrie(sf.root)
if err != nil {
@@ -305,8 +258,6 @@ func (sf *subfetcher) loop() {
}
sf.trie = trie
} else {
// The trie argument can be nil as verkle doesn't support prefetching
// yet. TODO FIX IT(rjl493456442), otherwise code will panic here.
trie, err := sf.db.OpenStorageTrie(sf.state, sf.addr, sf.root, nil)
if err != nil {
log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err)
@@ -318,47 +269,30 @@ func (sf *subfetcher) loop() {
for {
select {
case <-sf.wake:
// Subfetcher was woken up, retrieve any tasks to avoid spinning the lock
// Execute all remaining tasks in single run
sf.lock.Lock()
tasks := sf.tasks
sf.tasks = nil
sf.lock.Unlock()

// Prefetch any tasks until the loop is interrupted
for i, task := range tasks {
select {
case <-sf.stop:
// If termination is requested, add any leftover back and return
sf.lock.Lock()
sf.tasks = append(sf.tasks, tasks[i:]...)
sf.lock.Unlock()
return

case ch := <-sf.copy:
// Somebody wants a copy of the current trie, grant them
ch <- sf.db.CopyTrie(sf.trie)

default:
// No termination request yet, prefetch the next entry
if _, ok := sf.seen[string(task)]; ok {
sf.dups++
} else {
if len(task) == common.AddressLength {
sf.trie.GetAccount(common.BytesToAddress(task))
} else {
sf.trie.GetStorage(sf.addr, task)
}
sf.seen[string(task)] = struct{}{}
}
for _, task := range tasks {
if _, ok := sf.seen[string(task)]; ok {
sf.dups++
continue
}
if len(task) == common.AddressLength {
sf.trie.GetAccount(common.BytesToAddress(task))
} else {
sf.trie.GetStorage(sf.addr, task)
}
sf.seen[string(task)] = struct{}{}
}

case ch := <-sf.copy:
// Somebody wants a copy of the current trie, grant them
// Somebody wants a copy of the current trie, grant them.
ch <- sf.db.CopyTrie(sf.trie)

case <-sf.stop:
// Termination is requested, abort and leave remaining tasks
// Termination is requested, abort
return
}
}
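
Finally, a self-contained toy (simplified, hypothetical names; the actual trie loading is stubbed out) that distills the wake/stop/term pattern the reworked subfetcher relies on, so the new termination semantics can be run and observed: schedule returns errTerminated once the loop has exited, and close waits on term for the loop to finish.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errTerminated = errors.New("fetcher is already terminated")

// miniFetcher mirrors the subfetcher's lifecycle channels in miniature.
type miniFetcher struct {
	lock  sync.Mutex
	tasks [][]byte
	wake  chan struct{} // unbuffered, as in this diff
	stop  chan struct{} // closed to request termination
	term  chan struct{} // closed by loop() on exit
}

func newMiniFetcher() *miniFetcher {
	f := &miniFetcher{
		wake: make(chan struct{}),
		stop: make(chan struct{}),
		term: make(chan struct{}),
	}
	go f.loop()
	return f
}

// schedule queues keys and notifies the loop; once the loop has exited only
// the term branch can proceed, so it reports errTerminated.
func (f *miniFetcher) schedule(keys [][]byte) error {
	f.lock.Lock()
	f.tasks = append(f.tasks, keys...)
	f.lock.Unlock()

	select {
	case f.wake <- struct{}{}:
		return nil
	case <-f.term:
		return errTerminated
	}
}

// close signals stop (at most once) and then waits for the loop to exit.
func (f *miniFetcher) close() {
	select {
	case <-f.stop:
	default:
		close(f.stop)
	}
	<-f.term
}

func (f *miniFetcher) loop() {
	defer close(f.term)
	for {
		select {
		case <-f.wake:
			f.lock.Lock()
			tasks := f.tasks
			f.tasks = nil
			f.lock.Unlock()
			for range tasks {
				// a real subfetcher would resolve the trie node here
			}
		case <-f.stop:
			return
		}
	}
}

func main() {
	f := newMiniFetcher()
	fmt.Println(f.schedule([][]byte{[]byte("key")})) // <nil>
	f.close()
	fmt.Println(f.schedule([][]byte{[]byte("key")})) // fetcher is already terminated
}
```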