Skip to content

Commit

Permalink
core/state: improve prefetcher
Browse files Browse the repository at this point in the history
  • Loading branch information
rjl493456442 committed Mar 5, 2024
1 parent 13196b8 commit 453f06d
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 70 deletions.
153 changes: 85 additions & 68 deletions core/state/trie_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package state

import (
"errors"
"sync"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -27,6 +28,9 @@ import (
var (
// triePrefetchMetricsPrefix is the prefix under which to publish the metrics.
triePrefetchMetricsPrefix = "trie/prefetch/"

// errTerminated is returned if any invocation is applied on a terminated fetcher.
errTerminated = errors.New("fetcher is already terminated")
)

// triePrefetcher is an active prefetcher, which receives accounts or storage
Expand All @@ -43,56 +47,50 @@ type triePrefetcher struct {
deliveryMissMeter metrics.Meter
accountLoadMeter metrics.Meter
accountDupMeter metrics.Meter
accountSkipMeter metrics.Meter
accountWasteMeter metrics.Meter
storageLoadMeter metrics.Meter
storageDupMeter metrics.Meter
storageSkipMeter metrics.Meter
storageWasteMeter metrics.Meter
}

func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher {
prefix := triePrefetchMetricsPrefix + namespace
p := &triePrefetcher{
return &triePrefetcher{
db: db,
root: root,
fetchers: make(map[string]*subfetcher), // Active prefetchers use the fetchers map

deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil),
accountLoadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load", nil),
accountDupMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup", nil),
accountSkipMeter: metrics.GetOrRegisterMeter(prefix+"/account/skip", nil),
accountWasteMeter: metrics.GetOrRegisterMeter(prefix+"/account/waste", nil),
storageLoadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load", nil),
storageDupMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil),
storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil),
storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil),
}
return p
}

// close iterates over all the subfetchers, waits on any that were left spinning
// and reports the stats to the metrics subsystem. close should not be called
// more than once on a triePrefetcher instance.
// and reports the stats to the metrics subsystem.
func (p *triePrefetcher) close() {
// Short circuit if the fetcher is already closed.
if p.closed {
return
}
for _, fetcher := range p.fetchers {
fetcher.close()

if metrics.Enabled {
if fetcher.root == p.root {
p.accountLoadMeter.Mark(int64(len(fetcher.seen)))
p.accountDupMeter.Mark(int64(fetcher.dups))
p.accountSkipMeter.Mark(int64(len(fetcher.tasks)))

for _, key := range fetcher.used {
delete(fetcher.seen, string(key))
}
p.accountWasteMeter.Mark(int64(len(fetcher.seen)))
} else {
p.storageLoadMeter.Mark(int64(len(fetcher.seen)))
p.storageDupMeter.Mark(int64(fetcher.dups))
p.storageSkipMeter.Mark(int64(len(fetcher.tasks)))

for _, key := range fetcher.used {
delete(fetcher.seen, string(key))
}
Expand All @@ -101,50 +99,54 @@ func (p *triePrefetcher) close() {
}
}
p.closed = true
p.fetchers = nil
}

// prefetch schedules a batch of trie items to prefetch. After the prefetcher is closed, all the following tasks scheduled will not be executed.
// prefetch schedules a batch of trie items to prefetch. After the prefetcher is
// closed, all the following tasks scheduled will not be executed and an error
// will be returned.
//
// prefetch is called from two locations:
//
// 1. Finalize of the state-objects storage roots. This happens at the end
// of every transaction, meaning that if several transactions touches
// upon the same contract, the parameters invoking this method may be
// repeated.
// 2. Finalize of the main account trie. This happens only once per block.
func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr common.Address, keys [][]byte) {
func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr common.Address, keys [][]byte) error {
if p.closed {
return errTerminated
}
id := p.trieID(owner, root)
fetcher := p.fetchers[id]
if fetcher == nil {
fetcher = newSubfetcher(p.db, p.root, owner, root, addr)
p.fetchers[id] = fetcher
}
fetcher.schedule(keys)
return fetcher.schedule(keys)
}

// trie returns the trie matching the root hash, or nil if the prefetcher doesn't
// have it. trie is not safe to call concurrently
// trie returns the trie matching the root hash, or nil if either the fetcher
// is terminated or the trie is not available.
func (p *triePrefetcher) trie(owner common.Hash, root common.Hash) Trie {
if p.closed {
return nil
}
// Bail if no trie was prefetched for this root
fetcher := p.fetchers[p.trieID(owner, root)]
if fetcher == nil || fetcher.trie == nil {
if fetcher == nil {
p.deliveryMissMeter.Mark(1)
return nil
}
if p.closed {
return fetcher.db.CopyTrie(fetcher.trie)
}
trieChan := make(chan Trie)
fetcher.copy <- trieChan
select {
case fetcher.wake <- true:
default:
}
return <-trieChan
return fetcher.peek()
}

// used marks a batch of state items used to allow creating statistics as to
// how useful or wasteful the prefetcher is.
// how useful or wasteful the fetcher is.
func (p *triePrefetcher) used(owner common.Hash, root common.Hash, used [][]byte) {
if p.closed {
return
}
if fetcher := p.fetchers[p.trieID(owner, root)]; fetcher != nil {
fetcher.used = used
}
Expand Down Expand Up @@ -173,7 +175,8 @@ type subfetcher struct {
tasks [][]byte // Items queued up for retrieval
lock sync.Mutex // Lock protecting the task queue

wake chan bool // Wake channel if a new task is scheduled, true if the subfetcher should continue running when there are no pending tasks
wake chan struct{} // Wake channel if a new task is scheduled
stop chan struct{} // Channel to interrupt processing
term chan struct{} // Channel to signal interruption
copy chan chan Trie // channel for retrieving copies of the subfetcher's trie

Expand All @@ -191,34 +194,52 @@ func newSubfetcher(db Database, state common.Hash, owner common.Hash, root commo
owner: owner,
root: root,
addr: addr,
wake: make(chan bool, 1),
copy: make(chan chan Trie, 1),
wake: make(chan struct{}),
stop: make(chan struct{}),
term: make(chan struct{}),
copy: make(chan chan Trie),
seen: make(map[string]struct{}),
}
go sf.loop()
return sf
}

// schedule adds a batch of trie keys to the queue to prefetch.
func (sf *subfetcher) schedule(keys [][]byte) {
func (sf *subfetcher) schedule(keys [][]byte) error {
// Append the tasks to the current queue
sf.lock.Lock()

sf.tasks = append(sf.tasks, keys...)
sf.lock.Unlock()
// Notify the prefetcher, it's fine if it's already terminated

// Notify the background thread to execute scheduled tasks
select {
case sf.wake <- true:
default:
case sf.wake <- struct{}{}:
return nil
case <-sf.term:
return errTerminated
}
}

// peek tries to retrieve a deep copy of the fetcher's trie. Nil is returned
// if the fetcher is already terminated, or the associated trie is failing
// for opening.
func (sf *subfetcher) peek() Trie {
ch := make(chan Trie)
select {
case sf.copy <- ch:
return <-ch
case <-sf.term:
return nil
}
}

// close waits for the subfetcher to finish its tasks. It cannot be called multiple times
func (sf *subfetcher) close() {
// Notify the prefetcher. The wake-chan is buffered, so this is async.
sf.wake <- false
// Wait for it to terminate
select {
case <-sf.stop:
default:
close(sf.stop)
}
<-sf.term
}

Expand All @@ -228,8 +249,7 @@ func (sf *subfetcher) loop() {
// No matter how the loop stops, signal anyone waiting that it's terminated
defer close(sf.term)

// Any calls to trie
// start by opening the trie and stop processing if it fails.
// Start by opening the trie and stop processing if it fails.
if sf.owner == (common.Hash{}) {
trie, err := sf.db.OpenTrie(sf.root)
if err != nil {
Expand All @@ -238,8 +258,6 @@ func (sf *subfetcher) loop() {
}
sf.trie = trie
} else {
// The trie argument can be nil as verkle doesn't support prefetching
// yet. TODO FIX IT(rjl493456442), otherwise code will panic here.
trie, err := sf.db.OpenStorageTrie(sf.state, sf.addr, sf.root, nil)
if err != nil {
log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err)
Expand All @@ -248,34 +266,33 @@ func (sf *subfetcher) loop() {
sf.trie = trie
}
// Trie opened successfully, keep prefetching items
for keepRunning := range sf.wake {
// Subfetcher was woken up, retrieve any tasks to avoid spinning the lock
sf.lock.Lock()
tasks := sf.tasks
sf.tasks = nil
sf.lock.Unlock()
for {
select {
case <-sf.wake:
// Execute all remaining tasks in single run
sf.lock.Lock()
tasks := sf.tasks
sf.tasks = nil
sf.lock.Unlock()

// Prefetch all tasks
for _, task := range tasks {
if _, ok := sf.seen[string(task)]; ok {
sf.dups++
continue
}
if len(task) == common.AddressLength {
sf.trie.GetAccount(common.BytesToAddress(task))
} else {
sf.trie.GetStorage(sf.addr, task)
for _, task := range tasks {
if _, ok := sf.seen[string(task)]; ok {
sf.dups++
continue
}
if len(task) == common.AddressLength {
sf.trie.GetAccount(common.BytesToAddress(task))
} else {
sf.trie.GetStorage(sf.addr, task)
}
sf.seen[string(task)] = struct{}{}
}
sf.seen[string(task)] = struct{}{}
}
// if any trie retrieval request is made, ensure it is completed
// after pending tasks have been processed.
select {
case ch := <-sf.copy:
// Somebody wants a copy of the current trie, grant them.
ch <- sf.db.CopyTrie(sf.trie)
default:
}
if !keepRunning {

case <-sf.stop:
// Termination is requested, abort
return
}
}
Expand Down
4 changes: 2 additions & 2 deletions core/state/trie_prefetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestUseAfterClose(t *testing.T) {
if a == nil {
t.Fatal("Prefetching before close should not return nil")
}
if b == nil {
t.Fatal("Trie after close should not return nil")
if b != nil {
t.Fatal("Trie after close should return nil")
}
}

0 comments on commit 453f06d

Please sign in to comment.