diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index a81e528e6cfe..f9d52b9844b4 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -17,6 +17,7 @@ package state import ( + "errors" "sync" "github.com/ethereum/go-ethereum/common" @@ -27,6 +28,9 @@ import ( var ( // triePrefetchMetricsPrefix is the prefix under which to publish the metrics. triePrefetchMetricsPrefix = "trie/prefetch/" + + // errTerminated is returned if any invocation is applied on a terminated fetcher. + errTerminated = errors.New("fetcher is already terminated") ) // triePrefetcher is an active prefetcher, which receives accounts or storage @@ -43,17 +47,15 @@ type triePrefetcher struct { deliveryMissMeter metrics.Meter accountLoadMeter metrics.Meter accountDupMeter metrics.Meter - accountSkipMeter metrics.Meter accountWasteMeter metrics.Meter storageLoadMeter metrics.Meter storageDupMeter metrics.Meter - storageSkipMeter metrics.Meter storageWasteMeter metrics.Meter } func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher { prefix := triePrefetchMetricsPrefix + namespace - p := &triePrefetcher{ + return &triePrefetcher{ db: db, root: root, fetchers: make(map[string]*subfetcher), // Active prefetchers use the fetchers map @@ -61,20 +63,20 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil), accountLoadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load", nil), accountDupMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup", nil), - accountSkipMeter: metrics.GetOrRegisterMeter(prefix+"/account/skip", nil), accountWasteMeter: metrics.GetOrRegisterMeter(prefix+"/account/waste", nil), storageLoadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load", nil), storageDupMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil), - storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil), storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil), } - return p } // close iterates over all the subfetchers, waits on any that were left spinning -// and reports the stats to the metrics subsystem. close should not be called -// more than once on a triePrefetcher instance. +// and reports the stats to the metrics subsystem. func (p *triePrefetcher) close() { + // Short circuit if the fetcher is already closed. + if p.closed { + return + } for _, fetcher := range p.fetchers { fetcher.close() @@ -82,8 +84,6 @@ func (p *triePrefetcher) close() { if fetcher.root == p.root { p.accountLoadMeter.Mark(int64(len(fetcher.seen))) p.accountDupMeter.Mark(int64(fetcher.dups)) - p.accountSkipMeter.Mark(int64(len(fetcher.tasks))) - for _, key := range fetcher.used { delete(fetcher.seen, string(key)) } @@ -91,8 +91,6 @@ func (p *triePrefetcher) close() { } else { p.storageLoadMeter.Mark(int64(len(fetcher.seen))) p.storageDupMeter.Mark(int64(fetcher.dups)) - p.storageSkipMeter.Mark(int64(len(fetcher.tasks))) - for _, key := range fetcher.used { delete(fetcher.seen, string(key)) } @@ -101,50 +99,54 @@ func (p *triePrefetcher) close() { } } p.closed = true + p.fetchers = nil } -// prefetch schedules a batch of trie items to prefetch. After the prefetcher is closed, all the following tasks scheduled will not be executed. +// prefetch schedules a batch of trie items to prefetch. After the prefetcher is +// closed, all the following tasks scheduled will not be executed and an error +// will be returned. // // prefetch is called from two locations: +// // 1. Finalize of the state-objects storage roots. This happens at the end // of every transaction, meaning that if several transactions touches // upon the same contract, the parameters invoking this method may be // repeated. // 2. Finalize of the main account trie. This happens only once per block. -func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr common.Address, keys [][]byte) { +func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr common.Address, keys [][]byte) error { + if p.closed { + return errTerminated + } id := p.trieID(owner, root) fetcher := p.fetchers[id] if fetcher == nil { fetcher = newSubfetcher(p.db, p.root, owner, root, addr) p.fetchers[id] = fetcher } - fetcher.schedule(keys) + return fetcher.schedule(keys) } -// trie returns the trie matching the root hash, or nil if the prefetcher doesn't -// have it. trie is not safe to call concurrently +// trie returns the trie matching the root hash, or nil if either the fetcher +// is terminated or the trie is not available. func (p *triePrefetcher) trie(owner common.Hash, root common.Hash) Trie { + if p.closed { + return nil + } // Bail if no trie was prefetched for this root fetcher := p.fetchers[p.trieID(owner, root)] - if fetcher == nil || fetcher.trie == nil { + if fetcher == nil { p.deliveryMissMeter.Mark(1) return nil } - if p.closed { - return fetcher.db.CopyTrie(fetcher.trie) - } - trieChan := make(chan Trie) - fetcher.copy <- trieChan - select { - case fetcher.wake <- true: - default: - } - return <-trieChan + return fetcher.peek() } // used marks a batch of state items used to allow creating statistics as to -// how useful or wasteful the prefetcher is. +// how useful or wasteful the fetcher is. func (p *triePrefetcher) used(owner common.Hash, root common.Hash, used [][]byte) { + if p.closed { + return + } if fetcher := p.fetchers[p.trieID(owner, root)]; fetcher != nil { fetcher.used = used } @@ -173,7 +175,8 @@ type subfetcher struct { tasks [][]byte // Items queued up for retrieval lock sync.Mutex // Lock protecting the task queue - wake chan bool // Wake channel if a new task is scheduled, true if the subfetcher should continue running when there are no pending tasks + wake chan struct{} // Wake channel if a new task is scheduled + stop chan struct{} // Channel to interrupt processing term chan struct{} // Channel to signal interruption copy chan chan Trie // channel for retrieving copies of the subfetcher's trie @@ -191,9 +194,10 @@ func newSubfetcher(db Database, state common.Hash, owner common.Hash, root commo owner: owner, root: root, addr: addr, - wake: make(chan bool, 1), - copy: make(chan chan Trie, 1), + wake: make(chan struct{}), + stop: make(chan struct{}), term: make(chan struct{}), + copy: make(chan chan Trie), seen: make(map[string]struct{}), } go sf.loop() @@ -201,24 +205,41 @@ func newSubfetcher(db Database, state common.Hash, owner common.Hash, root commo } // schedule adds a batch of trie keys to the queue to prefetch. -func (sf *subfetcher) schedule(keys [][]byte) { +func (sf *subfetcher) schedule(keys [][]byte) error { // Append the tasks to the current queue sf.lock.Lock() - sf.tasks = append(sf.tasks, keys...) sf.lock.Unlock() - // Notify the prefetcher, it's fine if it's already terminated + + // Notify the background thread to execute scheduled tasks select { - case sf.wake <- true: - default: + case sf.wake <- struct{}{}: + return nil + case <-sf.term: + return errTerminated + } +} + +// peek tries to retrieve a deep copy of the fetcher's trie. Nil is returned +// if the fetcher is already terminated, or the associated trie is failing +// for opening. +func (sf *subfetcher) peek() Trie { + ch := make(chan Trie) + select { + case sf.copy <- ch: + return <-ch + case <-sf.term: + return nil } } // close waits for the subfetcher to finish its tasks. It cannot be called multiple times func (sf *subfetcher) close() { - // Notify the prefetcher. The wake-chan is buffered, so this is async. - sf.wake <- false - // Wait for it to terminate + select { + case <-sf.stop: + default: + close(sf.stop) + } <-sf.term } @@ -228,8 +249,7 @@ func (sf *subfetcher) loop() { // No matter how the loop stops, signal anyone waiting that it's terminated defer close(sf.term) - // Any calls to trie - // start by opening the trie and stop processing if it fails. + // Start by opening the trie and stop processing if it fails. if sf.owner == (common.Hash{}) { trie, err := sf.db.OpenTrie(sf.root) if err != nil { @@ -238,8 +258,6 @@ func (sf *subfetcher) loop() { } sf.trie = trie } else { - // The trie argument can be nil as verkle doesn't support prefetching - // yet. TODO FIX IT(rjl493456442), otherwise code will panic here. trie, err := sf.db.OpenStorageTrie(sf.state, sf.addr, sf.root, nil) if err != nil { log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err) @@ -248,34 +266,33 @@ func (sf *subfetcher) loop() { sf.trie = trie } // Trie opened successfully, keep prefetching items - for keepRunning := range sf.wake { - // Subfetcher was woken up, retrieve any tasks to avoid spinning the lock - sf.lock.Lock() - tasks := sf.tasks - sf.tasks = nil - sf.lock.Unlock() + for { + select { + case <-sf.wake: + // Execute all remaining tasks in single run + sf.lock.Lock() + tasks := sf.tasks + sf.tasks = nil + sf.lock.Unlock() - // Prefetch all tasks - for _, task := range tasks { - if _, ok := sf.seen[string(task)]; ok { - sf.dups++ - continue - } - if len(task) == common.AddressLength { - sf.trie.GetAccount(common.BytesToAddress(task)) - } else { - sf.trie.GetStorage(sf.addr, task) + for _, task := range tasks { + if _, ok := sf.seen[string(task)]; ok { + sf.dups++ + continue + } + if len(task) == common.AddressLength { + sf.trie.GetAccount(common.BytesToAddress(task)) + } else { + sf.trie.GetStorage(sf.addr, task) + } + sf.seen[string(task)] = struct{}{} } - sf.seen[string(task)] = struct{}{} - } - // if any trie retrieval request is made, ensure it is completed - // after pending tasks have been processed. - select { case ch := <-sf.copy: + // Somebody wants a copy of the current trie, grant them. ch <- sf.db.CopyTrie(sf.trie) - default: - } - if !keepRunning { + + case <-sf.stop: + // Termination is requested, abort return } } diff --git a/core/state/trie_prefetcher_test.go b/core/state/trie_prefetcher_test.go index e990d0504f99..73f9de91ca8f 100644 --- a/core/state/trie_prefetcher_test.go +++ b/core/state/trie_prefetcher_test.go @@ -55,7 +55,7 @@ func TestUseAfterClose(t *testing.T) { if a == nil { t.Fatal("Prefetching before close should not return nil") } - if b == nil { - t.Fatal("Trie after close should not return nil") + if b != nil { + t.Fatal("Trie after close should return nil") } }