Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,7 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) {
// The onleaf func is called _serially_, so we can reuse the same account
// for unmarshalling every time.
var account Account
root, err := s.trie.Commit(func(leaf []byte, parent common.Hash) error {
root, err := s.trie.Commit(func(path []byte, leaf []byte, parent common.Hash) error {
if err := rlp.DecodeBytes(leaf, &account); err != nil {
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions core/state/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ import (
// NewStateSync creates a new state trie download scheduler.
func NewStateSync(root common.Hash, database ethdb.KeyValueReader, bloom *trie.SyncBloom) *trie.Sync {
var syncer *trie.Sync
callback := func(leaf []byte, parent common.Hash) error {
callback := func(path []byte, leaf []byte, parent common.Hash) error {
var obj Account
if err := rlp.Decode(bytes.NewReader(leaf), &obj); err != nil {
return err
}
syncer.AddSubTrie(obj.Root, 64, parent, nil)
syncer.AddCodeEntry(common.BytesToHash(obj.CodeHash), 64, parent)
syncer.AddSubTrie(obj.Root, path, parent, nil)
syncer.AddCodeEntry(common.BytesToHash(obj.CodeHash), path, parent)
return nil
}
syncer = trie.NewSync(root, database, callback, bloom)
Expand Down
11 changes: 8 additions & 3 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1611,7 +1611,13 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error {
// Start syncing state of the reported head block. This should get us most of
// the state of the pivot block.
sync := d.syncState(latest.Root)
defer sync.Cancel()
defer func() {
// The `sync` object is replaced every time the pivot moves. We need to
// defer close the very last active one, hence the lazy evaluation vs.
// calling defer sync.Cancel() !!!
sync.Cancel()
}()

closeOnErr := func(s *stateSync) {
if err := s.Wait(); err != nil && err != errCancelStateFetch && err != errCanceled {
d.queue.Close() // wake up Results
Expand Down Expand Up @@ -1674,9 +1680,8 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error {
// If new pivot block found, cancel old state retrieval and restart
if oldPivot != P {
sync.Cancel()

sync = d.syncState(P.Header.Root)
defer sync.Cancel()

go closeOnErr(sync)
oldPivot = P
}
Expand Down
4 changes: 2 additions & 2 deletions trie/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,12 @@ func (c *committer) commitLoop(db *Database) {
switch n := n.(type) {
case *shortNode:
if child, ok := n.Val.(valueNode); ok {
c.onleaf(child, hash)
c.onleaf(nil, child, hash)
}
case *fullNode:
for i := 0; i < 16; i++ {
if child, ok := n.Children[i].(valueNode); ok {
c.onleaf(child, hash)
c.onleaf(nil, child, hash)
}
}
}
Expand Down
60 changes: 42 additions & 18 deletions trie/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,19 @@ var ErrNotRequested = errors.New("not requested")
// node it already processed previously.
var ErrAlreadyProcessed = errors.New("already processed")

// maxFetchesPerDepth is the maximum number of pending trie nodes per depth. The
// role of this value is to limit the number of trie nodes that get expanded in
// memory if the node was configured with a significant number of peers.
const maxFetchesPerDepth = 16384

// request represents a scheduled or already in-flight state retrieval request.
type request struct {
path []byte // Merkle path leading to this node for prioritization
hash common.Hash // Hash of the node data content to retrieve
data []byte // Data content of the node, cached until all subtrees complete
code bool // Whether this is a code entry

parents []*request // Parent state nodes referencing this entry (notify all upon completion)
depth int // Depth level within the trie the node is located to prioritise DFS
deps int // Number of dependencies before allowed to commit this node

callback LeafCallback // Callback to invoke if a leaf node is reached on this branch
Expand Down Expand Up @@ -89,6 +94,7 @@ type Sync struct {
nodeReqs map[common.Hash]*request // Pending requests pertaining to a trie node hash
codeReqs map[common.Hash]*request // Pending requests pertaining to a code hash
queue *prque.Prque // Priority queue with the pending requests
fetches map[int]int // Number of active fetches per trie node depth
bloom *SyncBloom // Bloom filter for fast state existence checks
}

Expand All @@ -100,14 +106,15 @@ func NewSync(root common.Hash, database ethdb.KeyValueReader, callback LeafCallb
nodeReqs: make(map[common.Hash]*request),
codeReqs: make(map[common.Hash]*request),
queue: prque.New(nil),
fetches: make(map[int]int),
bloom: bloom,
}
ts.AddSubTrie(root, 0, common.Hash{}, callback)
ts.AddSubTrie(root, nil, common.Hash{}, callback)
return ts
}

// AddSubTrie registers a new trie to the sync code, rooted at the designated parent.
func (s *Sync) AddSubTrie(root common.Hash, depth int, parent common.Hash, callback LeafCallback) {
func (s *Sync) AddSubTrie(root common.Hash, path []byte, parent common.Hash, callback LeafCallback) {
// Short circuit if the trie is empty or already known
if root == emptyRoot {
return
Expand All @@ -128,8 +135,8 @@ func (s *Sync) AddSubTrie(root common.Hash, depth int, parent common.Hash, callb
}
// Assemble the new sub-trie sync request
req := &request{
path: path,
hash: root,
depth: depth,
callback: callback,
}
// If this sub-trie has a designated parent, link them together
Expand All @@ -147,7 +154,7 @@ func (s *Sync) AddSubTrie(root common.Hash, depth int, parent common.Hash, callb
// AddCodeEntry schedules the direct retrieval of a contract code that should not
// be interpreted as a trie node, but rather accepted and stored into the database
// as is.
func (s *Sync) AddCodeEntry(hash common.Hash, depth int, parent common.Hash) {
func (s *Sync) AddCodeEntry(hash common.Hash, path []byte, parent common.Hash) {
// Short circuit if the entry is empty or already known
if hash == emptyState {
return
Expand All @@ -170,9 +177,9 @@ func (s *Sync) AddCodeEntry(hash common.Hash, depth int, parent common.Hash) {
}
// Assemble the new code entry sync request
req := &request{
hash: hash,
code: true,
depth: depth,
path: path,
hash: hash,
code: true,
}
// If this sub-trie has a designated parent, link them together
if parent != (common.Hash{}) {
Expand All @@ -190,7 +197,18 @@ func (s *Sync) AddCodeEntry(hash common.Hash, depth int, parent common.Hash) {
func (s *Sync) Missing(max int) []common.Hash {
var requests []common.Hash
for !s.queue.Empty() && (max == 0 || len(requests) < max) {
requests = append(requests, s.queue.PopItem().(common.Hash))
// Retrieve the next item in line
item, prio := s.queue.Peek()

// If we have too many already-pending tasks for this depth, throttle
depth := int(prio >> 56)
if s.fetches[depth] > maxFetchesPerDepth {
break
}
// Item is allowed to be scheduled, add it to the task list
s.queue.Pop()
s.fetches[depth]++
requests = append(requests, item.(common.Hash))
}
return requests
}
Expand Down Expand Up @@ -285,31 +303,35 @@ func (s *Sync) schedule(req *request) {
// is a trie node and code has same hash. In this case two elements
// with same hash and same or different depth will be pushed. But it's
// ok, the worst case is that the second response will be treated as a duplicate.
s.queue.Push(req.hash, int64(req.depth))
prio := int64(len(req.path)) << 56 // depth >= 128 will never happen, storage leaves will be included in their parents
for i := 0; i < 14 && i < len(req.path); i++ {
prio |= int64(15-req.path[i]) << (52 - i*4) // 15-nibble => lexicographic order
}
s.queue.Push(req.hash, prio)
Comment on lines +306 to +310
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wrote a little playground gist to check how this worked: https://play.golang.org/p/QKezpVe3ZX7 . Might be worth taking the gist and making a test out of it, to verify that paths are indeed prioritized correctly?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to postpone adding this into a followup PR. I want to expose the path and code/trie node separation into the Missing and that will make this test a lot simpler. Otherwise we'd need to mess around with hard coding hashes into the tester, which we could definitely do, but it will be kind of like a black magic test with random values nobody knows where they originate from.

}

// children retrieves all the missing children of a state trie entry for future
// retrieval scheduling.
func (s *Sync) children(req *request, object node) ([]*request, error) {
// Gather all the children of the node, irrelevant whether known or not
type child struct {
node node
depth int
path []byte
node node
}
var children []child

switch node := (object).(type) {
case *shortNode:
children = []child{{
node: node.Val,
depth: req.depth + len(node.Key),
node: node.Val,
path: append(append([]byte(nil), req.path...), node.Key...),
}}
case *fullNode:
for i := 0; i < 17; i++ {
if node.Children[i] != nil {
children = append(children, child{
node: node.Children[i],
depth: req.depth + 1,
node: node.Children[i],
path: append(append([]byte(nil), req.path...), byte(i)),
})
}
}
Expand All @@ -322,7 +344,7 @@ func (s *Sync) children(req *request, object node) ([]*request, error) {
// Notify any external watcher of a new key/value node
if req.callback != nil {
if node, ok := (child.node).(valueNode); ok {
if err := req.callback(node, req.hash); err != nil {
if err := req.callback(req.path, node, req.hash); err != nil {
return nil, err
}
}
Expand All @@ -346,9 +368,9 @@ func (s *Sync) children(req *request, object node) ([]*request, error) {
}
// Locally unknown node, schedule for retrieval
requests = append(requests, &request{
path: child.path,
hash: hash,
parents: []*request{req},
depth: child.depth,
callback: req.callback,
})
}
Expand All @@ -364,9 +386,11 @@ func (s *Sync) commit(req *request) (err error) {
if req.code {
s.membatch.codes[req.hash] = req.data
delete(s.codeReqs, req.hash)
s.fetches[len(req.path)]--
} else {
s.membatch.nodes[req.hash] = req.data
delete(s.nodeReqs, req.hash)
s.fetches[len(req.path)]--
}
// Check all parents for completion
for _, parent := range req.parents {
Expand Down
2 changes: 1 addition & 1 deletion trie/trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var (
// LeafCallback is a callback type invoked when a trie operation reaches a leaf
// node. It's used by state sync and commit to allow handling external references
// between account and storage tries.
type LeafCallback func(leaf []byte, parent common.Hash) error
type LeafCallback func(path []byte, leaf []byte, parent common.Hash) error

// Trie is a Merkle Patricia Trie.
// The zero value is an empty trie with no database.
Expand Down
2 changes: 1 addition & 1 deletion trie/trie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ func BenchmarkCommitAfterHash(b *testing.B) {
benchmarkCommitAfterHash(b, nil)
})
var a account
onleaf := func(leaf []byte, parent common.Hash) error {
onleaf := func(path []byte, leaf []byte, parent common.Hash) error {
rlp.DecodeBytes(leaf, &a)
return nil
}
Expand Down