Skip to content

Commit

Permalink
Simplify StateReaderV3 by extracting StateReaderParallelV3 (erigo…
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov authored Aug 13, 2024
1 parent f47b02b commit 959bc37
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 16 deletions.
12 changes: 10 additions & 2 deletions cmd/state/exec3/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,11 @@ func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, backgro

func (rw *Worker) ResetState(rs *state.StateV3, accumulator *shards.Accumulator) {
rw.rs = rs
rw.SetReader(state.NewStateReaderV3(rs.Domains()))
if rw.background {
rw.SetReader(state.NewStateReaderParallelV3(rs.Domains()))
} else {
rw.SetReader(state.NewStateReaderV3(rs.Domains()))
}
rw.stateWriter = state.NewStateWriterV3(rs, accumulator)
}

Expand Down Expand Up @@ -161,7 +165,11 @@ func (rw *Worker) RunTxTaskNoLock(txTask *state.TxTask) {
// Needed to correctly evaluate spent gas and other things.
rw.SetReader(state.NewHistoryReaderV3())
} else if !txTask.HistoryExecution && rw.historyMode {
rw.SetReader(state.NewStateReaderV3(rw.rs.Domains()))
if rw.background {
rw.SetReader(state.NewStateReaderParallelV3(rw.rs.Domains()))
} else {
rw.SetReader(state.NewStateReaderV3(rw.rs.Domains()))
}
}
if rw.background && rw.chainTx == nil {
var err error
Expand Down
113 changes: 100 additions & 13 deletions core/state/rw_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,28 +582,115 @@ type StateReaderV3 struct {
trace bool
sd *libstate.SharedDomains
composite []byte

discardReadList bool
readLists map[string]*libstate.KvList
}

func NewStateReaderV3(sd *libstate.SharedDomains) *StateReaderV3 {
return &StateReaderV3{
//trace: true,
sd: sd,
readLists: newReadList(),
composite: make([]byte, 20+32),
}
}

func (r *StateReaderV3) DiscardReadList() { r.discardReadList = true }
func (r *StateReaderV3) DiscardReadList() {}
func (r *StateReaderV3) SetTxNum(txNum uint64) { r.txNum = txNum }
func (r *StateReaderV3) SetTx(tx kv.Tx) {}
func (r *StateReaderV3) ReadSet() map[string]*libstate.KvList { return r.readLists }
func (r *StateReaderV3) ReadSet() map[string]*libstate.KvList { return nil }
func (r *StateReaderV3) SetTrace(trace bool) { r.trace = trace }
func (r *StateReaderV3) ResetReadSet() { r.readLists = newReadList() }
func (r *StateReaderV3) ResetReadSet() {}

func (r *StateReaderV3) ReadAccountData(address common.Address) (*accounts.Account, error) {
enc, _, err := r.sd.DomainGet(kv.AccountsDomain, address[:], nil)
if err != nil {
return nil, err
}
if len(enc) == 0 {
if r.trace {
fmt.Printf("ReadAccountData [%x] => [empty], txNum: %d\n", address, r.txNum)
}
return nil, nil
}

var acc accounts.Account
if err := accounts.DeserialiseV3(&acc, enc); err != nil {
return nil, err
}
if r.trace {
fmt.Printf("ReadAccountData [%x] => [nonce: %d, balance: %d, codeHash: %x], txNum: %d\n", address, acc.Nonce, &acc.Balance, acc.CodeHash, r.txNum)
}
return &acc, nil
}

func (r *StateReaderV3) ReadAccountStorage(address common.Address, incarnation uint64, key *common.Hash) ([]byte, error) {
r.composite = append(append(r.composite[:0], address[:]...), key.Bytes()...)
enc, _, err := r.sd.DomainGet(kv.StorageDomain, r.composite, nil)
if err != nil {
return nil, err
}
if r.trace {
if enc == nil {
fmt.Printf("ReadAccountStorage [%x] => [empty], txNum: %d\n", r.composite, r.txNum)
} else {
fmt.Printf("ReadAccountStorage [%x] => [%x], txNum: %d\n", r.composite, enc, r.txNum)
}
}
return enc, nil
}

func (r *StateReaderV3) ReadAccountCode(address common.Address, incarnation uint64, codeHash common.Hash) ([]byte, error) {
enc, _, err := r.sd.DomainGet(kv.CodeDomain, address[:], nil)
if err != nil {
return nil, err
}
if r.trace {
fmt.Printf("ReadAccountCode [%x] => [%x], txNum: %d\n", address, enc, r.txNum)
}
return enc, nil
}

func (r *StateReaderV3) ReadAccountCodeSize(address common.Address, incarnation uint64, codeHash common.Hash) (int, error) {
enc, _, err := r.sd.DomainGet(kv.CodeDomain, address[:], nil)
if err != nil {
return 0, err
}
size := len(enc)
if r.trace {
fmt.Printf("ReadAccountCodeSize [%x] => [%d], txNum: %d\n", address, size, r.txNum)
}
return size, nil
}

func (r *StateReaderV3) ReadAccountIncarnation(address common.Address) (uint64, error) {
return 0, nil
}

type StateReaderParallelV3 struct {
txNum uint64
trace bool
sd *libstate.SharedDomains
composite []byte

discardReadList bool
readLists map[string]*libstate.KvList
}

func NewStateReaderParallelV3(sd *libstate.SharedDomains) *StateReaderParallelV3 {
return &StateReaderParallelV3{
//trace: true,
sd: sd,
readLists: newReadList(),
composite: make([]byte, 20+32),
}
}

func (r *StateReaderParallelV3) DiscardReadList() { r.discardReadList = true }
func (r *StateReaderParallelV3) SetTxNum(txNum uint64) { r.txNum = txNum }
func (r *StateReaderParallelV3) SetTx(tx kv.Tx) {}
func (r *StateReaderParallelV3) ReadSet() map[string]*libstate.KvList { return r.readLists }
func (r *StateReaderParallelV3) SetTrace(trace bool) { r.trace = trace }
func (r *StateReaderParallelV3) ResetReadSet() { r.readLists = newReadList() }

func (r *StateReaderParallelV3) ReadAccountData(address common.Address) (*accounts.Account, error) {
enc, _, err := r.sd.DomainGet(kv.AccountsDomain, address[:], nil)
if err != nil {
return nil, err
Expand All @@ -629,7 +716,7 @@ func (r *StateReaderV3) ReadAccountData(address common.Address) (*accounts.Accou
return &acc, nil
}

func (r *StateReaderV3) ReadAccountStorage(address common.Address, incarnation uint64, key *common.Hash) ([]byte, error) {
func (r *StateReaderParallelV3) ReadAccountStorage(address common.Address, incarnation uint64, key *common.Hash) ([]byte, error) {
r.composite = append(append(r.composite[:0], address[:]...), key.Bytes()...)
enc, _, err := r.sd.DomainGet(kv.StorageDomain, r.composite, nil)
if err != nil {
Expand All @@ -648,7 +735,7 @@ func (r *StateReaderV3) ReadAccountStorage(address common.Address, incarnation u
return enc, nil
}

func (r *StateReaderV3) ReadAccountCode(address common.Address, incarnation uint64, codeHash common.Hash) ([]byte, error) {
func (r *StateReaderParallelV3) ReadAccountCode(address common.Address, incarnation uint64, codeHash common.Hash) ([]byte, error) {
enc, _, err := r.sd.DomainGet(kv.CodeDomain, address[:], nil)
if err != nil {
return nil, err
Expand All @@ -663,14 +750,14 @@ func (r *StateReaderV3) ReadAccountCode(address common.Address, incarnation uint
return enc, nil
}

func (r *StateReaderV3) ReadAccountCodeSize(address common.Address, incarnation uint64, codeHash common.Hash) (int, error) {
func (r *StateReaderParallelV3) ReadAccountCodeSize(address common.Address, incarnation uint64, codeHash common.Hash) (int, error) {
enc, _, err := r.sd.DomainGet(kv.CodeDomain, address[:], nil)
if err != nil {
return 0, err
}
var sizebuf [8]byte
binary.BigEndian.PutUint64(sizebuf[:], uint64(len(enc)))
if !r.discardReadList {
var sizebuf [8]byte
binary.BigEndian.PutUint64(sizebuf[:], uint64(len(enc)))
r.readLists[libstate.CodeSizeTableFake].Push(string(address[:]), sizebuf[:])
}
size := len(enc)
Expand All @@ -680,7 +767,7 @@ func (r *StateReaderV3) ReadAccountCodeSize(address common.Address, incarnation
return size, nil
}

func (r *StateReaderV3) ReadAccountIncarnation(address common.Address) (uint64, error) {
func (r *StateReaderParallelV3) ReadAccountIncarnation(address common.Address) (uint64, error) {
return 0, nil
}

Expand Down
1 change: 0 additions & 1 deletion eth/stagedsync/exec3.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,6 @@ func ExecV3(ctx context.Context,
applyWorker = cfg.applyWorkerMining
}
applyWorker.ResetState(rs, accumulator)
applyWorker.DiscardReadList()

commitThreshold := batchSize.Bytes()
progress := NewProgress(blockNum, commitThreshold, workerCount, false, execStage.LogPrefix(), logger)
Expand Down

0 comments on commit 959bc37

Please sign in to comment.