From eab795d2e7e05a2d8360d87b9d1132da5b56225b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Mi=C5=82kowski?= Date: Thu, 7 Mar 2024 19:53:07 +0100 Subject: [PATCH] Mempool blob feed + trace related fixes (#164) * Mempool blobs minimalistic change * Remove the big fields * Remove NoBaseFee * Change base fee to blockCtx --- core/txpool/blobpool/blobpool.go | 13 ++++++- eth/filters/dropped_tx_subscription.go | 50 ++++++++++++++------------ eth/filters/peers_api.go | 22 +++++++++++- eth/filters/trace_api.go | 28 +++++++-------- 4 files changed, 73 insertions(+), 40 deletions(-) diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index 4c457f35231a..abcd00b8304b 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -1217,17 +1217,28 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction { func (p *BlobPool) Add(txs []*types.Transaction, local bool, sync bool) []error { var ( adds = make([]*types.Transaction, 0, len(txs)) + // -------- BLOCKNATIVE MODIFICATION START ------------- + fullAdds = make([]*types.Transaction, 0, len(txs)) + // -------- BLOCKNATIVE MODIFICATION STOP ------------- errs = make([]error, len(txs)) ) for i, tx := range txs { errs[i] = p.add(tx) if errs[i] == nil { adds = append(adds, tx.WithoutBlobTxSidecar()) + // -------- BLOCKNATIVE MODIFICATION START ------------- + fullAdds = append(adds, tx) + // -------- BLOCKNATIVE MODIFICATION STOP ------------- } } + // -------- BLOCKNATIVE MODIFICATION START ------------- + if len(fullAdds) > 0 { + p.insertFeed.Send(core.NewTxsEvent{Txs: fullAdds}) + } + // -------- BLOCKNATIVE MODIFICATION STOP ------------- + if len(adds) > 0 { p.discoverFeed.Send(core.NewTxsEvent{Txs: adds}) - p.insertFeed.Send(core.NewTxsEvent{Txs: adds}) } return errs } diff --git a/eth/filters/dropped_tx_subscription.go b/eth/filters/dropped_tx_subscription.go index 6e2ccec58e61..8ab82edc3526 100644 --- a/eth/filters/dropped_tx_subscription.go +++ b/eth/filters/dropped_tx_subscription.go @@ -34,29 +34,31 @@ type rejectNotification struct { // RPCTransaction represents a transaction that will serialize to the RPC representation of a transaction type RPCTransaction struct { - BlockHash *common.Hash `json:"blockHash"` - BlockNumber *hexutil.Big `json:"blockNumber"` - From common.Address `json:"from"` - Gas hexutil.Uint64 `json:"gas"` - GasPrice *hexutil.Big `json:"gasPrice"` - GasFeeCap *hexutil.Big `json:"maxFeePerGas,omitempty"` - GasTipCap *hexutil.Big `json:"maxPriorityFeePerGas,omitempty"` - MaxFeePerBlobGas *hexutil.Big `json:"maxFeePerBlobGas,omitempty"` - Hash common.Hash `json:"hash"` - Input hexutil.Bytes `json:"input"` - Nonce hexutil.Uint64 `json:"nonce"` - To *common.Address `json:"to"` - TransactionIndex *hexutil.Uint64 `json:"transactionIndex"` - Value *hexutil.Big `json:"value"` - Type hexutil.Uint64 `json:"type"` - Accesses *types.AccessList `json:"accessList,omitempty"` - ChainID *hexutil.Big `json:"chainId,omitempty"` - BlobVersionedHashes []common.Hash `json:"blobVersionedHashes,omitempty"` - V *hexutil.Big `json:"v"` - R *hexutil.Big `json:"r"` - S *hexutil.Big `json:"s"` - YParity *hexutil.Uint64 `json:"yParity,omitempty"` - Trace *blocknative.Trace `json:"trace,omitempty"` + BlockHash *common.Hash `json:"blockHash"` + BlockNumber *hexutil.Big `json:"blockNumber"` + From common.Address `json:"from"` + Gas hexutil.Uint64 `json:"gas"` + GasPrice *hexutil.Big `json:"gasPrice"` + GasFeeCap *hexutil.Big `json:"maxFeePerGas,omitempty"` + GasTipCap *hexutil.Big `json:"maxPriorityFeePerGas,omitempty"` + MaxFeePerBlobGas *hexutil.Big `json:"maxFeePerBlobGas,omitempty"` + Hash common.Hash `json:"hash"` + Input hexutil.Bytes `json:"input"` + Nonce hexutil.Uint64 `json:"nonce"` + To *common.Address `json:"to"` + TransactionIndex *hexutil.Uint64 `json:"transactionIndex"` + Value *hexutil.Big `json:"value"` + Type hexutil.Uint64 `json:"type"` + Accesses *types.AccessList `json:"accessList,omitempty"` + ChainID *hexutil.Big `json:"chainId,omitempty"` + BlobVersionedHashes []common.Hash `json:"blobVersionedHashes,omitempty"` + V *hexutil.Big `json:"v"` + R *hexutil.Big `json:"r"` + S *hexutil.Big `json:"s"` + YParity *hexutil.Uint64 `json:"yParity,omitempty"` + + BlobSidecar *types.BlobTxSidecar `json:"sidecar,omitempty"` + Trace *blocknative.Trace `json:"trace,omitempty"` } // newRPCTransaction returns a transaction that will serialize to the RPC @@ -119,6 +121,8 @@ func newRPCPendingTransaction(tx *types.Transaction) *RPCTransaction { result.GasPrice = (*hexutil.Big)(tx.GasFeeCap()) result.MaxFeePerBlobGas = (*hexutil.Big)(tx.BlobGasFeeCap()) result.BlobVersionedHashes = tx.BlobHashes() + + result.BlobSidecar = tx.BlobTxSidecar() } return result } diff --git a/eth/filters/peers_api.go b/eth/filters/peers_api.go index 92dba914f187..c8581a355e58 100644 --- a/eth/filters/peers_api.go +++ b/eth/filters/peers_api.go @@ -72,6 +72,14 @@ func RPCMarshalHeader(head *types.Header) map[string]interface{} { result["baseFeePerGas"] = (*hexutil.Big)(head.BaseFee) } + if head.BlobGasUsed != nil { + result["blobGasUsed"] = hexutil.Uint64(*head.BlobGasUsed) + } + + if head.ExcessBlobGas != nil { + result["excessBlobGas"] = hexutil.Uint64(*head.ExcessBlobGas) + } + return result } @@ -480,7 +488,19 @@ func (api *FilterAPI) NewPendingTransactionsWithPeers(ctx context.Context) (*rpc peerid, _ := txPeerMap.Get(h) p2pts, _ := tsMap.Get(h) peer, _ := peerIDMap.Load(peerid) - notifier.Notify(rpcSub.ID, withPeer{Value: newRPCPendingTransaction(api.sys.backend.GetPoolTransaction(h)), Peer: peer, Time: time.Now().UnixNano(), P2PTime: p2pts}) + + bTx := api.sys.backend.GetPoolTransaction(h) + val := newRPCPendingTransaction(bTx) + if val == nil { + val = newRPCPendingTransaction(tx) + } + + if tx != nil && val != nil { + if tx.Type() == 3 && val.BlobSidecar == nil { + val.BlobSidecar = tx.BlobTxSidecar() + } + } + notifier.Notify(rpcSub.ID, withPeer{Value: val, Peer: peer, Time: time.Now().UnixNano(), P2PTime: p2pts}) } case <-rpcSub.Err(): pendingTxSub.Unsubscribe() diff --git a/eth/filters/trace_api.go b/eth/filters/trace_api.go index 9ac6793c417e..047495440050 100644 --- a/eth/filters/trace_api.go +++ b/eth/filters/trace_api.go @@ -6,8 +6,6 @@ import ( "errors" "fmt" "math/big" - "sync" - "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -108,7 +106,7 @@ func (api *FilterAPI) NewPendingTransactionsWithTrace(ctx context.Context, trace traceCtx.TxHash = tx.Hash() trace, err := traceTx(msg, traceCtx, blockCtx, chainConfig, statedb, tracerOpts) if err != nil { - log.Error("failed to trace tx", "err", err, "tx", tx.Hash()) + log.Info("failed to trace tx", "err", err, "tx", tx.Hash()) continue } @@ -175,11 +173,14 @@ func (api *FilterAPI) NewFullBlocksWithTrace(ctx context.Context, tracerOptsJSON } case h := <-headers: hashes = []common.Hash{h.Hash()} - case <-headersSub.Err(): + case err := <-headersSub.Err(): + log.Error("HeaderSub error", "error", err) return - case <-reorgSub.Err(): + case err := <-reorgSub.Err(): + log.Error("ReorgSub error", "error", err) return case <-notifier.Closed(): + log.Error("Nofitier closed Error") return } @@ -192,19 +193,20 @@ func (api *FilterAPI) NewFullBlocksWithTrace(ctx context.Context, tracerOptsJSON marshalBlock, err := RPCMarshalBlock(block, true, true, api.sys.backend.ChainConfig()) if err != nil { + log.Error("failed to marshal block", "err", err, "block", block.Number()) continue } trace, err := traceBlock(block, chainConfig, api.sys.chain, tracerOpts) if err != nil { - log.Error("failed to trace block", "err", err, "block", block.Number()) + log.Info("failed to trace block", "err", err, "block", block.Number()) continue } marshalBlock["trace"] = trace - marshalReceipts := make(map[common.Hash]map[string]interface{}) receipts, err := api.sys.backend.GetReceipts(ctx, hash) if err != nil { + log.Error("failed to get receipts for block", "err", err, "hash ", hash, "block", block.Number()) continue } for index, receipt := range receipts { @@ -239,12 +241,6 @@ func (api *FilterAPI) NewFullBlocksWithTrace(ctx context.Context, tracerOptsJSON return rpcSub, nil } -var ( - txTraceLocksMu sync.RWMutex - txTraceLocks = make(map[common.Hash]chan struct{}) - txTraceLocksTimeout = 1 * time.Second -) - // traceTx traces a transaction with the given contexts. func traceTx(message *core.Message, txCtx *tracers.Context, vmctx vm.BlockContext, chainConfig *params.ChainConfig, statedb *state.StateDB, tracerOpts blocknative.TracerOpts) (*blocknative.Trace, error) { tracer, err := blocknative.NewTracerWithOpts(tracerOpts) @@ -252,7 +248,7 @@ func traceTx(message *core.Message, txCtx *tracers.Context, vmctx vm.BlockContex return nil, err } txContext := core.NewEVMTxContext(message) - vmenv := vm.NewEVM(vmctx, txContext, statedb, chainConfig, vm.Config{Tracer: tracer, NoBaseFee: true}) + vmenv := vm.NewEVM(vmctx, txContext, statedb, chainConfig, vm.Config{Tracer: tracer}) statedb.SetTxContext(txCtx.TxHash, txCtx.TxIndex) if _, err = core.ApplyMessage(vmenv, message, new(core.GasPool).AddGas(message.GasLimit)); err != nil { @@ -288,8 +284,9 @@ func traceBlock(block *types.Block, chainConfig *params.ChainConfig, chain *core ) for i, tx := range txs { - msg, err := core.TransactionToMessage(tx, signer, block.BaseFee()) + msg, err := core.TransactionToMessage(tx, signer, blockCtx.BaseFee) if err != nil { + log.Error("failed to trace block in transaction to message", "err", err, "tx", tx.Hash()) return nil, err } txCtx := &tracers.Context{ @@ -300,6 +297,7 @@ func traceBlock(block *types.Block, chainConfig *params.ChainConfig, chain *core } results[i], err = traceTx(msg, txCtx, blockCtx, chainConfig, statedb, tracerOpts) if err != nil { + log.Error("failed to trace block in transaction", "err", err, "tx", tx.Hash()) return nil, err } statedb.Finalise(is158)