Skip to content

Commit

Permalink
core/state/snapshot: clean up the generation code (ethereum#24479)
Browse files Browse the repository at this point in the history
  • Loading branch information
rjl493456442 authored and cp-wjhan committed Nov 27, 2022
1 parent eef83ea commit 3c08083
Showing 1 changed file with 137 additions and 98 deletions.
235 changes: 137 additions & 98 deletions core/state/snapshot/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ func (dl *diskLayer) proveRange(stats *generatorStats, root common.Hash, prefix
type onStateCallback func(key []byte, val []byte, write bool, delete bool) error

// generateRange generates the state segment with particular prefix. Generation can
// either verify the correctness of existing state through rangeproof and skip
// either verify the correctness of existing state through range-proof and skip
// generation, or iterate trie to regenerate state on demand.
func (dl *diskLayer) generateRange(root common.Hash, prefix []byte, kind string, origin []byte, max int, stats *generatorStats, onState onStateCallback, valueConvertFn func([]byte) ([]byte, error)) (bool, []byte, error) {
// Use range prover to check the validity of the flat state in the range
Expand Down Expand Up @@ -532,66 +532,94 @@ func (dl *diskLayer) generateRange(root common.Hash, prefix []byte, kind string,
return !trieMore && !result.diskMore, last, nil
}

// generate is a background thread that iterates over the state and storage tries,
// constructing the state snapshot. All the arguments are purely for statistics
// gathering and logging, since the method surfs the blocks as they arrive, often
// being restarted.
func (dl *diskLayer) generate(stats *generatorStats) {
var (
accMarker []byte
accountRange = accountCheckRange
)
if len(dl.genMarker) > 0 { // []byte{} is the start, use nil for that
// Always reset the initial account range as 1
// whenever recover from the interruption.
accMarker, accountRange = dl.genMarker[:common.HashLength], 1
// checkAndFlush checks if an interruption signal is received or the
// batch size has exceeded the allowance.
func (dl *diskLayer) checkAndFlush(current []byte, batch ethdb.Batch, stats *generatorStats, logged *time.Time) error {
var abort chan *generatorStats
select {
case abort = <-dl.genAbort:
default:
}
var (
batch = dl.diskdb.NewBatch()
logged = time.Now()
accOrigin = common.CopyBytes(accMarker)
abort chan *generatorStats
)
stats.Log("Resuming state snapshot generation", dl.root, dl.genMarker)
if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil {
if bytes.Compare(current, dl.genMarker) < 0 {
log.Error("Snapshot generator went backwards", "current", fmt.Sprintf("%x", current), "genMarker", fmt.Sprintf("%x", dl.genMarker))
}
// Flush out the batch anyway no matter it's empty or not.
// It's possible that all the states are recovered and the
// generation indeed makes progress.
journalProgress(batch, current, stats)

checkAndFlush := func(currentLocation []byte) error {
select {
case abort = <-dl.genAbort:
default:
if err := batch.Write(); err != nil {
return err
}
if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil {
if bytes.Compare(currentLocation, dl.genMarker) < 0 {
log.Error("Snapshot generator went backwards",
"currentLocation", fmt.Sprintf("%x", currentLocation),
"genMarker", fmt.Sprintf("%x", dl.genMarker))
}
batch.Reset()

// Flush out the batch anyway no matter it's empty or not.
// It's possible that all the states are recovered and the
// generation indeed makes progress.
journalProgress(batch, currentLocation, stats)
dl.lock.Lock()
dl.genMarker = current
dl.lock.Unlock()

if err := batch.Write(); err != nil {
return err
}
batch.Reset()
if abort != nil {
stats.Log("Aborting state snapshot generation", dl.root, current)
return newAbortErr(abort) // bubble up an error for interruption
}
}
if time.Since(*logged) > 8*time.Second {
stats.Log("Generating state snapshot", dl.root, current)
*logged = time.Now()
}
return nil
}

dl.lock.Lock()
dl.genMarker = currentLocation
dl.lock.Unlock()
// generateStorages generates the missing storage slots of the specific contract.
// It's supposed to restart the generation from the given origin position.
func generateStorages(dl *diskLayer, account common.Hash, storageRoot common.Hash, storeMarker []byte, batch ethdb.Batch, stats *generatorStats, logged *time.Time) error {
onStorage := func(key []byte, val []byte, write bool, delete bool) error {
defer func(start time.Time) {
snapStorageWriteCounter.Inc(time.Since(start).Nanoseconds())
}(time.Now())

if abort != nil {
stats.Log("Aborting state snapshot generation", dl.root, currentLocation)
return errors.New("aborted")
}
if delete {
rawdb.DeleteStorageSnapshot(batch, account, common.BytesToHash(key))
snapWipedStorageMeter.Mark(1)
return nil
}
if write {
rawdb.WriteStorageSnapshot(batch, account, common.BytesToHash(key), val)
snapGeneratedStorageMeter.Mark(1)
} else {
snapRecoveredStorageMeter.Mark(1)
}
if time.Since(logged) > 8*time.Second {
stats.Log("Generating state snapshot", dl.root, currentLocation)
logged = time.Now()
stats.storage += common.StorageSize(1 + 2*common.HashLength + len(val))
stats.slots++

// If we've exceeded our batch allowance or termination was requested, flush to disk
if err := dl.checkAndFlush(append(account[:], key...), batch, stats, logged); err != nil {
return err
}
return nil
}
// Loop for re-generating the missing storage slots.
var origin = common.CopyBytes(storeMarker)
for {
exhausted, last, err := dl.generateRange(storageRoot, append(rawdb.SnapshotStoragePrefix, account.Bytes()...), "storage", origin, storageCheckRange, stats, onStorage, nil)
if err != nil {
return err // The procedure it aborted, either by external signal or internal error.
}
// Abort the procedure if the entire contract storage is generated
if exhausted {
break
}
if origin = increaseKey(last); origin == nil {
break // special case, the last is 0xffffffff...fff
}
}
return nil
}

// generateAccounts generates the missing snapshot accounts as well as their
// storage slots in the main trie. It's supposed to restart the generation
// from the given origin position.
func generateAccounts(dl *diskLayer, accMarker []byte, batch ethdb.Batch, stats *generatorStats, logged *time.Time) error {
onAccount := func(key []byte, val []byte, write bool, delete bool) error {
var (
start = time.Now()
Expand Down Expand Up @@ -647,7 +675,7 @@ func (dl *diskLayer) generate(stats *generatorStats) {
marker = dl.genMarker[:]
}
// If we've exceeded our batch allowance or termination was requested, flush to disk
if err := checkAndFlush(marker); err != nil {
if err := dl.checkAndFlush(marker, batch, stats, logged); err != nil {
return err
}
// If the iterated account is the contract, create a further loop to
Expand All @@ -671,70 +699,67 @@ func (dl *diskLayer) generate(stats *generatorStats) {
if accMarker != nil && bytes.Equal(accountHash[:], accMarker) && len(dl.genMarker) > common.HashLength {
storeMarker = dl.genMarker[common.HashLength:]
}
onStorage := func(key []byte, val []byte, write bool, delete bool) error {
defer func(start time.Time) {
snapStorageWriteCounter.Inc(time.Since(start).Nanoseconds())
}(time.Now())

if delete {
rawdb.DeleteStorageSnapshot(batch, accountHash, common.BytesToHash(key))
snapWipedStorageMeter.Mark(1)
return nil
}
if write {
rawdb.WriteStorageSnapshot(batch, accountHash, common.BytesToHash(key), val)
snapGeneratedStorageMeter.Mark(1)
} else {
snapRecoveredStorageMeter.Mark(1)
}
stats.storage += common.StorageSize(1 + 2*common.HashLength + len(val))
stats.slots++

// If we've exceeded our batch allowance or termination was requested, flush to disk
if err := checkAndFlush(append(accountHash[:], key...)); err != nil {
return err
}
return nil
}
var storeOrigin = common.CopyBytes(storeMarker)
for {
exhausted, last, err := dl.generateRange(acc.Root, append(rawdb.SnapshotStoragePrefix, accountHash.Bytes()...), "storage", storeOrigin, storageCheckRange, stats, onStorage, nil)
if err != nil {
return err
}
if exhausted {
break
}
if storeOrigin = increaseKey(last); storeOrigin == nil {
break // special case, the last is 0xffffffff...fff
}
if err := generateStorages(dl, accountHash, acc.Root, storeMarker, batch, stats, logged); err != nil {
return err
}
}
// Some account processed, unmark the marker
accMarker = nil
return nil
}

// Global loop for regerating the entire state trie + all layered storage tries.
// Always reset the initial account range as 1 whenever recover from the interruption.
var accountRange = accountCheckRange
if len(accMarker) > 0 {
accountRange = 1
}
// Global loop for re-generating the account snapshots + all layered storage snapshots.
origin := common.CopyBytes(accMarker)
for {
exhausted, last, err := dl.generateRange(dl.root, rawdb.SnapshotAccountPrefix, "account", accOrigin, accountRange, stats, onAccount, FullAccountRLP)
// The procedure it aborted, either by external signal or internal error
exhausted, last, err := dl.generateRange(dl.root, rawdb.SnapshotAccountPrefix, "account", origin, accountRange, stats, onAccount, FullAccountRLP)
if err != nil {
if abort == nil { // aborted by internal error, wait the signal
abort = <-dl.genAbort
}
abort <- stats
return
return err // The procedure it aborted, either by external signal or internal error.
}
// Abort the procedure if the entire snapshot is generated
if exhausted {
break
}
if accOrigin = increaseKey(last); accOrigin == nil {
if origin = increaseKey(last); origin == nil {
break // special case, the last is 0xffffffff...fff
}
accountRange = accountCheckRange
}
return nil
}

// generate is a background thread that iterates over the state and storage tries,
// constructing the state snapshot. All the arguments are purely for statistics
// gathering and logging, since the method surfs the blocks as they arrive, often
// being restarted.
func (dl *diskLayer) generate(stats *generatorStats) {
var accMarker []byte
if len(dl.genMarker) > 0 { // []byte{} is the start, use nil for that
accMarker = dl.genMarker[:common.HashLength]
}
var (
batch = dl.diskdb.NewBatch()
logged = time.Now()
abort chan *generatorStats
)
stats.Log("Resuming state snapshot generation", dl.root, dl.genMarker)

// Generate the snapshot accounts from the point where they left off.
if err := generateAccounts(dl, accMarker, batch, stats, &logged); err != nil {
// Extract the received interruption signal if exists
if aerr, ok := err.(*abortErr); ok {
abort = aerr.abort
}
// Aborted by internal error, wait the signal
if abort == nil {
abort = <-dl.genAbort
}
abort <- stats
return
}
// Snapshot fully generated, set the marker to nil.
// Note even there is nothing to commit, persist the
// generator anyway to mark the snapshot is complete.
Expand Down Expand Up @@ -762,7 +787,7 @@ func (dl *diskLayer) generate(stats *generatorStats) {
}

// increaseKey increase the input key by one bit. Return nil if the entire
// addition operation overflows,
// addition operation overflows.
func increaseKey(key []byte) []byte {
for i := len(key) - 1; i >= 0; i-- {
key[i]++
Expand All @@ -772,3 +797,17 @@ func increaseKey(key []byte) []byte {
}
return nil
}

// abortErr wraps an interruption signal received to represent the
// generation is aborted by external processes.
type abortErr struct {
abort chan *generatorStats
}

func newAbortErr(abort chan *generatorStats) error {
return &abortErr{abort: abort}
}

func (err *abortErr) Error() string {
return "aborted"
}

0 comments on commit 3c08083

Please sign in to comment.