e3: fix dao integration test (erigontech#6171)
AskAlexSharov authored Dec 1, 2022
1 parent 5d9e74d commit fd3c44f
Showing 5 changed files with 87 additions and 39 deletions.
2 changes: 2 additions & 0 deletions cmd/state/exec22/txtask.go
@@ -44,6 +44,8 @@ type TxTask struct {
 	Logs       []*types.Log
 	TraceFroms map[common.Address]struct{}
 	TraceTos   map[common.Address]struct{}
+
+	UsedGas uint64
 }
 
 type TxTaskQueue []*TxTask
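
Note: the new UsedGas field carries each transaction's gas consumption out of the worker, so the sequential loop in eth/stagedsync/exec3.go (further down) can sum it per block and compare the total against the block header. That comparison is the substance of the DAO integration-test fix. Below is a minimal, self-contained sketch of the accumulate-and-check pattern; the Task struct and checkGas helper are illustrative stand-ins, not code from the commit.

package main

import "fmt"

// Task mirrors the fields of exec22.TxTask that matter here: the gas a
// single transaction consumed, plus a flag marking the synthetic final
// task of each block, which carries the header's GasUsed for comparison.
type Task struct {
	UsedGas   uint64
	Final     bool
	HeaderGas uint64
}

// checkGas sums UsedGas across a block's tasks and, on the final task,
// verifies the total against the header before resetting the counter.
func checkGas(tasks []Task) error {
	var gasUsed uint64
	for _, t := range tasks {
		gasUsed += t.UsedGas
		if t.Final {
			if gasUsed != t.HeaderGas {
				return fmt.Errorf("gas used by execution: %d, in header: %d", gasUsed, t.HeaderGas)
			}
			gasUsed = 0 // reset for the next block
		}
	}
	return nil
}

func main() {
	ok := []Task{{UsedGas: 21000}, {UsedGas: 21000}, {Final: true, HeaderGas: 42000}}
	bad := []Task{{UsedGas: 21000}, {Final: true, HeaderGas: 42000}}
	fmt.Println(checkGas(ok))  // <nil>
	fmt.Println(checkGas(bad)) // gas used by execution: 21000, in header: 42000
}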
12 changes: 7 additions & 5 deletions cmd/state/exec3/state.go
@@ -188,13 +188,15 @@ func (rw *Worker) RunTxTask(txTask *exec22.TxTask) {
 	//}
 	var vmenv vm.VMInterface
 	if txTask.Tx.IsStarkNet() {
-		vmenv = &vm.CVMAdapter{Cvm: vm.NewCVM(ibs)}
+		rw.starkNetEvm.Reset(evmtypes.TxContext{}, ibs)
+		vmenv = rw.starkNetEvm
 	} else {
-		blockContext := core.NewEVMBlockContext(header, txTask.GetHashFn, rw.engine, nil /* author */)
-		txContext := core.NewEVMTxContext(msg)
-		vmenv = vm.NewEVM(blockContext, txContext, ibs, rw.chainConfig, vmConfig)
+		rw.evm.ResetBetweenBlocks(txTask.EvmBlockContext, core.NewEVMTxContext(msg), ibs, vmConfig, txTask.Rules)
+		vmenv = rw.evm
 	}
-	if _, err = core.ApplyMessage(vmenv, msg, gp, true /* refunds */, false /* gasBailout */); err != nil {
+	resss, err := core.ApplyMessage(vmenv, msg, gp, true /* refunds */, false /* gasBailout */)
+	txTask.UsedGas = resss.UsedGas
+	if err != nil {
 		txTask.Error = err
 		//fmt.Printf("error=%v\n", err)
 	} else {
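
Note: both branches now reuse a long-lived EVM object owned by the worker (Reset, ResetBetweenBlocks) instead of allocating a fresh interpreter per transaction. One caveat worth flagging: resss.UsedGas is read before err is checked, which relies on core.ApplyMessage returning a usable result alongside any error that can occur here; the diff alone does not show that contract. A minimal sketch of the reset-and-reuse pattern, with illustrative names:

package main

import "fmt"

// evm stands in for vm.EVM: costly to allocate, so each worker owns one
// instance and swaps contexts in and out between transactions and blocks.
type evm struct {
	blockCtx string
	txCtx    string
}

// resetBetweenBlocks mirrors the role of vm.EVM.ResetBetweenBlocks:
// install new block- and tx-level context without reallocating.
func (e *evm) resetBetweenBlocks(blockCtx, txCtx string) {
	e.blockCtx, e.txCtx = blockCtx, txCtx
}

func main() {
	worker := &evm{} // one instance per worker, reused across tasks
	for _, tx := range []string{"tx-0", "tx-1"} {
		worker.resetBetweenBlocks("block-15537394", tx)
		fmt.Printf("run %s against %s\n", worker.txCtx, worker.blockCtx)
	}
}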
23 changes: 19 additions & 4 deletions core/state/rw22.go
@@ -587,7 +587,12 @@ func (w *StateWriter22) SetTxNum(txNum uint64) {
 }
 
 func (w *StateWriter22) ResetWriteSet() {
-	w.writeLists = newWriteList()
+	w.writeLists = map[string]*exec22.KvList{
+		kv.PlainState:        {},
+		kv.Code:              {},
+		kv.PlainContractCode: {},
+		kv.IncarnationMap:    {},
+	}
 	w.accountPrevs = map[string][]byte{}
 	w.accountDels = map[string]*accounts.Account{}
 	w.storagePrevs = map[string][]byte{}
@@ -670,8 +675,13 @@ type StateReader22 struct {

 func NewStateReader22(rs *State22) *StateReader22 {
 	return &StateReader22{
-		rs:        rs,
-		readLists: newReadList(),
+		rs: rs,
+		readLists: map[string]*exec22.KvList{
+			kv.PlainState:     {},
+			kv.Code:           {},
+			CodeSizeTable:     {},
+			kv.IncarnationMap: {},
+		},
 	}
 }

@@ -684,7 +694,12 @@ func (r *StateReader22) SetTx(tx kv.Tx) {
 }
 
 func (r *StateReader22) ResetReadSet() {
-	r.readLists = newReadList()
+	r.readLists = map[string]*exec22.KvList{
+		kv.PlainState:     {},
+		kv.Code:           {},
+		CodeSizeTable:     {},
+		kv.IncarnationMap: {},
+	}
 }
 
 func (r *StateReader22) ReadSet() map[string]*exec22.KvList {
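
Note: the commit inlines what the newWriteList()/newReadList() helpers used to build: one map entry per database table, each pre-seeded with an empty KvList so later code can append without nil checks. A sketch of that shape, assuming a simplified KvList (the real exec22.KvList layout may differ):

package main

import "fmt"

// kvList plays the role of exec22.KvList: the keys and values a
// transaction touched in one table, recorded for conflict detection.
type kvList struct {
	Keys []string
	Vals [][]byte
}

// newReadList pre-seeds one empty list per table, like the inlined map
// literals in ResetReadSet; the table names here are illustrative strings.
func newReadList() map[string]*kvList {
	return map[string]*kvList{
		"PlainState":     {},
		"Code":           {},
		"CodeSize":       {},
		"IncarnationMap": {},
	}
}

func main() {
	reads := newReadList()
	// Safe to append immediately: every table already has an entry.
	reads["PlainState"].Keys = append(reads["PlainState"].Keys, "acc-0xabc")
	reads["PlainState"].Vals = append(reads["PlainState"].Vals, []byte{1})
	fmt.Println(len(reads["PlainState"].Keys)) // 1
}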
79 changes: 55 additions & 24 deletions eth/stagedsync/exec3.go
@@ -4,6 +4,7 @@ import (
"container/heap"
"context"
"encoding/binary"
"errors"
"fmt"
"math/big"
"os"
@@ -100,13 +101,17 @@ func (p *Progress) Log(rs *state.State22, rwsLen int, queueSize, count, inputBlo
 	p.prevRepeatCount = repeatCount
 }
 
-func Exec3(ctx context.Context,
-	execStage *StageState, workerCount int, batchSize datasize.ByteSize, chainDb kv.RwDB, applyTx kv.RwTx,
-	parallel bool, rs *state.State22, blockReader services.FullBlockReader,
-	logger log.Logger, agg *state2.Aggregator22, engine consensus.Engine,
-	maxBlockNum uint64, chainConfig *params.ChainConfig,
-	genesis *core.Genesis,
+func ExecV3(ctx context.Context,
+	execStage *StageState, u Unwinder, workerCount int, cfg ExecuteBlockCfg, applyTx kv.RwTx,
+	parallel bool, rs *state.State22, logPrefix string,
+	logger log.Logger,
+	maxBlockNum uint64,
 ) (err error) {
+	batchSize, chainDb := cfg.batchSize, cfg.db
+	blockReader := cfg.blockReader
+	agg, engine := cfg.agg, cfg.engine
+	chainConfig, genesis := cfg.chainConfig, cfg.genesis
+
 	useExternalTx := applyTx != nil
 	if !useExternalTx && !parallel {
 		applyTx, err = chainDb.BeginRw(ctx)
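
Note: the renamed ExecV3 takes the whole ExecuteBlockCfg plus an Unwinder instead of seven separately threaded dependencies, then destructures what it needs at the top of the body. A sketch of the pattern with an illustrative config struct:

package main

import "fmt"

// execCfg stands in for ExecuteBlockCfg: one value that carries every
// dependency, so adding a field does not ripple through call sites.
type execCfg struct {
	batchSize uint64
	dbPath    string
	engine    string
}

// execV3 unpacks the config at the top, as the real ExecV3 does with
// cfg.batchSize, cfg.db, cfg.agg, cfg.engine and friends.
func execV3(cfg execCfg, maxBlockNum uint64) error {
	batchSize, db := cfg.batchSize, cfg.dbPath
	fmt.Printf("exec to block %d, batch %d, db %s, engine %s\n",
		maxBlockNum, batchSize, db, cfg.engine)
	return nil
}

func main() {
	cfg := execCfg{batchSize: 512, dbPath: "/tmp/chaindata", engine: "ethash"}
	if err := execV3(cfg, 1_000_000); err != nil {
		fmt.Println(err)
	}
}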
@@ -433,9 +438,10 @@
 			return h
 		}
 	}
+
 	var b *types.Block
 	var blockNum uint64
-loop:
+Loop:
 	for blockNum = block; blockNum <= maxBlockNum; blockNum++ {
 		t := time.Now()
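
Note: the loop label is renamed from loop to Loop (capitalization is purely stylistic in Go). The label matters because a bare break inside the select further down would only exit the select, not the block loop. A self-contained illustration:

package main

import "fmt"

func main() {
	interrupt := make(chan struct{})
	go func() { interrupt <- struct{}{} }() // fires once, asynchronously
Loop:
	for blockNum := 0; ; blockNum++ {
		select {
		case <-interrupt:
			fmt.Println("interrupted at block", blockNum)
			break Loop // a bare break here would only leave the select
		default:
			// simulate per-block work
		}
	}
	fmt.Println("cleanup after loop")
}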

@@ -478,7 +484,7 @@
 			}
 		}()
 	}
-
+	var gasUsed uint64
 	for txIndex := -1; txIndex <= len(txs); txIndex++ {
 		// Do not oversend, wait for the result heap to go under certain size
 		txTask := &exec22.TxTask{
@@ -517,6 +523,31 @@
 		if !parallel {
 			count++
 			execWorkers[0].RunTxTask(txTask)
+			if err := func() error {
+				if txTask.Final {
+					gasUsed += txTask.UsedGas
+					if gasUsed != txTask.Header.GasUsed {
+						return fmt.Errorf("gas used by execution: %d, in header: %d", gasUsed, txTask.Header.GasUsed)
+					}
+					gasUsed = 0
+				} else {
+					gasUsed += txTask.UsedGas
+				}
+				return nil
+			}(); err != nil {
+				if !errors.Is(err, context.Canceled) {
+					log.Warn(fmt.Sprintf("[%s] Execution failed", logPrefix), "block", blockNum, "hash", header.Hash().String(), "err", err)
+					if cfg.hd != nil {
+						cfg.hd.ReportBadHeaderPoS(header.Hash(), header.ParentHash)
+					}
+					if cfg.badBlockHalt {
+						return err
+					}
+				}
+				u.UnwindTo(blockNum-1, header.Hash())
+				break Loop
+			}
+
 			if err := rs.ApplyState(applyTx, txTask, agg); err != nil {
 				panic(fmt.Errorf("State22.Apply: %w", err))
 			}
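
Note: this block wires up the actual fix. When the per-block gas total disagrees with the header (or any block-level error occurs), the stage logs the failure, reports the bad header to the PoS header downloader when one is configured, halts if badBlockHalt is set, and otherwise unwinds to the parent block. A condensed sketch of that decision path; handleBlockError, reportBad and unwindTo are illustrative stand-ins for the cfg.hd.ReportBadHeaderPoS, cfg.badBlockHalt and u.UnwindTo plumbing:

package main

import (
	"context"
	"errors"
	"fmt"
)

// handleBlockError mirrors the diff's decision path: a canceled context
// is not a bad block, so it skips reporting; a real verification error
// is reported and either halts the stage or triggers an unwind.
func handleBlockError(err error, blockNum uint64, badBlockHalt bool,
	reportBad func(), unwindTo func(block uint64)) error {
	if !errors.Is(err, context.Canceled) {
		fmt.Println("Execution failed:", err)
		reportBad()
		if badBlockHalt {
			return err // surface the error and stop the stage loop
		}
	}
	unwindTo(blockNum - 1) // re-run from the parent on the next cycle
	return nil
}

func main() {
	mismatch := fmt.Errorf("gas used by execution: %d, in header: %d", 41000, 42000)
	_ = handleBlockError(mismatch, 15537394, false,
		func() { fmt.Println("reported bad header to downloader") },
		func(b uint64) { fmt.Println("unwinding to block", b) })
}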
@@ -579,7 +610,7 @@
 		case <-interruptCh:
 			log.Info(fmt.Sprintf("interrupted, please wait for cleanup, next run will start with block %d", blockNum))
 			maxTxNum.Store(inputTxNum)
-			break loop
+			break Loop
 		default:
 		}

@@ -627,28 +658,28 @@ func blockWithSenders(db kv.RoDB, tx kv.Tx, blockReader services.BlockReader, bl
 	return b, nil
 }
 
-func processResultQueue(rws *exec22.TxTaskQueue, outputTxNum *atomic2.Uint64, rs *state.State22, agg *state2.Aggregator22, applyTx kv.Tx,
-	triggerCount, outputBlockNum, repeatCount *atomic2.Uint64, resultsSize *atomic2.Int64, onSuccess func()) {
+func processResultQueue(rws *exec22.TxTaskQueue, outputTxNum *atomic2.Uint64, rs *state.State22, agg *state2.Aggregator22, applyTx kv.Tx, triggerCount, outputBlockNum, repeatCount *atomic2.Uint64, resultsSize *atomic2.Int64, onSuccess func()) {
 	for rws.Len() > 0 && (*rws)[0].TxNum == outputTxNum.Load() {
 		txTask := heap.Pop(rws).(*exec22.TxTask)
 		resultsSize.Add(-txTask.ResultsSize)
-		if txTask.Error == nil && rs.ReadsValid(txTask.ReadLists) {
-			if err := rs.ApplyState(applyTx, txTask, agg); err != nil {
-				panic(fmt.Errorf("State22.Apply: %w", err))
-			}
-			triggerCount.Add(rs.CommitTxNum(txTask.Sender, txTask.TxNum))
-			outputTxNum.Inc()
-			outputBlockNum.Store(txTask.BlockNum)
-			onSuccess()
-			if err := rs.ApplyHistory(applyTx, txTask, agg); err != nil {
-				panic(fmt.Errorf("State22.Apply: %w", err))
-			}
-			//fmt.Printf("Applied %d block %d txIndex %d\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex)
-		} else {
+		if txTask.Error != nil || !rs.ReadsValid(txTask.ReadLists) {
 			rs.AddWork(txTask)
 			repeatCount.Inc()
+			continue
 			//fmt.Printf("Rolled back %d block %d txIndex %d\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex)
 		}
+
+		if err := rs.ApplyState(applyTx, txTask, agg); err != nil {
+			panic(fmt.Errorf("State22.Apply: %w", err))
+		}
+		triggerCount.Add(rs.CommitTxNum(txTask.Sender, txTask.TxNum))
+		outputTxNum.Inc()
+		outputBlockNum.Store(txTask.BlockNum)
+		onSuccess()
+		if err := rs.ApplyHistory(applyTx, txTask, agg); err != nil {
+			panic(fmt.Errorf("State22.Apply: %w", err))
+		}
+		//fmt.Printf("Applied %d block %d txIndex %d\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex)
 	}
 }
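
Note: processResultQueue is reshaped from an if/else into a guard clause: the conflict case requeues and continues early, and the apply path loses one level of nesting with no behavior change. The same shape in miniature:

package main

import "fmt"

// drain shows the guard-clause form the refactor adopts: reject and
// continue first, then the happy path at top level instead of in else.
func drain(results []int) {
	for _, r := range results {
		if r < 0 { // conflict: requeue, like rs.AddWork(txTask)
			fmt.Println("requeue", r)
			continue
		}
		fmt.Println("apply", r)
	}
}

func main() {
	drain([]int{7, -3, 9})
}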

10 changes: 4 additions & 6 deletions eth/stagedsync/stage_execute.go
@@ -234,7 +234,7 @@ func newStateReaderWriter(

 // ================ Erigon3 ================
 
-func ExecBlock22(s *StageState, u Unwinder, tx kv.RwTx, toBlock uint64, ctx context.Context, cfg ExecuteBlockCfg, initialCycle bool) (err error) {
+func ExecBlockV3(s *StageState, u Unwinder, tx kv.RwTx, toBlock uint64, ctx context.Context, cfg ExecuteBlockCfg, initialCycle bool) (err error) {
 	workersCount := cfg.syncCfg.ExecWorkerCount
 	//workersCount := 2
 	if !initialCycle {
@@ -274,10 +274,8 @@ func ExecBlock22(s *StageState, u Unwinder, tx kv.RwTx, toBlock uint64, ctx cont
 	}
 	rs := state.NewState22()
 	parallel := initialCycle && tx == nil
-	if err := Exec3(ctx, s, workersCount, cfg.batchSize, cfg.db, tx, parallel, rs,
-		cfg.blockReader, log.New(), cfg.agg, cfg.engine,
-		to,
-		cfg.chainConfig, cfg.genesis); err != nil {
+	if err := ExecV3(ctx, s, u, workersCount, cfg, tx, parallel, rs, logPrefix,
+		log.New(), to); err != nil {
 		return err
 	}

@@ -351,7 +349,7 @@ func senderStageProgress(tx kv.Tx, db kv.RoDB) (prevStageProgress uint64, err er

 func SpawnExecuteBlocksStage(s *StageState, u Unwinder, tx kv.RwTx, toBlock uint64, ctx context.Context, cfg ExecuteBlockCfg, initialCycle bool, quiet bool) (err error) {
 	if cfg.historyV3 {
-		if err = ExecBlock22(s, u, tx, toBlock, ctx, cfg, initialCycle); err != nil {
+		if err = ExecBlockV3(s, u, tx, toBlock, ctx, cfg, initialCycle); err != nil {
 			return err
 		}
 		return nil
