Skip to content

Commit

Permalink
pipeline commit trie
Browse files Browse the repository at this point in the history
  • Loading branch information
unclereal committed Dec 16, 2021
1 parent 74f6b61 commit c40b1ad
Show file tree
Hide file tree
Showing 12 changed files with 168 additions and 73 deletions.
8 changes: 5 additions & 3 deletions core/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error {
// transition, such as amount of used gas, the receipt roots and the state root
// itself. ValidateState returns a database batch if the validation was a success
// otherwise nil and an error is returned.
func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64) error {
func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64, skipHeavyVerify bool) error {
header := block.Header()
if block.GasUsed() != usedGas {
return fmt.Errorf("invalid gas used (remote: %d local: %d)", block.GasUsed(), usedGas)
Expand All @@ -129,13 +129,15 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD
return nil
}
},
func() error {
}
if !skipHeavyVerify {
validateFuns = append(validateFuns, func() error {
if root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root {
return fmt.Errorf("invalid merkle root (remote: %x local: %x)", header.Root, root)
} else {
return nil
}
},
})
}
validateRes := make(chan error, len(validateFuns))
for _, f := range validateFuns {
Expand Down
5 changes: 4 additions & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1049,6 +1049,9 @@ func (bc *BlockChain) HasFastBlock(hash common.Hash, number uint64) bool {

// HasState checks if state trie is fully present in the database or not.
func (bc *BlockChain) HasState(hash common.Hash) bool {
if s := bc.snaps.Snapshot(hash); s != nil && !s.Verified() {
return true
}
_, err := bc.stateCache.OpenTrie(hash)
return err == nil
}
Expand Down Expand Up @@ -2073,7 +2076,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
// Validate the state using the default validator
substart = time.Now()
if !statedb.IsLightProcessed() {
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas, true); err != nil {
log.Error("validate state failed", "error", err)
bc.reportBlock(block, receipts, err)
return it.index, err
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error {
blockchain.reportBlock(block, receipts, err)
return err
}
err = blockchain.validator.ValidateState(block, statedb, receipts, usedGas)
err = blockchain.validator.ValidateState(block, statedb, receipts, usedGas, false)
if err != nil {
blockchain.reportBlock(block, receipts, err)
return err
Expand Down
31 changes: 27 additions & 4 deletions core/state/snapshot/difflayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ type diffLayer struct {
storageList map[common.Hash][]common.Hash // List of storage slots for iterated retrievals, one per account. Any existing lists are sorted if non-nil
storageData map[common.Hash]map[common.Hash][]byte // Keyed storage slots for direct retrieval. one per account (nil means deleted)

diffed *bloomfilter.Filter // Bloom filter tracking all the diffed items up to the disk layer
verifiedCh chan struct{}
diffed *bloomfilter.Filter // Bloom filter tracking all the diffed items up to the disk layer

lock sync.RWMutex
}
Expand Down Expand Up @@ -168,7 +169,7 @@ func (h storageBloomHasher) Sum64() uint64 {

// newDiffLayer creates a new diff on top of an existing snapshot, whether that's a low
// level persistent database or a hierarchical diff already.
func newDiffLayer(parent snapshot, root common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) *diffLayer {
func newDiffLayer(parent snapshot, root common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte, verified chan struct{}) *diffLayer {
// Create the new layer with some pre-allocated data segments
dl := &diffLayer{
parent: parent,
Expand All @@ -177,6 +178,7 @@ func newDiffLayer(parent snapshot, root common.Hash, destructs map[common.Hash]s
accountData: accounts,
storageData: storage,
storageList: make(map[common.Hash][]common.Hash),
verifiedCh: verified,
}
switch parent := parent.(type) {
case *diskLayer:
Expand Down Expand Up @@ -256,6 +258,27 @@ func (dl *diffLayer) Root() common.Hash {
return dl.root
}

// WaitVerified will wait until the diff layer been verified
func (dl *diffLayer) WaitVerified() {
if dl.verifiedCh == nil {
return
}
<-dl.verifiedCh
return
}

func (dl *diffLayer) Verified() bool {
if dl.verifiedCh == nil {
return true
}
select {
case <-dl.verifiedCh:
return true
default:
return false
}
}

// Parent returns the subsequent layer of a diff layer.
func (dl *diffLayer) Parent() snapshot {
return dl.parent
Expand Down Expand Up @@ -423,8 +446,8 @@ func (dl *diffLayer) storage(accountHash, storageHash common.Hash, depth int) ([

// Update creates a new layer on top of the existing snapshot diff tree with
// the specified data items.
func (dl *diffLayer) Update(blockRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) *diffLayer {
return newDiffLayer(dl, blockRoot, destructs, accounts, storage)
func (dl *diffLayer) Update(blockRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte, verified chan struct{}) *diffLayer {
return newDiffLayer(dl, blockRoot, destructs, accounts, storage, verified)
}

// flatten pushes all data from this point downwards, flattening everything into
Expand Down
26 changes: 13 additions & 13 deletions core/state/snapshot/difflayer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,13 +151,13 @@ func TestMergeDelete(t *testing.T) {
}
}
// Add some flipAccs-flopping layers on top
parent := newDiffLayer(emptyLayer(), common.Hash{}, flipDrops(), flipAccs(), storage)
child := parent.Update(common.Hash{}, flopDrops(), flopAccs(), storage)
child = child.Update(common.Hash{}, flipDrops(), flipAccs(), storage)
child = child.Update(common.Hash{}, flopDrops(), flopAccs(), storage)
child = child.Update(common.Hash{}, flipDrops(), flipAccs(), storage)
child = child.Update(common.Hash{}, flopDrops(), flopAccs(), storage)
child = child.Update(common.Hash{}, flipDrops(), flipAccs(), storage)
parent := newDiffLayer(emptyLayer(), common.Hash{}, flipDrops(), flipAccs(), storage, nil)
child := parent.Update(common.Hash{}, flopDrops(), flopAccs(), storage, nil)
child = child.Update(common.Hash{}, flipDrops(), flipAccs(), storage, nil)
child = child.Update(common.Hash{}, flopDrops(), flopAccs(), storage, nil)
child = child.Update(common.Hash{}, flipDrops(), flipAccs(), storage, nil)
child = child.Update(common.Hash{}, flopDrops(), flopAccs(), storage, nil)
child = child.Update(common.Hash{}, flipDrops(), flipAccs(), storage, nil)

if data, _ := child.Account(h1); data == nil {
t.Errorf("last diff layer: expected %x account to be non-nil", h1)
Expand Down Expand Up @@ -209,7 +209,7 @@ func TestInsertAndMerge(t *testing.T) {
accounts = make(map[common.Hash][]byte)
storage = make(map[common.Hash]map[common.Hash][]byte)
)
parent = newDiffLayer(emptyLayer(), common.Hash{}, destructs, accounts, storage)
parent = newDiffLayer(emptyLayer(), common.Hash{}, destructs, accounts, storage, nil)
}
{
var (
Expand All @@ -220,7 +220,7 @@ func TestInsertAndMerge(t *testing.T) {
accounts[acc] = randomAccount()
storage[acc] = make(map[common.Hash][]byte)
storage[acc][slot] = []byte{0x01}
child = newDiffLayer(parent, common.Hash{}, destructs, accounts, storage)
child = newDiffLayer(parent, common.Hash{}, destructs, accounts, storage, nil)
}
// And flatten
merged := (child.flatten()).(*diffLayer)
Expand Down Expand Up @@ -256,7 +256,7 @@ func BenchmarkSearch(b *testing.B) {
for i := 0; i < 10000; i++ {
accounts[randomHash()] = randomAccount()
}
return newDiffLayer(parent, common.Hash{}, destructs, accounts, storage)
return newDiffLayer(parent, common.Hash{}, destructs, accounts, storage, nil)
}
var layer snapshot
layer = emptyLayer()
Expand Down Expand Up @@ -298,7 +298,7 @@ func BenchmarkSearchSlot(b *testing.B) {
accStorage[randomHash()] = value
storage[accountKey] = accStorage
}
return newDiffLayer(parent, common.Hash{}, destructs, accounts, storage)
return newDiffLayer(parent, common.Hash{}, destructs, accounts, storage, nil)
}
var layer snapshot
layer = emptyLayer()
Expand Down Expand Up @@ -336,7 +336,7 @@ func BenchmarkFlatten(b *testing.B) {
}
storage[accountKey] = accStorage
}
return newDiffLayer(parent, common.Hash{}, destructs, accounts, storage)
return newDiffLayer(parent, common.Hash{}, destructs, accounts, storage, nil)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand Down Expand Up @@ -386,7 +386,7 @@ func BenchmarkJournal(b *testing.B) {
}
storage[accountKey] = accStorage
}
return newDiffLayer(parent, common.Hash{}, destructs, accounts, storage)
return newDiffLayer(parent, common.Hash{}, destructs, accounts, storage, nil)
}
layer := snapshot(new(diskLayer))
for i := 1; i < 128; i++ {
Expand Down
12 changes: 10 additions & 2 deletions core/state/snapshot/disklayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ func (dl *diskLayer) Root() common.Hash {
return dl.root
}

func (dl *diskLayer) WaitVerified() {
return
}

func (dl *diskLayer) Verified() bool {
return true
}

// Parent always returns nil as there's no layer below the disk.
func (dl *diskLayer) Parent() snapshot {
return nil
Expand Down Expand Up @@ -161,6 +169,6 @@ func (dl *diskLayer) Storage(accountHash, storageHash common.Hash) ([]byte, erro
// Update creates a new layer on top of the existing snapshot diff tree with
// the specified data items. Note, the maps are retained by the method to avoid
// copying everything.
func (dl *diskLayer) Update(blockHash common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) *diffLayer {
return newDiffLayer(dl, blockHash, destructs, accounts, storage)
func (dl *diskLayer) Update(blockHash common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte, verified chan struct{}) *diffLayer {
return newDiffLayer(dl, blockHash, destructs, accounts, storage, verified)
}
4 changes: 2 additions & 2 deletions core/state/snapshot/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestAccountIteratorBasics(t *testing.T) {
}
}
// Add some (identical) layers on top
diffLayer := newDiffLayer(emptyLayer(), common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage))
diffLayer := newDiffLayer(emptyLayer(), common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage), nil)
it := diffLayer.AccountIterator(common.Hash{})
verifyIterator(t, 100, it, verifyNothing) // Nil is allowed for single layer iterator

Expand Down Expand Up @@ -91,7 +91,7 @@ func TestStorageIteratorBasics(t *testing.T) {
nilStorage[h] = nilstorage
}
// Add some (identical) layers on top
diffLayer := newDiffLayer(emptyLayer(), common.Hash{}, nil, copyAccounts(accounts), copyStorage(storage))
diffLayer := newDiffLayer(emptyLayer(), common.Hash{}, nil, copyAccounts(accounts), copyStorage(storage), nil)
for account := range accounts {
it, _ := diffLayer.StorageIterator(account, common.Hash{})
verifyIterator(t, 100, it, verifyNothing) // Nil is allowed for single layer iterator
Expand Down
2 changes: 1 addition & 1 deletion core/state/snapshot/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func loadDiffLayer(parent snapshot, r *rlp.Stream) (snapshot, error) {
}
storageData[entry.Hash] = slots
}
return loadDiffLayer(newDiffLayer(parent, root, destructSet, accountData, storageData), r)
return loadDiffLayer(newDiffLayer(parent, root, destructSet, accountData, storageData, nil), r)
}

// Journal terminates any in-progress snapshot generation, also implicitly pushing
Expand Down
24 changes: 19 additions & 5 deletions core/state/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ type Snapshot interface {
// Root returns the root hash for which this snapshot was made.
Root() common.Hash

// WaitVerified will wait until the Snapshot been verified
WaitVerified()

// Verified returns whether the snapshot is verified
Verified() bool

// Account directly retrieves the account associated with a particular hash in
// the snapshot slim data format.
Account(hash common.Hash) (*Account, error)
Expand Down Expand Up @@ -130,7 +136,7 @@ type snapshot interface {
// the specified data items.
//
// Note, the maps are retained by the method to avoid copying everything.
Update(blockRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) *diffLayer
Update(blockRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte, verified chan struct{}) *diffLayer

// Journal commits an entire diff hierarchy to disk into a single journal entry.
// This is meant to be used during shutdown to persist the snapshot without
Expand Down Expand Up @@ -289,6 +295,14 @@ func (t *Tree) Snapshot(blockRoot common.Hash) Snapshot {
return t.layers[blockRoot]
}

// Snapshot retrieves a snapshot belonging to the given block root, or nil if no
func (t *Tree) Remove(blockRoot common.Hash) {
t.lock.RLock()
defer t.lock.RUnlock()

delete(t.layers, blockRoot)
}

// Snapshots returns all visited layers from the topmost layer with specific
// root and traverses downward. The layer amount is limited by the given number.
// If nodisk is set, then disk layer is excluded.
Expand Down Expand Up @@ -322,14 +336,14 @@ func (t *Tree) Snapshots(root common.Hash, limits int, nodisk bool) []Snapshot {
return ret
}

func (t *Tree) Update(blockRoot common.Hash, parentRoot common.Hash, destructs map[common.Address]struct{}, accounts map[common.Address][]byte, storage map[common.Address]map[string][]byte) error {
func (t *Tree) Update(blockRoot common.Hash, parentRoot common.Hash, destructs map[common.Address]struct{}, accounts map[common.Address][]byte, storage map[common.Address]map[string][]byte, verified chan struct{}) error {
hashDestructs, hashAccounts, hashStorage := transformSnapData(destructs, accounts, storage)
return t.update(blockRoot, parentRoot, hashDestructs, hashAccounts, hashStorage)
return t.update(blockRoot, parentRoot, hashDestructs, hashAccounts, hashStorage, verified)
}

// Update adds a new snapshot into the tree, if that can be linked to an existing
// old parent. It is disallowed to insert a disk layer (the origin of all).
func (t *Tree) update(blockRoot common.Hash, parentRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) error {
func (t *Tree) update(blockRoot common.Hash, parentRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte, verified chan struct{}) error {
// Reject noop updates to avoid self-loops in the snapshot tree. This is a
// special case that can only happen for Clique networks where empty blocks
// don't modify the state (0 block subsidy).
Expand All @@ -344,7 +358,7 @@ func (t *Tree) update(blockRoot common.Hash, parentRoot common.Hash, destructs m
if parent == nil {
return fmt.Errorf("parent [%#x] snapshot missing", parentRoot)
}
snap := parent.(snapshot).Update(blockRoot, destructs, accounts, storage)
snap := parent.(snapshot).Update(blockRoot, destructs, accounts, storage, verified)

// Save the new snapshot for later
t.lock.Lock()
Expand Down
Loading

0 comments on commit c40b1ad

Please sign in to comment.