Skip to content

Commit b034ce3

Browse files
rjl493456442sadoci
authored andcommitted
core/state/snapshot: clean up the generation code (ethereum#24479)
1 parent 7ad22eb commit b034ce3

File tree

1 file changed

+137
-98
lines changed

1 file changed

+137
-98
lines changed

core/state/snapshot/generate.go

Lines changed: 137 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ func (dl *diskLayer) proveRange(stats *generatorStats, root common.Hash, prefix
379379
type onStateCallback func(key []byte, val []byte, write bool, delete bool) error
380380

381381
// generateRange generates the state segment with particular prefix. Generation can
382-
// either verify the correctness of existing state through rangeproof and skip
382+
// either verify the correctness of existing state through range-proof and skip
383383
// generation, or iterate trie to regenerate state on demand.
384384
func (dl *diskLayer) generateRange(root common.Hash, prefix []byte, kind string, origin []byte, max int, stats *generatorStats, onState onStateCallback, valueConvertFn func([]byte) ([]byte, error)) (bool, []byte, error) {
385385
// Use range prover to check the validity of the flat state in the range
@@ -532,66 +532,94 @@ func (dl *diskLayer) generateRange(root common.Hash, prefix []byte, kind string,
532532
return !trieMore && !result.diskMore, last, nil
533533
}
534534

535-
// generate is a background thread that iterates over the state and storage tries,
536-
// constructing the state snapshot. All the arguments are purely for statistics
537-
// gathering and logging, since the method surfs the blocks as they arrive, often
538-
// being restarted.
539-
func (dl *diskLayer) generate(stats *generatorStats) {
540-
var (
541-
accMarker []byte
542-
accountRange = accountCheckRange
543-
)
544-
if len(dl.genMarker) > 0 { // []byte{} is the start, use nil for that
545-
// Always reset the initial account range as 1
546-
// whenever recover from the interruption.
547-
accMarker, accountRange = dl.genMarker[:common.HashLength], 1
535+
// checkAndFlush checks if an interruption signal is received or the
536+
// batch size has exceeded the allowance.
537+
func (dl *diskLayer) checkAndFlush(current []byte, batch ethdb.Batch, stats *generatorStats, logged *time.Time) error {
538+
var abort chan *generatorStats
539+
select {
540+
case abort = <-dl.genAbort:
541+
default:
548542
}
549-
var (
550-
batch = dl.diskdb.NewBatch()
551-
logged = time.Now()
552-
accOrigin = common.CopyBytes(accMarker)
553-
abort chan *generatorStats
554-
)
555-
stats.Log("Resuming state snapshot generation", dl.root, dl.genMarker)
543+
if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil {
544+
if bytes.Compare(current, dl.genMarker) < 0 {
545+
log.Error("Snapshot generator went backwards", "current", fmt.Sprintf("%x", current), "genMarker", fmt.Sprintf("%x", dl.genMarker))
546+
}
547+
// Flush out the batch anyway no matter it's empty or not.
548+
// It's possible that all the states are recovered and the
549+
// generation indeed makes progress.
550+
journalProgress(batch, current, stats)
556551

557-
checkAndFlush := func(currentLocation []byte) error {
558-
select {
559-
case abort = <-dl.genAbort:
560-
default:
552+
if err := batch.Write(); err != nil {
553+
return err
561554
}
562-
if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil {
563-
if bytes.Compare(currentLocation, dl.genMarker) < 0 {
564-
log.Error("Snapshot generator went backwards",
565-
"currentLocation", fmt.Sprintf("%x", currentLocation),
566-
"genMarker", fmt.Sprintf("%x", dl.genMarker))
567-
}
555+
batch.Reset()
568556

569-
// Flush out the batch anyway no matter it's empty or not.
570-
// It's possible that all the states are recovered and the
571-
// generation indeed makes progress.
572-
journalProgress(batch, currentLocation, stats)
557+
dl.lock.Lock()
558+
dl.genMarker = current
559+
dl.lock.Unlock()
573560

574-
if err := batch.Write(); err != nil {
575-
return err
576-
}
577-
batch.Reset()
561+
if abort != nil {
562+
stats.Log("Aborting state snapshot generation", dl.root, current)
563+
return newAbortErr(abort) // bubble up an error for interruption
564+
}
565+
}
566+
if time.Since(*logged) > 8*time.Second {
567+
stats.Log("Generating state snapshot", dl.root, current)
568+
*logged = time.Now()
569+
}
570+
return nil
571+
}
578572

579-
dl.lock.Lock()
580-
dl.genMarker = currentLocation
581-
dl.lock.Unlock()
573+
// generateStorages generates the missing storage slots of the specific contract.
574+
// It's supposed to restart the generation from the given origin position.
575+
func generateStorages(dl *diskLayer, account common.Hash, storageRoot common.Hash, storeMarker []byte, batch ethdb.Batch, stats *generatorStats, logged *time.Time) error {
576+
onStorage := func(key []byte, val []byte, write bool, delete bool) error {
577+
defer func(start time.Time) {
578+
snapStorageWriteCounter.Inc(time.Since(start).Nanoseconds())
579+
}(time.Now())
582580

583-
if abort != nil {
584-
stats.Log("Aborting state snapshot generation", dl.root, currentLocation)
585-
return errors.New("aborted")
586-
}
581+
if delete {
582+
rawdb.DeleteStorageSnapshot(batch, account, common.BytesToHash(key))
583+
snapWipedStorageMeter.Mark(1)
584+
return nil
585+
}
586+
if write {
587+
rawdb.WriteStorageSnapshot(batch, account, common.BytesToHash(key), val)
588+
snapGeneratedStorageMeter.Mark(1)
589+
} else {
590+
snapRecoveredStorageMeter.Mark(1)
587591
}
588-
if time.Since(logged) > 8*time.Second {
589-
stats.Log("Generating state snapshot", dl.root, currentLocation)
590-
logged = time.Now()
592+
stats.storage += common.StorageSize(1 + 2*common.HashLength + len(val))
593+
stats.slots++
594+
595+
// If we've exceeded our batch allowance or termination was requested, flush to disk
596+
if err := dl.checkAndFlush(append(account[:], key...), batch, stats, logged); err != nil {
597+
return err
591598
}
592599
return nil
593600
}
601+
// Loop for re-generating the missing storage slots.
602+
var origin = common.CopyBytes(storeMarker)
603+
for {
604+
exhausted, last, err := dl.generateRange(storageRoot, append(rawdb.SnapshotStoragePrefix, account.Bytes()...), "storage", origin, storageCheckRange, stats, onStorage, nil)
605+
if err != nil {
606+
return err // The procedure it aborted, either by external signal or internal error.
607+
}
608+
// Abort the procedure if the entire contract storage is generated
609+
if exhausted {
610+
break
611+
}
612+
if origin = increaseKey(last); origin == nil {
613+
break // special case, the last is 0xffffffff...fff
614+
}
615+
}
616+
return nil
617+
}
594618

619+
// generateAccounts generates the missing snapshot accounts as well as their
620+
// storage slots in the main trie. It's supposed to restart the generation
621+
// from the given origin position.
622+
func generateAccounts(dl *diskLayer, accMarker []byte, batch ethdb.Batch, stats *generatorStats, logged *time.Time) error {
595623
onAccount := func(key []byte, val []byte, write bool, delete bool) error {
596624
var (
597625
start = time.Now()
@@ -647,7 +675,7 @@ func (dl *diskLayer) generate(stats *generatorStats) {
647675
marker = dl.genMarker[:]
648676
}
649677
// If we've exceeded our batch allowance or termination was requested, flush to disk
650-
if err := checkAndFlush(marker); err != nil {
678+
if err := dl.checkAndFlush(marker, batch, stats, logged); err != nil {
651679
return err
652680
}
653681
// If the iterated account is the contract, create a further loop to
@@ -671,70 +699,67 @@ func (dl *diskLayer) generate(stats *generatorStats) {
671699
if accMarker != nil && bytes.Equal(accountHash[:], accMarker) && len(dl.genMarker) > common.HashLength {
672700
storeMarker = dl.genMarker[common.HashLength:]
673701
}
674-
onStorage := func(key []byte, val []byte, write bool, delete bool) error {
675-
defer func(start time.Time) {
676-
snapStorageWriteCounter.Inc(time.Since(start).Nanoseconds())
677-
}(time.Now())
678-
679-
if delete {
680-
rawdb.DeleteStorageSnapshot(batch, accountHash, common.BytesToHash(key))
681-
snapWipedStorageMeter.Mark(1)
682-
return nil
683-
}
684-
if write {
685-
rawdb.WriteStorageSnapshot(batch, accountHash, common.BytesToHash(key), val)
686-
snapGeneratedStorageMeter.Mark(1)
687-
} else {
688-
snapRecoveredStorageMeter.Mark(1)
689-
}
690-
stats.storage += common.StorageSize(1 + 2*common.HashLength + len(val))
691-
stats.slots++
692-
693-
// If we've exceeded our batch allowance or termination was requested, flush to disk
694-
if err := checkAndFlush(append(accountHash[:], key...)); err != nil {
695-
return err
696-
}
697-
return nil
698-
}
699-
var storeOrigin = common.CopyBytes(storeMarker)
700-
for {
701-
exhausted, last, err := dl.generateRange(acc.Root, append(rawdb.SnapshotStoragePrefix, accountHash.Bytes()...), "storage", storeOrigin, storageCheckRange, stats, onStorage, nil)
702-
if err != nil {
703-
return err
704-
}
705-
if exhausted {
706-
break
707-
}
708-
if storeOrigin = increaseKey(last); storeOrigin == nil {
709-
break // special case, the last is 0xffffffff...fff
710-
}
702+
if err := generateStorages(dl, accountHash, acc.Root, storeMarker, batch, stats, logged); err != nil {
703+
return err
711704
}
712705
}
713706
// Some account processed, unmark the marker
714707
accMarker = nil
715708
return nil
716709
}
717-
718-
// Global loop for regerating the entire state trie + all layered storage tries.
710+
// Always reset the initial account range as 1 whenever recover from the interruption.
711+
var accountRange = accountCheckRange
712+
if len(accMarker) > 0 {
713+
accountRange = 1
714+
}
715+
// Global loop for re-generating the account snapshots + all layered storage snapshots.
716+
origin := common.CopyBytes(accMarker)
719717
for {
720-
exhausted, last, err := dl.generateRange(dl.root, rawdb.SnapshotAccountPrefix, "account", accOrigin, accountRange, stats, onAccount, FullAccountRLP)
721-
// The procedure it aborted, either by external signal or internal error
718+
exhausted, last, err := dl.generateRange(dl.root, rawdb.SnapshotAccountPrefix, "account", origin, accountRange, stats, onAccount, FullAccountRLP)
722719
if err != nil {
723-
if abort == nil { // aborted by internal error, wait the signal
724-
abort = <-dl.genAbort
725-
}
726-
abort <- stats
727-
return
720+
return err // The procedure it aborted, either by external signal or internal error.
728721
}
729722
// Abort the procedure if the entire snapshot is generated
730723
if exhausted {
731724
break
732725
}
733-
if accOrigin = increaseKey(last); accOrigin == nil {
726+
if origin = increaseKey(last); origin == nil {
734727
break // special case, the last is 0xffffffff...fff
735728
}
736729
accountRange = accountCheckRange
737730
}
731+
return nil
732+
}
733+
734+
// generate is a background thread that iterates over the state and storage tries,
735+
// constructing the state snapshot. All the arguments are purely for statistics
736+
// gathering and logging, since the method surfs the blocks as they arrive, often
737+
// being restarted.
738+
func (dl *diskLayer) generate(stats *generatorStats) {
739+
var accMarker []byte
740+
if len(dl.genMarker) > 0 { // []byte{} is the start, use nil for that
741+
accMarker = dl.genMarker[:common.HashLength]
742+
}
743+
var (
744+
batch = dl.diskdb.NewBatch()
745+
logged = time.Now()
746+
abort chan *generatorStats
747+
)
748+
stats.Log("Resuming state snapshot generation", dl.root, dl.genMarker)
749+
750+
// Generate the snapshot accounts from the point where they left off.
751+
if err := generateAccounts(dl, accMarker, batch, stats, &logged); err != nil {
752+
// Extract the received interruption signal if exists
753+
if aerr, ok := err.(*abortErr); ok {
754+
abort = aerr.abort
755+
}
756+
// Aborted by internal error, wait the signal
757+
if abort == nil {
758+
abort = <-dl.genAbort
759+
}
760+
abort <- stats
761+
return
762+
}
738763
// Snapshot fully generated, set the marker to nil.
739764
// Note even there is nothing to commit, persist the
740765
// generator anyway to mark the snapshot is complete.
@@ -762,7 +787,7 @@ func (dl *diskLayer) generate(stats *generatorStats) {
762787
}
763788

764789
// increaseKey increase the input key by one bit. Return nil if the entire
765-
// addition operation overflows,
790+
// addition operation overflows.
766791
func increaseKey(key []byte) []byte {
767792
for i := len(key) - 1; i >= 0; i-- {
768793
key[i]++
@@ -772,3 +797,17 @@ func increaseKey(key []byte) []byte {
772797
}
773798
return nil
774799
}
800+
801+
// abortErr wraps an interruption signal received to represent the
802+
// generation is aborted by external processes.
803+
type abortErr struct {
804+
abort chan *generatorStats
805+
}
806+
807+
func newAbortErr(abort chan *generatorStats) error {
808+
return &abortErr{abort: abort}
809+
}
810+
811+
func (err *abortErr) Error() string {
812+
return "aborted"
813+
}

0 commit comments

Comments
 (0)