-
Notifications
You must be signed in to change notification settings - Fork 20.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
core/state: trie prefetcher change: calling trie() doesn't stop the associated subfetcher #29035
Changes from all commits
d6d06b6
e3e4ffe
46a10c1
df372b0
c7cd97f
dc16ec3
8d1074f
287cead
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ | |
package state | ||
|
||
import ( | ||
"errors" | ||
"sync" | ||
|
||
"github.com/ethereum/go-ethereum/common" | ||
|
@@ -27,6 +28,9 @@ import ( | |
var ( | ||
// triePrefetchMetricsPrefix is the prefix under which to publish the metrics. | ||
triePrefetchMetricsPrefix = "trie/prefetch/" | ||
|
||
// errTerminated is returned if any invocation is applied on a terminated fetcher. | ||
errTerminated = errors.New("fetcher is already terminated") | ||
) | ||
|
||
// triePrefetcher is an active prefetcher, which receives accounts or storage | ||
|
@@ -37,159 +41,112 @@ var ( | |
type triePrefetcher struct { | ||
db Database // Database to fetch trie nodes through | ||
root common.Hash // Root hash of the account trie for metrics | ||
fetches map[string]Trie // Partially or fully fetched tries. Only populated for inactive copies. | ||
fetchers map[string]*subfetcher // Subfetchers for each trie | ||
closed bool | ||
|
||
deliveryMissMeter metrics.Meter | ||
accountLoadMeter metrics.Meter | ||
accountDupMeter metrics.Meter | ||
accountSkipMeter metrics.Meter | ||
accountWasteMeter metrics.Meter | ||
storageLoadMeter metrics.Meter | ||
storageDupMeter metrics.Meter | ||
storageSkipMeter metrics.Meter | ||
storageWasteMeter metrics.Meter | ||
} | ||
|
||
func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher { | ||
prefix := triePrefetchMetricsPrefix + namespace | ||
p := &triePrefetcher{ | ||
return &triePrefetcher{ | ||
db: db, | ||
root: root, | ||
fetchers: make(map[string]*subfetcher), // Active prefetchers use the fetchers map | ||
|
||
deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil), | ||
accountLoadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load", nil), | ||
accountDupMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup", nil), | ||
accountSkipMeter: metrics.GetOrRegisterMeter(prefix+"/account/skip", nil), | ||
accountWasteMeter: metrics.GetOrRegisterMeter(prefix+"/account/waste", nil), | ||
storageLoadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load", nil), | ||
storageDupMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil), | ||
storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil), | ||
storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil), | ||
} | ||
return p | ||
} | ||
|
||
// close iterates over all the subfetchers, aborts any that were left spinning | ||
// close iterates over all the subfetchers, waits on any that were left spinning | ||
// and reports the stats to the metrics subsystem. | ||
func (p *triePrefetcher) close() { | ||
// Short circuit if the fetcher is already closed. | ||
if p.closed { | ||
return | ||
} | ||
for _, fetcher := range p.fetchers { | ||
fetcher.abort() // safe to do multiple times | ||
fetcher.close() | ||
|
||
if metrics.Enabled { | ||
if fetcher.root == p.root { | ||
p.accountLoadMeter.Mark(int64(len(fetcher.seen))) | ||
p.accountDupMeter.Mark(int64(fetcher.dups)) | ||
p.accountSkipMeter.Mark(int64(len(fetcher.tasks))) | ||
|
||
for _, key := range fetcher.used { | ||
delete(fetcher.seen, string(key)) | ||
} | ||
p.accountWasteMeter.Mark(int64(len(fetcher.seen))) | ||
} else { | ||
p.storageLoadMeter.Mark(int64(len(fetcher.seen))) | ||
p.storageDupMeter.Mark(int64(fetcher.dups)) | ||
p.storageSkipMeter.Mark(int64(len(fetcher.tasks))) | ||
|
||
for _, key := range fetcher.used { | ||
delete(fetcher.seen, string(key)) | ||
} | ||
p.storageWasteMeter.Mark(int64(len(fetcher.seen))) | ||
} | ||
} | ||
} | ||
// Clear out all fetchers (will crash on a second call, deliberate) | ||
p.closed = true | ||
p.fetchers = nil | ||
} | ||
|
||
// copy creates a deep-but-inactive copy of the trie prefetcher. Any trie data | ||
// already loaded will be copied over, but no goroutines will be started. This | ||
// is mostly used in the miner which creates a copy of it's actively mutated | ||
// state to be sealed while it may further mutate the state. | ||
func (p *triePrefetcher) copy() *triePrefetcher { | ||
copy := &triePrefetcher{ | ||
db: p.db, | ||
root: p.root, | ||
fetches: make(map[string]Trie), // Active prefetchers use the fetches map | ||
|
||
deliveryMissMeter: p.deliveryMissMeter, | ||
accountLoadMeter: p.accountLoadMeter, | ||
accountDupMeter: p.accountDupMeter, | ||
accountSkipMeter: p.accountSkipMeter, | ||
accountWasteMeter: p.accountWasteMeter, | ||
storageLoadMeter: p.storageLoadMeter, | ||
storageDupMeter: p.storageDupMeter, | ||
storageSkipMeter: p.storageSkipMeter, | ||
storageWasteMeter: p.storageWasteMeter, | ||
} | ||
// If the prefetcher is already a copy, duplicate the data | ||
if p.fetches != nil { | ||
for root, fetch := range p.fetches { | ||
if fetch == nil { | ||
continue | ||
} | ||
copy.fetches[root] = p.db.CopyTrie(fetch) | ||
} | ||
return copy | ||
} | ||
// Otherwise we're copying an active fetcher, retrieve the current states | ||
for id, fetcher := range p.fetchers { | ||
copy.fetches[id] = fetcher.peek() | ||
} | ||
return copy | ||
} | ||
|
||
// prefetch schedules a batch of trie items to prefetch. | ||
func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr common.Address, keys [][]byte) { | ||
// If the prefetcher is an inactive one, bail out | ||
if p.fetches != nil { | ||
return | ||
// prefetch schedules a batch of trie items to prefetch. After the prefetcher is | ||
// closed, all the following tasks scheduled will not be executed and an error | ||
// will be returned. | ||
// | ||
// prefetch is called from two locations: | ||
// | ||
// 1. Finalize of the state-objects storage roots. This happens at the end | ||
// of every transaction, meaning that if several transactions touches | ||
// upon the same contract, the parameters invoking this method may be | ||
// repeated. | ||
// 2. Finalize of the main account trie. This happens only once per block. | ||
func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr common.Address, keys [][]byte) error { | ||
if p.closed { | ||
return errTerminated | ||
} | ||
// Active fetcher, schedule the retrievals | ||
id := p.trieID(owner, root) | ||
fetcher := p.fetchers[id] | ||
if fetcher == nil { | ||
fetcher = newSubfetcher(p.db, p.root, owner, root, addr) | ||
p.fetchers[id] = fetcher | ||
} | ||
fetcher.schedule(keys) | ||
return fetcher.schedule(keys) | ||
} | ||
|
||
// trie returns the trie matching the root hash, or nil if the prefetcher doesn't | ||
// have it. | ||
// trie returns the trie matching the root hash, or nil if either the fetcher | ||
// is terminated or the trie is not available. | ||
func (p *triePrefetcher) trie(owner common.Hash, root common.Hash) Trie { | ||
rjl493456442 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// If the prefetcher is inactive, return from existing deep copies | ||
id := p.trieID(owner, root) | ||
if p.fetches != nil { | ||
trie := p.fetches[id] | ||
if trie == nil { | ||
p.deliveryMissMeter.Mark(1) | ||
return nil | ||
} | ||
return p.db.CopyTrie(trie) | ||
} | ||
// Otherwise the prefetcher is active, bail if no trie was prefetched for this root | ||
fetcher := p.fetchers[id] | ||
if fetcher == nil { | ||
p.deliveryMissMeter.Mark(1) | ||
if p.closed { | ||
return nil | ||
} | ||
// Interrupt the prefetcher if it's by any chance still running and return | ||
// a copy of any pre-loaded trie. | ||
fetcher.abort() // safe to do multiple times | ||
|
||
trie := fetcher.peek() | ||
if trie == nil { | ||
// Bail if no trie was prefetched for this root | ||
fetcher := p.fetchers[p.trieID(owner, root)] | ||
if fetcher == nil { | ||
jwasinger marked this conversation as resolved.
Show resolved
Hide resolved
|
||
p.deliveryMissMeter.Mark(1) | ||
return nil | ||
} | ||
return trie | ||
return fetcher.peek() | ||
} | ||
|
||
// used marks a batch of state items used to allow creating statistics as to | ||
// how useful or wasteful the prefetcher is. | ||
// how useful or wasteful the fetcher is. | ||
func (p *triePrefetcher) used(owner common.Hash, root common.Hash, used [][]byte) { | ||
if p.closed { | ||
return | ||
} | ||
if fetcher := p.fetchers[p.trieID(owner, root)]; fetcher != nil { | ||
fetcher.used = used | ||
} | ||
|
@@ -221,7 +178,7 @@ type subfetcher struct { | |
wake chan struct{} // Wake channel if a new task is scheduled | ||
stop chan struct{} // Channel to interrupt processing | ||
term chan struct{} // Channel to signal interruption | ||
copy chan chan Trie // Channel to request a copy of the current trie | ||
copy chan chan Trie // channel for retrieving copies of the subfetcher's trie | ||
|
||
seen map[string]struct{} // Tracks the entries already loaded | ||
dups int // Number of duplicate preload tasks | ||
|
@@ -237,7 +194,7 @@ func newSubfetcher(db Database, state common.Hash, owner common.Hash, root commo | |
owner: owner, | ||
root: root, | ||
addr: addr, | ||
wake: make(chan struct{}, 1), | ||
wake: make(chan struct{}), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This change seems problematic. Whereas the original code allowed to schedule the request and then fly off continuing execution, the new code (along with L217 below) will block until a previous request is done (actually, block on the second request). Any particular reason you made the async notifier synchronous? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well spotted! |
||
stop: make(chan struct{}), | ||
term: make(chan struct{}), | ||
copy: make(chan chan Trie), | ||
|
@@ -248,40 +205,36 @@ func newSubfetcher(db Database, state common.Hash, owner common.Hash, root commo | |
} | ||
|
||
// schedule adds a batch of trie keys to the queue to prefetch. | ||
func (sf *subfetcher) schedule(keys [][]byte) { | ||
func (sf *subfetcher) schedule(keys [][]byte) error { | ||
// Append the tasks to the current queue | ||
sf.lock.Lock() | ||
sf.tasks = append(sf.tasks, keys...) | ||
sf.lock.Unlock() | ||
|
||
// Notify the prefetcher, it's fine if it's already terminated | ||
// Notify the background thread to execute scheduled tasks | ||
select { | ||
case sf.wake <- struct{}{}: | ||
rjl493456442 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
default: | ||
return nil | ||
case <-sf.term: | ||
return errTerminated | ||
} | ||
} | ||
|
||
// peek tries to retrieve a deep copy of the fetcher's trie in whatever form it | ||
// is currently. | ||
// peek tries to retrieve a deep copy of the fetcher's trie. Nil is returned | ||
// if the fetcher is already terminated, or the associated trie is failing | ||
// for opening. | ||
func (sf *subfetcher) peek() Trie { | ||
ch := make(chan Trie) | ||
select { | ||
case sf.copy <- ch: | ||
// Subfetcher still alive, return copy from it | ||
return <-ch | ||
|
||
case <-sf.term: | ||
// Subfetcher already terminated, return a copy directly | ||
if sf.trie == nil { | ||
return nil | ||
} | ||
return sf.db.CopyTrie(sf.trie) | ||
return nil | ||
} | ||
} | ||
|
||
// abort interrupts the subfetcher immediately. It is safe to call abort multiple | ||
// times but it is not thread safe. | ||
func (sf *subfetcher) abort() { | ||
// close waits for the subfetcher to finish its tasks. It cannot be called multiple times | ||
func (sf *subfetcher) close() { | ||
select { | ||
case <-sf.stop: | ||
default: | ||
|
@@ -290,13 +243,13 @@ func (sf *subfetcher) abort() { | |
<-sf.term | ||
} | ||
|
||
// loop waits for new tasks to be scheduled and keeps loading them until it runs | ||
// out of tasks or its underlying trie is retrieved for committing. | ||
// loop loads newly-scheduled trie tasks as they are received and loads them, stopping | ||
// when requested. | ||
func (sf *subfetcher) loop() { | ||
// No matter how the loop stops, signal anyone waiting that it's terminated | ||
defer close(sf.term) | ||
|
||
// Start by opening the trie and stop processing if it fails | ||
// Start by opening the trie and stop processing if it fails. | ||
if sf.owner == (common.Hash{}) { | ||
jwasinger marked this conversation as resolved.
Show resolved
Hide resolved
|
||
trie, err := sf.db.OpenTrie(sf.root) | ||
if err != nil { | ||
|
@@ -305,8 +258,6 @@ func (sf *subfetcher) loop() { | |
} | ||
sf.trie = trie | ||
} else { | ||
// The trie argument can be nil as verkle doesn't support prefetching | ||
// yet. TODO FIX IT(rjl493456442), otherwise code will panic here. | ||
trie, err := sf.db.OpenStorageTrie(sf.state, sf.addr, sf.root, nil) | ||
if err != nil { | ||
log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err) | ||
|
@@ -318,47 +269,30 @@ func (sf *subfetcher) loop() { | |
for { | ||
select { | ||
case <-sf.wake: | ||
// Subfetcher was woken up, retrieve any tasks to avoid spinning the lock | ||
// Execute all remaining tasks in single run | ||
sf.lock.Lock() | ||
tasks := sf.tasks | ||
sf.tasks = nil | ||
sf.lock.Unlock() | ||
|
||
// Prefetch any tasks until the loop is interrupted | ||
for i, task := range tasks { | ||
select { | ||
case <-sf.stop: | ||
// If termination is requested, add any leftover back and return | ||
sf.lock.Lock() | ||
sf.tasks = append(sf.tasks, tasks[i:]...) | ||
sf.lock.Unlock() | ||
return | ||
|
||
case ch := <-sf.copy: | ||
// Somebody wants a copy of the current trie, grant them | ||
ch <- sf.db.CopyTrie(sf.trie) | ||
|
||
default: | ||
// No termination request yet, prefetch the next entry | ||
if _, ok := sf.seen[string(task)]; ok { | ||
sf.dups++ | ||
} else { | ||
if len(task) == common.AddressLength { | ||
sf.trie.GetAccount(common.BytesToAddress(task)) | ||
} else { | ||
sf.trie.GetStorage(sf.addr, task) | ||
} | ||
sf.seen[string(task)] = struct{}{} | ||
} | ||
for _, task := range tasks { | ||
if _, ok := sf.seen[string(task)]; ok { | ||
sf.dups++ | ||
continue | ||
} | ||
if len(task) == common.AddressLength { | ||
sf.trie.GetAccount(common.BytesToAddress(task)) | ||
} else { | ||
sf.trie.GetStorage(sf.addr, task) | ||
} | ||
sf.seen[string(task)] = struct{}{} | ||
} | ||
|
||
case ch := <-sf.copy: | ||
// Somebody wants a copy of the current trie, grant them | ||
// Somebody wants a copy of the current trie, grant them. | ||
ch <- sf.db.CopyTrie(sf.trie) | ||
|
||
case <-sf.stop: | ||
// Termination is requested, abort and leave remaining tasks | ||
// Termination is requested, abort | ||
return | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there ever a multi-thread accessing prefetch / close ? If so, might be better to make
closed
into anatomic.Bool
? But if we don't access it that way, it's fine.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prefetcher is manipulated in the statedb, which is not regarded as thread-safe. I believe we never do multi-thread accessing prefetch.