Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,12 @@ type BlockChain struct {
running int32 // 0 if chain is running, 1 when stopped
procInterrupt int32 // interrupt signaler for block processing

engine consensus.Engine
validator Validator // Block and state validator interface
prefetcher Prefetcher // Block state prefetcher interface
processor Processor // Block transaction processor interface
vmConfig vm.Config
engine consensus.Engine
validator Validator // Block and state validator interface
triePrefetcher *state.TriePrefetcher // Trie prefetcher interface
prefetcher Prefetcher
processor Processor // Block transaction processor interface
vmConfig vm.Config

badBlocks *lru.Cache // Bad block cache
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
Expand Down Expand Up @@ -253,6 +254,10 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
}
bc.validator = NewBlockValidator(chainConfig, bc, engine)
bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine)
tp := state.NewTriePrefetcher(bc.stateCache)
tp.Start()
bc.triePrefetcher = tp

bc.processor = NewStateProcessor(chainConfig, bc, engine)

var err error
Expand Down Expand Up @@ -995,6 +1000,9 @@ func (bc *BlockChain) Stop() {
bc.scope.Close()
close(bc.quit)
bc.StopInsert()
if bc.triePrefetcher != nil {
bc.triePrefetcher.Close()
}
bc.wg.Wait()

// Ensure that the entirety of the state snapshot is journalled to disk.
Expand Down Expand Up @@ -1861,6 +1869,9 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
}
statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
if bc.snaps != nil {
statedb.UsePrefetcher(bc.triePrefetcher)
}
if err != nil {
return it.index, err
}
Expand Down Expand Up @@ -1895,8 +1906,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete, we can mark them
snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads) // Account reads are complete, we can mark them
snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads) // Storage reads are complete, we can mark them

triehash := statedb.AccountHashes + statedb.StorageHashes // Save to not double count in validation
triehash := statedb.AccountHashes + statedb.StorageHashes // Save to not double count in validation
trieproc := statedb.SnapshotAccountReads + statedb.AccountReads + statedb.AccountUpdates
trieproc += statedb.SnapshotStorageReads + statedb.StorageReads + statedb.StorageUpdates

Expand Down
12 changes: 10 additions & 2 deletions core/state/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,20 @@ type cachingDB struct {

// OpenTrie opens the main account trie at a specific root hash.
func (db *cachingDB) OpenTrie(root common.Hash) (Trie, error) {
return trie.NewSecure(root, db.db)
tr, err := trie.NewSecure(root, db.db)
if err != nil {
return nil, err
}
return tr, nil
}

// OpenStorageTrie opens the storage trie of an account.
func (db *cachingDB) OpenStorageTrie(addrHash, root common.Hash) (Trie, error) {
return trie.NewSecure(root, db.db)
tr, err := trie.NewSecure(root, db.db)
if err != nil {
return nil, err
}
return tr, nil
}

// CopyTrie returns an independent copy of the given trie.
Expand Down
84 changes: 62 additions & 22 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,20 @@ func (s *stateObject) touch() {

func (s *stateObject) getTrie(db Database) Trie {
if s.trie == nil {
var err error
s.trie, err = db.OpenStorageTrie(s.addrHash, s.data.Root)
if err != nil {
s.trie, _ = db.OpenStorageTrie(s.addrHash, common.Hash{})
s.setError(fmt.Errorf("can't create storage trie: %v", err))
// Try fetching from prefetcher first
// We don't prefetch empty tries
if s.data.Root != emptyRoot && s.db.prefetcher != nil {
// When the miner is creating the pending state, there is no
// prefetcher
s.trie = s.db.prefetcher.GetStorageTrie(s.data.Root)
}
if s.trie == nil {
var err error
s.trie, err = db.OpenStorageTrie(s.addrHash, s.data.Root)
if err != nil {
s.trie, _ = db.OpenStorageTrie(s.addrHash, common.Hash{})
s.setError(fmt.Errorf("can't create storage trie: %v", err))
}
}
}
return s.trie
Expand Down Expand Up @@ -197,12 +206,24 @@ func (s *stateObject) GetCommittedState(db Database, key common.Hash) common.Has
}
// If no live objects are available, attempt to use snapshots
var (
enc []byte
err error
enc []byte
err error
meter *time.Duration
)
readStart := time.Now()
if metrics.EnabledExpensive {
// If the snap is 'under construction', the first lookup may fail. If that
// happens, we don't want to double-count the time elapsed. Thus this
// dance with the metering.
defer func() {
if meter != nil {
*meter += time.Since(readStart)
}
}()
}
if s.db.snap != nil {
if metrics.EnabledExpensive {
defer func(start time.Time) { s.db.SnapshotStorageReads += time.Since(start) }(time.Now())
meter = &s.db.SnapshotStorageReads
}
// If the object was destructed in *this* block (and potentially resurrected),
// the storage has been cleared out, and we should *not* consult the previous
Expand All @@ -217,8 +238,14 @@ func (s *stateObject) GetCommittedState(db Database, key common.Hash) common.Has
}
// If snapshot unavailable or reading from it failed, load from the database
if s.db.snap == nil || err != nil {
if meter != nil {
// If we already spent time checking the snapshot, account for it
// and reset the readStart
*meter += time.Since(readStart)
readStart = time.Now()
}
if metrics.EnabledExpensive {
defer func(start time.Time) { s.db.StorageReads += time.Since(start) }(time.Now())
meter = &s.db.StorageReads
}
if enc, err = s.getTrie(db).TryGet(key.Bytes()); err != nil {
s.setError(err)
Expand Down Expand Up @@ -282,9 +309,16 @@ func (s *stateObject) setState(key, value common.Hash) {

// finalise moves all dirty storage slots into the pending area to be hashed or
// committed later. It is invoked at the end of every transaction.
func (s *stateObject) finalise() {
func (s *stateObject) finalise(prefetch bool) {
slotsToPrefetch := make([]common.Hash, 0, len(s.dirtyStorage))
for key, value := range s.dirtyStorage {
s.pendingStorage[key] = value
if value != s.originStorage[key] {
slotsToPrefetch = append(slotsToPrefetch, key)
}
}
if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != emptyRoot {
s.db.prefetcher.PrefetchStorage(s.data.Root, slotsToPrefetch)
}
if len(s.dirtyStorage) > 0 {
s.dirtyStorage = make(Storage)
Expand All @@ -295,26 +329,21 @@ func (s *stateObject) finalise() {
// It will return nil if the trie has not been loaded and no changes have been made
func (s *stateObject) updateTrie(db Database) Trie {
// Make sure all dirty slots are finalized into the pending storage area
s.finalise()
s.finalise(false) // Don't prefetch any more, pull directly if need be
if len(s.pendingStorage) == 0 {
return s.trie
}
// Track the amount of time wasted on updating the storage trie
if metrics.EnabledExpensive {
defer func(start time.Time) { s.db.StorageUpdates += time.Since(start) }(time.Now())
}
// Retrieve the snapshot storage map for the object
// The snapshot storage map for the object
var storage map[common.Hash][]byte
if s.db.snap != nil {
// Retrieve the old storage map, if available, create a new one otherwise
storage = s.db.snapStorage[s.addrHash]
if storage == nil {
storage = make(map[common.Hash][]byte)
s.db.snapStorage[s.addrHash] = storage
}
}
// Insert all the pending updates into the trie
tr := s.getTrie(db)
hasher := s.db.hasher

usedStorage := make([]common.Hash, 0, len(s.pendingStorage))
for key, value := range s.pendingStorage {
// Skip noop changes, persist actual changes
if value == s.originStorage[key] {
Expand All @@ -331,9 +360,20 @@ func (s *stateObject) updateTrie(db Database) Trie {
s.setError(tr.TryUpdate(key[:], v))
}
// If state snapshotting is active, cache the data til commit
if storage != nil {
storage[crypto.Keccak256Hash(key[:])] = v // v will be nil if value is 0x00
if s.db.snap != nil {
if storage == nil {
// Retrieve the old storage map, if available, create a new one otherwise
if storage = s.db.snapStorage[s.addrHash]; storage == nil {
storage = make(map[common.Hash][]byte)
s.db.snapStorage[s.addrHash] = storage
}
}
storage[crypto.HashData(hasher, key[:])] = v // v will be nil if value is 0x00
}
usedStorage = append(usedStorage, key)
}
if s.db.prefetcher != nil {
s.db.prefetcher.UseStorage(s.data.Root, usedStorage)
}
if len(s.pendingStorage) > 0 {
s.pendingStorage = make(Storage)
Expand Down
62 changes: 57 additions & 5 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,11 @@ func (n *proofList) Delete(key []byte) error {
// * Contracts
// * Accounts
type StateDB struct {
db Database
trie Trie
db Database
prefetcher *TriePrefetcher
originalRoot common.Hash // The pre-state root, before any changes were made
trie Trie
hasher crypto.KeccakState

snaps *snapshot.Tree
snap snapshot.Snapshot
Expand Down Expand Up @@ -125,6 +128,7 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error)
sdb := &StateDB{
db: db,
trie: tr,
originalRoot: root,
snaps: snaps,
stateObjects: make(map[common.Address]*stateObject),
stateObjectsPending: make(map[common.Address]struct{}),
Expand All @@ -133,6 +137,7 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error)
preimages: make(map[common.Hash][]byte),
journal: newJournal(),
accessList: newAccessList(),
hasher: crypto.NewKeccakState(),
}
if sdb.snaps != nil {
if sdb.snap = sdb.snaps.Snapshot(root); sdb.snap != nil {
Expand All @@ -144,6 +149,13 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error)
return sdb, nil
}

func (s *StateDB) UsePrefetcher(prefetcher *TriePrefetcher) {
if prefetcher != nil {
s.prefetcher = prefetcher
s.prefetcher.Resume(s.originalRoot)
}
}

// setError remembers the first non-nil error it is called with.
func (s *StateDB) setError(err error) {
if s.dbErr == nil {
Expand Down Expand Up @@ -532,7 +544,7 @@ func (s *StateDB) getDeletedStateObject(addr common.Address) *stateObject {
defer func(start time.Time) { s.SnapshotAccountReads += time.Since(start) }(time.Now())
}
var acc *snapshot.Account
if acc, err = s.snap.Account(crypto.Keccak256Hash(addr.Bytes())); err == nil {
if acc, err = s.snap.Account(crypto.HashData(s.hasher, addr.Bytes())); err == nil {
if acc == nil {
return nil
}
Expand Down Expand Up @@ -675,6 +687,7 @@ func (s *StateDB) Copy() *StateDB {
logSize: s.logSize,
preimages: make(map[common.Hash][]byte, len(s.preimages)),
journal: newJournal(),
hasher: crypto.NewKeccakState(),
}
// Copy the dirty states, logs, and preimages
for addr := range s.journal.dirties {
Expand Down Expand Up @@ -760,6 +773,7 @@ func (s *StateDB) GetRefund() uint64 {
// the journal as well as the refunds. Finalise, however, will not push any updates
// into the tries just yet. Only IntermediateRoot or Commit will do that.
func (s *StateDB) Finalise(deleteEmptyObjects bool) {
addressesToPrefetch := make([]common.Address, 0, len(s.journal.dirties))
for addr := range s.journal.dirties {
obj, exist := s.stateObjects[addr]
if !exist {
Expand All @@ -784,10 +798,18 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) {
delete(s.snapStorage, obj.addrHash) // Clear out any previously updated storage data (may be recreated via a ressurrect)
}
} else {
obj.finalise()
obj.finalise(true) // Prefetch slots in the background
}
s.stateObjectsPending[addr] = struct{}{}
s.stateObjectsDirty[addr] = struct{}{}

// At this point, also ship the address off to the precacher. The precacher
// will start loading tries, and when the change is eventually committed,
// the commit-phase will be a lot faster
addressesToPrefetch = append(addressesToPrefetch, addr)
}
if s.prefetcher != nil && len(addressesToPrefetch) > 0 {
s.prefetcher.PrefetchAddresses(addressesToPrefetch)
}
// Invalidate journal because reverting across transactions is not allowed.
s.clearJournalAndRefund()
Expand All @@ -800,14 +822,44 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
// Finalise all the dirty storage states and write them into the tries
s.Finalise(deleteEmptyObjects)

usedAddresses := make([]common.Address, 0, len(s.stateObjectsPending))
// In the first phase, we need the storage tries for hashing.
if s.prefetcher != nil && s.originalRoot != (common.Hash{}) {
// After this, the objects can fetch the storage tries from the
// prefetcher.
s.prefetcher.PauseStorage()
}
// Update the root of all stateobjects in pending
for addr := range s.stateObjectsPending {
obj := s.stateObjects[addr]
if !obj.deleted {
obj.updateRoot(s.db)
}
}
// At this point, we need to update the main trie, and need to pause the
// account-prefetcher aswell.
if s.prefetcher != nil && s.originalRoot != (common.Hash{}) {
s.prefetcher.PauseAccounts()
if trie := s.prefetcher.GetAccountTrie(s.originalRoot); trie != nil {
s.trie = trie
}
// We only want to do this _once_, if someone calls IntermediateRoot again,
// we shouldn't fetch the trie again
s.originalRoot = common.Hash{}
}
// Now update the account trie
for addr := range s.stateObjectsPending {
obj := s.stateObjects[addr]
if obj.deleted {
s.deleteStateObject(obj)
} else {
obj.updateRoot(s.db)
s.updateStateObject(obj)
}
usedAddresses = append(usedAddresses, addr)
}
if s.prefetcher != nil {
// TODO: Remove this later on, useful for debugging/testing
s.prefetcher.UseAddresses(usedAddresses)
}
if len(s.stateObjectsPending) > 0 {
s.stateObjectsPending = make(map[common.Address]struct{})
Expand Down
Loading