Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

trie: parallel commit (alternative version) #30545

Merged
merged 15 commits into from
Oct 14, 2024
Next Next commit
trie: parallelize committer
  • Loading branch information
stevemilk authored and holiman committed Oct 7, 2024
commit 633d3762897eeca01b4102db47d47ccc5e512dfd
4 changes: 2 additions & 2 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,7 @@ func (s *StateDB) fastDeleteStorage(snaps *snapshot.Tree, addrHash common.Hash,
slots = make(map[common.Hash][]byte)
)
stack := trie.NewStackTrie(func(path []byte, hash common.Hash, blob []byte) {
nodes.AddNode(path, trienode.NewDeleted())
nodes.AddNode(string(path), trienode.NewDeleted())
})
for iter.Next() {
slot := common.CopyBytes(iter.Slot())
Expand Down Expand Up @@ -991,7 +991,7 @@ func (s *StateDB) slowDeleteStorage(addr common.Address, addrHash common.Hash, r
if it.Hash() == (common.Hash{}) {
continue
}
nodes.AddNode(it.Path(), trienode.NewDeleted())
nodes.AddNode(string(it.Path()), trienode.NewDeleted())
}
if err := it.Error(); err != nil {
return nil, nil, err
Expand Down
128 changes: 103 additions & 25 deletions trie/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package trie

import (
"fmt"
"runtime"
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/trie/trienode"
Expand All @@ -30,28 +32,44 @@ type committer struct {
nodes *trienode.NodeSet
tracer *tracer
collectLeaf bool
parallel bool
}

// newCommitter creates a new committer.
func newCommitter(nodeset *trienode.NodeSet, tracer *tracer, collectLeaf bool) *committer {
func newCommitter(nodes *trienode.NodeSet, tracer *tracer, collectLeaf bool, parallel bool) *committer {
return &committer{
nodes: nodeset,
nodes: nodes,
tracer: tracer,
collectLeaf: collectLeaf,
parallel: parallel,
}
}

// wrapNode bundles a node produced while committing with the path it belongs
// to. The commit traversal returns these instead of writing to the node set
// directly; Commit adds them to the node set once the traversal is finished.
type wrapNode struct {
node *trienode.Node // the node (or deletion marker) to record in the node set
path string // path of the node, used as its key in the node set
leafHash common.Hash // optional, hash of the short node holding the leaf value; left zero when no leaf is collected
leafBlob []byte // optional, the value blob of that leaf
}

// Commit collapses a node down into a hash node.
func (c *committer) Commit(n node) hashNode {
return c.commit(nil, n).(hashNode)
hn, wnodes := c.commit(nil, n, true)
for _, wn := range wnodes {
c.nodes.AddNode(wn.path, wn.node)
if wn.leafHash != (common.Hash{}) {
c.nodes.AddLeaf(wn.leafHash, wn.leafBlob)
}
}
return hn.(hashNode)
}

// commit collapses a node down into a hash node and returns it.
func (c *committer) commit(path []byte, n node) node {
func (c *committer) commit(path []byte, n node, topmost bool) (node, []*wrapNode) {
// if this path is clean, use available cached data
hash, dirty := n.cache()
if hash != nil && !dirty {
return hash
return hash, nil
}
// Commit children, then parent, and remove the dirty flag.
switch cn := n.(type) {
Expand All @@ -61,38 +79,72 @@ func (c *committer) commit(path []byte, n node) node {

// If the child is fullNode, recursively commit,
// otherwise it can only be hashNode or valueNode.
var nodes []*wrapNode
if _, ok := cn.Val.(*fullNode); ok {
collapsed.Val = c.commit(append(path, cn.Key...), cn.Val)
collapsed.Val, nodes = c.commit(append(path, cn.Key...), cn.Val, false)
}
// The key needs to be copied, since we're adding it to the
// modified nodeset.
collapsed.Key = hexToCompact(cn.Key)
hashedNode := c.store(path, collapsed)
hashedNode, wNode := c.store(path, collapsed)
if wNode != nil {
nodes = append(nodes, wNode)
}
if hn, ok := hashedNode.(hashNode); ok {
return hn
return hn, nodes
}
return collapsed
return collapsed, nodes
case *fullNode:
hashedKids := c.commitChildren(path, cn)
hashedKids, nodes := c.commitChildren(path, cn, topmost && c.parallel)
collapsed := cn.copy()
collapsed.Children = hashedKids

hashedNode := c.store(path, collapsed)
hashedNode, wNode := c.store(path, collapsed)
if wNode != nil {
nodes = append(nodes, wNode)
}
if hn, ok := hashedNode.(hashNode); ok {
return hn
return hn, nodes
}
return collapsed
return collapsed, nodes
case hashNode:
return cn
return cn, nil
default:
// nil, valuenode shouldn't be committed
panic(fmt.Sprintf("%T: invalid node: %v", n, n))
}
}

// task describes a single child commit job that commitChildren sends to a
// worker goroutine when committing in parallel.
type task struct {
node node // the child node to commit
index int // position of the child in the parent's Children array; results are stored back under this index
path []byte // path of the child, i.e. the parent path extended by the child index
}

// commitChildren commits the children of the given fullnode
func (c *committer) commitChildren(path []byte, n *fullNode) [17]node {
var children [17]node
func (c *committer) commitChildren(path []byte, n *fullNode, parallel bool) ([17]node, []*wrapNode) {
var (
wg sync.WaitGroup
children [17]node
results [16][]*wrapNode
tasks = make(chan task)
)
if parallel {
worker := func() {
defer wg.Done()
for t := range tasks {
children[t.index], results[t.index] = c.commit(t.path, t.node, false)
}
}
threads := runtime.NumCPU()
if threads > 16 {
threads = 16
}
for i := 0; i < threads; i++ {
wg.Add(1)
go worker()
}
}
for i := 0; i < 16; i++ {
child := n.Children[i]
if child == nil {
Expand All @@ -108,18 +160,36 @@ func (c *committer) commitChildren(path []byte, n *fullNode) [17]node {
// Commit the child recursively and store the "hashed" value.
// Note the returned node can be some embedded nodes, so it's
// possible the type is not hashNode.
children[i] = c.commit(append(path, byte(i)), child)
if !parallel {
children[i], results[i] = c.commit(append(path, byte(i)), child, false)
} else {
tasks <- task{
index: i,
node: child,
path: append(path, byte(i)),
}
}
}
if parallel {
close(tasks)
wg.Wait()
}
// For the 17th child, it's possible the type is valuenode.
if n.Children[16] != nil {
children[16] = n.Children[16]
}
return children
var wnodes []*wrapNode
for i := 0; i < 16; i++ {
if results[i] != nil {
wnodes = append(wnodes, results[i]...)
}
}
return children, wnodes
}

// store hashes the node n and adds it to the modified nodeset. If leaf collection
// is enabled, leaf nodes will be tracked in the modified nodeset as well.
func (c *committer) store(path []byte, n node) node {
func (c *committer) store(path []byte, n node) (node, *wrapNode) {
// Larger nodes are replaced by their hash and stored in the database.
var hash, _ = n.cache()

Expand All @@ -133,25 +203,33 @@ func (c *committer) store(path []byte, n node) node {
// deleted only if the node was existent in database before.
_, ok := c.tracer.accessList[string(path)]
if ok {
c.nodes.AddNode(path, trienode.NewDeleted())
return n, &wrapNode{
path: string(path),
node: trienode.NewDeleted(),
}
}
return n
return n, nil
}
// Collect the dirty node to nodeset for return.
nhash := common.BytesToHash(hash)
c.nodes.AddNode(path, trienode.New(nhash, nodeToBytes(n)))
wNode := &wrapNode{
path: string(path),
node: trienode.New(nhash, nodeToBytes(n)),
}

// Collect the corresponding leaf node if it's required. We don't check
// full node since it's impossible to store value in fullNode. The key
// length of leaves should be exactly same.
// length of leaves should be exactly the same.
if c.collectLeaf {
if sn, ok := n.(*shortNode); ok {
if val, ok := sn.Val.(valueNode); ok {
c.nodes.AddLeaf(nhash, val)
wNode.leafHash = nhash
wNode.leafBlob = val
}
}
}
return hash

return hash, wNode
}

// ForGatherChildren decodes the provided node and traverses the children inside.
Expand Down
33 changes: 19 additions & 14 deletions trie/trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,17 @@ type Trie struct {
// trie is not usable(latest states is invisible).
committed bool

// Keep track of the number leaves which have been inserted since the last
// hashing operation. This number will not directly map to the number of
// actually unhashed nodes.
unhashed int

// reader is the handler trie can retrieve nodes from.
reader *trieReader

// tracer is the tool to track the trie changes.
tracer *tracer

// The number of trie mutations that have been performed
mutate int

// The number of mutations that have been hashed
hashed int
}

// newFlag returns the cache flag value for a newly created node.
Expand All @@ -67,9 +68,10 @@ func (t *Trie) Copy() *Trie {
root: t.root,
owner: t.owner,
committed: t.committed,
unhashed: t.unhashed,
reader: t.reader,
tracer: t.tracer.copy(),
mutate: t.mutate,
hashed: t.hashed,
}
}

Expand Down Expand Up @@ -304,11 +306,12 @@ func (t *Trie) Update(key, value []byte) error {
if t.committed {
return ErrCommitted
}
t.mutate++
return t.update(key, value)
}

func (t *Trie) update(key, value []byte) error {
t.unhashed++
t.mutate++
k := keybytesToHex(key)
if len(value) != 0 {
_, n, err := t.insert(t.root, nil, k, valueNode(value))
Expand Down Expand Up @@ -422,7 +425,7 @@ func (t *Trie) Delete(key []byte) error {
if t.committed {
return ErrCommitted
}
t.unhashed++
t.mutate++
k := keybytesToHex(key)
_, n, err := t.delete(t.root, nil, k)
if err != nil {
Expand Down Expand Up @@ -622,7 +625,7 @@ func (t *Trie) Commit(collectLeaf bool) (common.Hash, *trienode.NodeSet) {
}
nodes := trienode.NewNodeSet(t.owner)
for _, path := range paths {
nodes.AddNode([]byte(path), trienode.NewDeleted())
nodes.AddNode(path, trienode.NewDeleted())
}
return types.EmptyRootHash, nodes // case (b)
}
Expand All @@ -640,9 +643,10 @@ func (t *Trie) Commit(collectLeaf bool) (common.Hash, *trienode.NodeSet) {
}
nodes := trienode.NewNodeSet(t.owner)
for _, path := range t.tracer.deletedNodes() {
nodes.AddNode([]byte(path), trienode.NewDeleted())
nodes.AddNode(path, trienode.NewDeleted())
}
t.root = newCommitter(nodes, t.tracer, collectLeaf).Commit(t.root)
t.root = newCommitter(nodes, t.tracer, collectLeaf, t.mutate > 100).Commit(t.root)
t.mutate = 0
return rootHash, nodes
}

Expand All @@ -652,10 +656,10 @@ func (t *Trie) hashRoot() (node, node) {
return hashNode(types.EmptyRootHash.Bytes()), nil
}
// If the number of changes is below 100, we let one thread handle it
h := newHasher(t.unhashed >= 100)
h := newHasher(t.mutate-t.hashed >= 100)
defer func() {
returnHasherToPool(h)
t.unhashed = 0
t.hashed = t.mutate
}()
hashed, cached := h.hash(t.root, true)
return hashed, cached
Expand All @@ -677,7 +681,8 @@ func (t *Trie) Witness() map[string]struct{} {
func (t *Trie) Reset() {
t.root = nil
t.owner = common.Hash{}
t.unhashed = 0
t.tracer.reset()
t.committed = false
t.hashed = 0
t.mutate = 0
}
4 changes: 2 additions & 2 deletions trie/trienode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,13 @@ func (set *NodeSet) ForEachWithOrder(callback func(path string, n *Node)) {
}

// AddNode adds the provided node into set.
func (set *NodeSet) AddNode(path []byte, n *Node) {
func (set *NodeSet) AddNode(path string, n *Node) {
if n.IsDeleted() {
set.deletes += 1
} else {
set.updates += 1
}
set.Nodes[string(path)] = n
set.Nodes[path] = n
}

// Merge adds a set of nodes into the set.
Expand Down
2 changes: 1 addition & 1 deletion trie/trienode/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func benchmarkMerge(b *testing.B, count int) {
blob := make([]byte, 32)
rand.Read(blob)
hash := crypto.Keccak256Hash(blob)
s.AddNode(path, New(hash, blob))
s.AddNode(string(path), New(hash, blob))
}
for i := 0; i < count; i++ {
// Random path of 4 nibbles
Expand Down
2 changes: 1 addition & 1 deletion trie/verkle.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (t *VerkleTrie) Commit(_ bool) (common.Hash, *trienode.NodeSet) {
nodeset := trienode.NewNodeSet(common.Hash{})
for _, node := range nodes {
// Hash parameter is not used in pathdb
nodeset.AddNode(node.Path, trienode.New(common.Hash{}, node.SerializedBytes))
nodeset.AddNode(string(node.Path), trienode.New(common.Hash{}, node.SerializedBytes))
}
// Serialize root commitment form
return t.Hash(), nodeset
Expand Down