Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat batch split #223

Draft
wants to merge 2 commits into
base: develop
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 84 additions & 5 deletions triedb/pathdb/nodebufferlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ func (nf *nodebufferlist) flush(db ethdb.KeyValueStore, clean *fastcache.Cache,
nf.traverseReverse(commitFunc)

persistID := nf.persistID + nf.base.layers
err := nf.base.flush(nf.db, nf.clean, persistID)
err := nf.base.flush(nf.db, nf.clean, persistID, false)
if err != nil {
log.Crit("failed to flush base node buffer to disk", "error", err)
}
Expand Down Expand Up @@ -742,13 +742,13 @@ func (nf *nodebufferlist) diffToBase() {
}

// backgroundFlush flush base node buffer to disk.
func (nf *nodebufferlist) backgroundFlush() {
func (nf *nodebufferlist) backgroundFlush(splitBatches bool) {
nf.flushMux.Lock()
defer nf.flushMux.Unlock()
nf.baseMux.RLock()
persistID := nf.persistID + nf.base.layers
nf.baseMux.RUnlock()
err := nf.base.flush(nf.db, nf.clean, persistID)
err := nf.base.flush(nf.db, nf.clean, persistID, splitBatches)
if err != nil {
log.Error("failed to flush base node buffer to disk", "error", err)
return
Expand Down Expand Up @@ -817,7 +817,11 @@ func (nf *nodebufferlist) loop() {
}
nf.diffToBase()
if nf.base.size >= nf.base.limit {
nf.backgroundFlush()
if nf.base.size >= 4*1024*1024*1024 {
nf.backgroundFlush(true)
} else {
nf.backgroundFlush(false)
}
}
nf.isFlushing.Swap(false)
}
Expand Down Expand Up @@ -1084,7 +1088,7 @@ func (mf *multiDifflayer) empty() bool {

// flush persists the in-memory dirty trie node into the disk if the configured
// memory threshold is reached. Note, all data must be written atomically.
func (mf *multiDifflayer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error {
func (mf *multiDifflayer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, splitBatches bool) error {
// Ensure the target state id is aligned with the internal counter.
head := rawdb.ReadPersistentStateID(db)
if head+mf.layers != id {
Expand All @@ -1094,6 +1098,55 @@ func (mf *multiDifflayer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache,
start = time.Now()
batch = db.NewBatchWithSize(int(float64(mf.size) * DefaultBatchRedundancyRate))
)

//start
if splitBatches {
totalKeys := len(mf.nodes)
if totalKeys == 0 {
return nil
}

log.Info("total keys", "total num", totalKeys)
const numBatches = 3

batchKeyLimit := totalKeys / numBatches
if totalKeys%numBatches != 0 {
batchKeyLimit++
}

totalNodes := 0

for len(mf.nodes) > 0 {
batch := db.NewBatch()

writtenKeys, err := writeNodesInBatchByKeys(batch, mf.nodes, clean, batchKeyLimit)
if err != nil {
return err
}

totalNodes += len(writtenKeys)

rawdb.WritePersistentStateID(batch, id)

if err := batch.Write(); err != nil {
return err
}

removeWrittenKeys(mf.nodes, writtenKeys)

if len(writtenKeys) == 0 {
log.Info("success")
break
}
}

commitTimeTimer.UpdateSince(start)
log.Info("Persisted pathdb nodes in batches", "nodes", totalNodes, "state_id", id, "elapsed", common.PrettyDuration(time.Since(start)))
return nil
}

//end

nodes := writeNodes(batch, mf.nodes, clean)
rawdb.WritePersistentStateID(batch, id)

Expand Down Expand Up @@ -1160,3 +1213,29 @@ func (mf *multiDifflayer) revert(db ethdb.KeyValueReader, nodes map[common.Hash]
mf.updateSize(delta)
return nil
}

// writeNodesInBatchByKeys stages up to batchKeyLimit owner entries from nodes
// into the supplied batch and returns the owner hashes that were staged. The
// caller is expected to commit the batch and then prune the returned keys from
// nodes (see removeWrittenKeys) before asking for the next chunk.
//
// Map iteration order is random, so the owners picked on each call are
// arbitrary; that is acceptable because the caller loops until nodes is empty.
func writeNodesInBatchByKeys(batch ethdb.Batch, nodes map[common.Hash]map[string]*trienode.Node, clean *fastcache.Cache, batchKeyLimit int) ([]common.Hash, error) {
	if batchKeyLimit <= 0 {
		return nil, fmt.Errorf("invalid batch key limit: %d", batchKeyLimit)
	}
	writtenKeys := make([]common.Hash, 0, batchKeyLimit)

	for owner, subset := range nodes {
		if len(subset) == 0 {
			// An owner with no trie nodes has nothing to persist. Mark it as
			// written so the caller prunes the stale entry instead of the
			// previous behavior of aborting the whole flush with an error.
			writtenKeys = append(writtenKeys, owner)
		} else {
			written := writeNodes(batch, map[common.Hash]map[string]*trienode.Node{owner: subset}, clean)
			if written == 0 {
				// NOTE(review): presumably writeNodes always stages every node
				// it is handed, so zero here indicates a real fault — confirm
				// against its implementation.
				return nil, fmt.Errorf("no nodes written for owner %v", owner)
			}
			writtenKeys = append(writtenKeys, owner)
		}
		if len(writtenKeys) >= batchKeyLimit {
			// Debug, not Info: this fires on every full batch and would spam
			// the logs during a large multi-batch flush.
			log.Debug("Batch key limit reached", "limit", batchKeyLimit)
			break
		}
	}

	return writtenKeys, nil
}

// removeWrittenKeys prunes from nodes every owner hash listed in writtenKeys,
// dropping entries whose trie nodes have already been persisted to disk.
func removeWrittenKeys(nodes map[common.Hash]map[string]*trienode.Node, writtenKeys []common.Hash) {
	for i := range writtenKeys {
		delete(nodes, writtenKeys[i])
	}
}
Loading