Skip to content

Commit

Permalink
Aggregator22.Unwind() (erigontech#5039)
Browse files Browse the repository at this point in the history
* save

* save
  • Loading branch information
AskAlexSharov authored Aug 13, 2022
1 parent 64bc837 commit 52fd0d0
Show file tree
Hide file tree
Showing 37 changed files with 158 additions and 62 deletions.
2 changes: 1 addition & 1 deletion cmd/downloader/downloader/downloadercfg/downloadercfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func New(snapDir string, verbosity lg.Level, dbg bool, natif nat.Interface, down
// rates are divided by 2 - I don't know why it works, maybe bug inside torrent lib accounting
torrentConfig.UploadRateLimiter = rate.NewLimiter(rate.Limit(uploadRate.Bytes()), 2*DefaultNetworkChunkSize) // default: unlimited
if downloadRate.Bytes() < 500_000_000 {
b := int(2 * DefaultNetworkChunkSize)
b := 2 * DefaultNetworkChunkSize
if downloadRate.Bytes() > DefaultNetworkChunkSize {
b = int(2 * downloadRate.Bytes())
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/rpcdaemon/commands/bor_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (api *BorImpl) GetCurrentValidators() ([]*bor.Validator, error) {

// GetRootHash returns the merkle root of the start to end block headers
func (api *BorImpl) GetRootHash(start, end uint64) (string, error) {
length := uint64(end - start + 1)
length := end - start + 1
if length > bor.MaxCheckpointLength {
return "", &bor.MaxCheckpointLengthExceededError{Start: start, End: end}
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/rpcdaemon/commands/debug_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestTraceBlockByNumber(t *testing.T) {
}
var buf bytes.Buffer
stream := jsoniter.NewStream(jsoniter.ConfigDefault, &buf, 4096)
err := api.TraceBlockByNumber(context.Background(), rpc.BlockNumber(rpc.LatestBlockNumber), &tracers.TraceConfig{}, stream)
err := api.TraceBlockByNumber(context.Background(), rpc.LatestBlockNumber, &tracers.TraceConfig{}, stream)
if err != nil {
t.Errorf("traceBlock %v: %v", rpc.LatestBlockNumber, err)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/rpcdaemon/commands/engine_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,14 @@ func (e *EngineImpl) NewPayloadV1(ctx context.Context, payload *ExecutionPayload
// Convert slice of hexutil.Bytes to a slice of slice of bytes
transactions := make([][]byte, len(payload.Transactions))
for i, transaction := range payload.Transactions {
transactions[i] = ([]byte)(transaction)
transactions[i] = transaction
}
res, err := e.api.EngineNewPayloadV1(ctx, &types2.ExecutionPayload{
ParentHash: gointerfaces.ConvertHashToH256(payload.ParentHash),
Coinbase: gointerfaces.ConvertAddressToH160(payload.FeeRecipient),
StateRoot: gointerfaces.ConvertHashToH256(payload.StateRoot),
ReceiptRoot: gointerfaces.ConvertHashToH256(payload.ReceiptsRoot),
LogsBloom: gointerfaces.ConvertBytesToH2048(([]byte)(payload.LogsBloom)),
LogsBloom: gointerfaces.ConvertBytesToH2048(payload.LogsBloom),
PrevRandao: gointerfaces.ConvertHashToH256(payload.PrevRandao),
BlockNumber: uint64(payload.BlockNumber),
GasLimit: uint64(payload.GasLimit),
Expand Down
2 changes: 1 addition & 1 deletion cmd/rpcdaemon/commands/trace_filtering.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (api *TraceAPIImpl) Filter(ctx context.Context, req TraceFilterRequest, str

it := allBlocks.Iterator()
for it.HasNext() {
b := uint64(it.Next())
b := it.Next()
// Extract transactions from block
hash, hashErr := rawdb.ReadCanonicalHash(dbtx, b)
if hashErr != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/rpcdaemon22/commands/bor_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (api *BorImpl) GetCurrentValidators() ([]*bor.Validator, error) {

// GetRootHash returns the merkle root of the start to end block headers
func (api *BorImpl) GetRootHash(start, end uint64) (string, error) {
length := uint64(end - start + 1)
length := end - start + 1
if length > bor.MaxCheckpointLength {
return "", &bor.MaxCheckpointLengthExceededError{Start: start, End: end}
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/rpcdaemon22/commands/debug_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestTraceBlockByNumber(t *testing.T) {
}
var buf bytes.Buffer
stream := jsoniter.NewStream(jsoniter.ConfigDefault, &buf, 4096)
err := api.TraceBlockByNumber(context.Background(), rpc.BlockNumber(rpc.LatestBlockNumber), &tracers.TraceConfig{}, stream)
err := api.TraceBlockByNumber(context.Background(), rpc.LatestBlockNumber, &tracers.TraceConfig{}, stream)
if err != nil {
t.Errorf("traceBlock %v: %v", rpc.LatestBlockNumber, err)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/rpcdaemon22/commands/engine_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,14 @@ func (e *EngineImpl) NewPayloadV1(ctx context.Context, payload *ExecutionPayload
// Convert slice of hexutil.Bytes to a slice of slice of bytes
transactions := make([][]byte, len(payload.Transactions))
for i, transaction := range payload.Transactions {
transactions[i] = ([]byte)(transaction)
transactions[i] = transaction
}
res, err := e.api.EngineNewPayloadV1(ctx, &types2.ExecutionPayload{
ParentHash: gointerfaces.ConvertHashToH256(payload.ParentHash),
Coinbase: gointerfaces.ConvertAddressToH160(payload.FeeRecipient),
StateRoot: gointerfaces.ConvertHashToH256(payload.StateRoot),
ReceiptRoot: gointerfaces.ConvertHashToH256(payload.ReceiptsRoot),
LogsBloom: gointerfaces.ConvertBytesToH2048(([]byte)(payload.LogsBloom)),
LogsBloom: gointerfaces.ConvertBytesToH2048(payload.LogsBloom),
PrevRandao: gointerfaces.ConvertHashToH256(payload.PrevRandao),
BlockNumber: uint64(payload.BlockNumber),
GasLimit: uint64(payload.GasLimit),
Expand Down
2 changes: 1 addition & 1 deletion cmd/rpcdaemon22/commands/trace_filtering.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func (api *TraceAPIImpl) Filter(ctx context.Context, req TraceFilterRequest, str
stateReader := state.NewHistoryReader22(ac, nil /* ReadIndices */)
noop := state.NewNoopWriter()
for it.HasNext() {
txNum := uint64(it.Next())
txNum := it.Next()
// Find block number
blockNum := uint64(sort.Search(len(api._txNums), func(i int) bool {
return api._txNums[i] > txNum
Expand Down
2 changes: 1 addition & 1 deletion cmd/state/commands/check_change_sets.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func CheckChangeSets(genesis *core.Genesis, logger log.Logger, blockNum uint64,
return h
}
contractHasTEVM := ethdb.GetHasTEVM(rwtx)
receipts, err1 := runBlock(engine, intraBlockState, noOpWriter, blockWriter, chainConfig, getHeader, contractHasTEVM, b, vmConfig, blockNum == uint64(block))
receipts, err1 := runBlock(engine, intraBlockState, noOpWriter, blockWriter, chainConfig, getHeader, contractHasTEVM, b, vmConfig, blockNum == block)
if err1 != nil {
return err1
}
Expand Down
6 changes: 3 additions & 3 deletions common/dbutils/suffix_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import "encoding/binary"
type Suffix []byte

func ToSuffix(b []byte) Suffix {
return Suffix(b)
return b
}

func (s Suffix) Add(key []byte) Suffix {
Expand All @@ -20,7 +20,7 @@ func (s Suffix) Add(key []byte) Suffix {
binary.BigEndian.PutUint32(dv, 1+s.KeyCount()) // Increment the counter of keys
dv[l] = byte(len(key))
copy(dv[l+1:], key)
return Suffix(dv)
return dv
}
func (s Suffix) MultiAdd(keys [][]byte) Suffix {
var l int
Expand All @@ -43,7 +43,7 @@ func (s Suffix) MultiAdd(keys [][]byte) Suffix {
copy(dv[i:], key)
i += len(key)
}
return Suffix(dv)
return dv
}

func (s Suffix) KeyCount() uint32 {
Expand Down
4 changes: 2 additions & 2 deletions common/math/integer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,6 @@ func TestMustParseUint64Panic(t *testing.T) {
func TestAbsoluteDifference(t *testing.T) {
x1 := uint64(99)
x2 := uint64(45)
assert.Equal(t, AbsoluteDifference(x1, x2), uint64(x1-x2))
assert.Equal(t, AbsoluteDifference(x2, x1), uint64(x1-x2))
assert.Equal(t, AbsoluteDifference(x1, x2), x1-x2)
assert.Equal(t, AbsoluteDifference(x2, x1), x1-x2)
}
4 changes: 2 additions & 2 deletions consensus/bor/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (api *API) GetRootHash(start uint64, end uint64) (string, error) {
if root, known := api.rootHashCache.Get(key); known {
return root.(string), nil
}
length := uint64(end - start + 1)
length := end - start + 1
if length > MaxCheckpointLength {
return "", &MaxCheckpointLengthExceededError{start, end}
}
Expand All @@ -147,7 +147,7 @@ func (api *API) GetRootHash(start uint64, end uint64) (string, error) {
wg.Add(1)
concurrent <- true
go func(number uint64) {
blockHeaders[number-start] = api.chain.GetHeaderByNumber(uint64(number))
blockHeaders[number-start] = api.chain.GetHeaderByNumber(number)
<-concurrent
wg.Done()
}(i)
Expand Down
2 changes: 1 addition & 1 deletion consensus/bor/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func NewHeimdallClient(urlString string) (*HeimdallClient, error) {
h := &HeimdallClient{
urlString: urlString,
client: http.Client{
Timeout: time.Duration(5 * time.Second),
Timeout: 5 * time.Second,
},
}
return h, nil
Expand Down
5 changes: 5 additions & 0 deletions core/state/rw22.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/google/btree"
"github.com/holiman/uint256"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/dbg"
"github.com/ledgerwatch/erigon-lib/kv"
libstate "github.com/ledgerwatch/erigon-lib/state"
"github.com/ledgerwatch/erigon/common"
Expand Down Expand Up @@ -109,6 +110,10 @@ func NewState22() *State22 {
}

func (rs *State22) put(table string, key, val []byte) {
if table == kv.PlainState {
fmt.Printf("table: %s, %s\n", table, dbg.Stack())
}

t, ok := rs.changes[table]
if !ok {
t = btree.NewG[StateItem](32, stateItemLess)
Expand Down
2 changes: 1 addition & 1 deletion core/types/legacy_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ func (tx LegacyTx) EncodeRLP(w io.Writer) error {
// DecodeRLP decodes LegacyTx but with the list token already consumed and encodingSize being presented
func (tx *LegacyTx) DecodeRLP(s *rlp.Stream, encodingSize uint64) error {
var err error
s.NewList(uint64(encodingSize))
s.NewList(encodingSize)
if tx.Nonce, err = s.Uint(); err != nil {
return fmt.Errorf("read Nonce: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion core/vm/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ func (evm *EVM) Create(caller ContractRef, code []byte, gas uint64, value *uint2
// DESCRIBED: docs/programmers_guide/guide.md#nonce
func (evm *EVM) Create2(caller ContractRef, code []byte, gas uint64, endowment *uint256.Int, salt *uint256.Int) (ret []byte, contractAddr common.Address, leftOverGas uint64, err error) {
codeAndHash := &codeAndHash{code: code}
contractAddr = crypto.CreateAddress2(caller.Address(), common.Hash(salt.Bytes32()), codeAndHash.Hash().Bytes())
contractAddr = crypto.CreateAddress2(caller.Address(), salt.Bytes32(), codeAndHash.Hash().Bytes())
return evm.create(caller, codeAndHash, gas, endowment, contractAddr, CREATE2T)
}

Expand Down
4 changes: 2 additions & 2 deletions core/vm/instructions.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ func opReturnDataCopy(pc *uint64, interpreter *EVMInterpreter, scope *ScopeConte

func opExtCodeSize(pc *uint64, interpreter *EVMInterpreter, scope *ScopeContext) ([]byte, error) {
slot := scope.Stack.Peek()
slot.SetUint64(uint64(interpreter.evm.IntraBlockState().GetCodeSize(common.Address(slot.Bytes20()))))
slot.SetUint64(uint64(interpreter.evm.IntraBlockState().GetCodeSize(slot.Bytes20())))
return nil, nil
}

Expand Down Expand Up @@ -869,7 +869,7 @@ func makeLog(size int) executionFunc {
mStart, mSize := stack.Pop(), stack.Pop()
for i := 0; i < size; i++ {
addr := stack.Pop()
topics[i] = common.Hash(addr.Bytes32())
topics[i] = addr.Bytes32()
}

d := scope.Memory.GetCopy(mStart.Uint64(), mSize.Uint64())
Expand Down
2 changes: 1 addition & 1 deletion core/vm/lightclient/iavl/key_format.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func NewKeyFormat(prefix byte, layout ...int) *KeyFormat {
// For prefix byte
length := 1
for _, l := range layout {
length += int(l)
length += l
}
return &KeyFormat{
prefix: prefix,
Expand Down
2 changes: 1 addition & 1 deletion core/vm/lightclient/iavl/proof_iavl_absence.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (op IAVLAbsenceOp) Run(args [][]byte) ([][]byte, error) {
// XXX What is the encoding for keys?
// We should decode the key depending on whether it's a string or hex,
// maybe based on quotes and 0x prefix?
err = op.Proof.VerifyAbsence([]byte(op.key))
err = op.Proof.VerifyAbsence(op.key)
if err != nil {
return nil, cmn.ErrorWrap(err, "verifying absence")
}
Expand Down
2 changes: 1 addition & 1 deletion core/vm/lightclient/iavl/proof_iavl_value.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (op IAVLValueOp) Run(args [][]byte) ([][]byte, error) {
// XXX What is the encoding for keys?
// We should decode the key depending on whether it's a string or hex,
// maybe based on quotes and 0x prefix?
err = op.Proof.VerifyItem([]byte(op.key), value)
err = op.Proof.VerifyItem(op.key, value)
if err != nil {
return nil, cmn.ErrorWrap(err, "verifying value")
}
Expand Down
2 changes: 1 addition & 1 deletion core/vm/lightclient/iavl/proof_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (pl PathToLeaf) dropRoot() PathToLeaf {
if pl.isEmpty() {
return pl
}
return PathToLeaf(pl[:len(pl)-1])
return pl[:len(pl)-1]
}

// TODO: (leonard) unused linter complains these are unused methods
Expand Down
2 changes: 1 addition & 1 deletion core/vm/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (l *StructLogger) CaptureState(env *EVM, pc uint64, op OpCode, gas, cost ui
value uint256.Int
)
env.IntraBlockState().GetState(contract.Address(), &address, &value)
l.storage[contract.Address()][address] = common.Hash(value.Bytes32())
l.storage[contract.Address()][address] = value.Bytes32()
}
// capture SSTORE opcodes and record the written entry in the local storage.
if op == SSTORE && stack.Len() >= 2 {
Expand Down
2 changes: 1 addition & 1 deletion eth/gasprice/feehistory.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (oracle *Oracle) processBlock(bf *blockFees, percentiles []float64) {
if bf.baseFee = bf.header.BaseFee; bf.baseFee == nil {
bf.baseFee = new(big.Int)
}
if chainconfig.IsLondon(uint64(bf.blockNumber + 1)) {
if chainconfig.IsLondon(bf.blockNumber + 1) {
bf.nextBaseFee = misc.CalcBaseFee(chainconfig, bf.header)
} else {
bf.nextBaseFee = new(big.Int)
Expand Down
93 changes: 92 additions & 1 deletion eth/stagedsync/stage_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ func newStateReaderWriter(
return stateReader, stateWriter, nil
}

// ================ Erigon22 ================

func ExecBlock22(s *StageState, u Unwinder, tx kv.RwTx, toBlock uint64, ctx context.Context, cfg ExecuteBlockCfg, initialCycle bool) (err error) {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
Expand Down Expand Up @@ -262,7 +264,6 @@ func ExecBlock22(s *StageState, u Unwinder, tx kv.RwTx, toBlock uint64, ctx cont
var prevStageProgress uint64
// Compute mapping blockNum -> last TxNum in that block
var txNums []uint64

if tx != nil {
prevStageProgress, err = stages.GetStageProgress(tx, stages.Senders)
if err != nil {
Expand Down Expand Up @@ -335,6 +336,92 @@ func ExecBlock22(s *StageState, u Unwinder, tx kv.RwTx, toBlock uint64, ctx cont
return nil
}

func UnwindExec22(u *UnwindState, s *StageState, tx kv.RwTx, ctx context.Context, cfg ExecuteBlockCfg, initialCycle bool) (err error) {
if u.UnwindPoint >= s.BlockNumber {
return nil
}
useExternalTx := tx != nil
if !useExternalTx {
tx, err = cfg.db.BeginRw(context.Background())
if err != nil {
return err
}
defer tx.Rollback()
}
logPrefix := u.LogPrefix()
log.Info(fmt.Sprintf("[%s] Unwind Execution", logPrefix), "from", s.BlockNumber, "to", u.UnwindPoint)

//rs := state.NewState22()
aggDir := path.Join(cfg.dirs.DataDir, "agg22")
dir.MustExist(aggDir)
agg, err := libstate.NewAggregator22(aggDir, AggregationStep)
if err != nil {
return err
}
defer agg.Close()

allSnapshots := cfg.blockReader.(WithSnapshots).Snapshots()

var prevStageProgress uint64
// Compute mapping blockNum -> last TxNum in that block
var txNums []uint64
if tx != nil {
prevStageProgress, err = stages.GetStageProgress(tx, stages.Senders)
if err != nil {
return err
}
txNums = make([]uint64, prevStageProgress+1)
if err := (snapshotsync.BodiesIterator{}).ForEach(tx, allSnapshots, 0, func(blockNum, baseTxNum, txAmount uint64) error {
if blockNum > prevStageProgress {
return nil
}
txNums[blockNum] = baseTxNum + txAmount
return nil
}); err != nil {
return fmt.Errorf("build txNum => blockNum mapping: %w", err)
}
} else {
if err = cfg.db.View(ctx, func(tx kv.Tx) error {
prevStageProgress, err = stages.GetStageProgress(tx, stages.Senders)
if err != nil {
return err
}
txNums = make([]uint64, prevStageProgress)
if err := (snapshotsync.BodiesIterator{}).ForEach(tx, allSnapshots, 0, func(blockNum, baseTxNum, txAmount uint64) error {
if blockNum > prevStageProgress {
return nil
}
txNums[blockNum] = baseTxNum + txAmount
return nil
}); err != nil {
return fmt.Errorf("build txNum => blockNum mapping: %w", err)
}
return nil
}); err != nil {
return err
}
}

agg.SetTx(tx)
agg.SetTxNum(txNums[prevStageProgress])
if err := agg.Unwind(txNums[u.UnwindPoint]); err != nil {
return err
}
if err = u.Done(tx); err != nil {
return err
}

if !useExternalTx {
if err = tx.Commit(); err != nil {
return err
}
}

return nil
}

// ================ Erigon22 End ================

func SpawnExecuteBlocksStage(s *StageState, u Unwinder, tx kv.RwTx, toBlock uint64, ctx context.Context, cfg ExecuteBlockCfg, initialCycle bool) (err error) {
if cfg.exec22 {
return ExecBlock22(s, u, tx, toBlock, ctx, cfg, initialCycle)
Expand Down Expand Up @@ -547,6 +634,10 @@ func logProgress(logPrefix string, prevBlock uint64, prevTime time.Time, current
}

func UnwindExecutionStage(u *UnwindState, s *StageState, tx kv.RwTx, ctx context.Context, cfg ExecuteBlockCfg, initialCycle bool) (err error) {
if cfg.exec22 {
return UnwindExec22(u, s, tx, ctx, cfg, initialCycle)
}

quit := ctx.Done()
if u.UnwindPoint >= s.BlockNumber {
return nil
Expand Down
Loading

0 comments on commit 52fd0d0

Please sign in to comment.