diff --git a/bn/prometheus/metrics.go b/bn/prometheus/metrics.go new file mode 100644 index 000000000000..a53263c2ce97 --- /dev/null +++ b/bn/prometheus/metrics.go @@ -0,0 +1,33 @@ +package prometheus + +import ( + "net/http" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +type metrics struct { + registry *prometheus.Registry + gatherers prometheus.Gatherers +} + +var Metrics metrics + +func init() { + reg := prometheus.NewRegistry() + Metrics = metrics{registry: reg, gatherers: prometheus.Gatherers{reg}} +} + +func (m *metrics) Register(collector prometheus.Collector) error { + return m.registry.Register(collector) +} + +func (m *metrics) Handler() http.Handler { + return promhttp.HandlerFor( + m.gatherers[0], + promhttp.HandlerOpts{ + EnableOpenMetrics: true, + }, + ) +} diff --git a/core/vm/interface.go b/core/vm/interface.go index 4aef4271a78e..a72d1688d041 100644 --- a/core/vm/interface.go +++ b/core/vm/interface.go @@ -82,6 +82,7 @@ type StateDB interface { // BlockNative additions Logs() []*types.Log + GetLogs(hash common.Hash, blockNumber uint64, blockHash common.Hash) []*types.Log IntermediateRoot(bool) common.Hash } diff --git a/eth/filters/dropped_tx_subscription.go b/eth/filters/dropped_tx_subscription.go index aba49f130d8c..3e2d68a480ab 100644 --- a/eth/filters/dropped_tx_subscription.go +++ b/eth/filters/dropped_tx_subscription.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/tracers/blocknative" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rpc" lru "github.com/hashicorp/golang-lru" ) @@ -153,9 +154,13 @@ func (api *FilterAPI) DroppedTransactions(ctx context.Context) (*rpc.Subscriptio dropped := make(chan core.DropTxsEvent) droppedSub := api.sys.backend.SubscribeDropTxsEvent(dropped) + metricsDroppedTxsNew.Inc(1) + defer metricsDroppedTxsEnd.Inc(1) + for { select { case d := <-dropped: + metricsDroppedTxsReceived.Inc(int64(len(d.Txs))) for _, tx := range d.Txs { notification := &dropNotification{ Tx: newRPCPendingTransaction(tx), @@ -167,7 +172,11 @@ func (api *FilterAPI) DroppedTransactions(ctx context.Context) (*rpc.Subscriptio peerid, _ := txPeerMap.Get(tx.Hash()) notification.Peer, _ = peerIDMap.Load(peerid) } - notifier.Notify(rpcSub.ID, notification) + metricsDroppedTxsSent.Inc(1) + if err := notifier.Notify(rpcSub.ID, notification); err != nil { + log.Error("dropped_txs_stream: failed to notify", "err", err) + return + } } case <-rpcSub.Err(): droppedSub.Unsubscribe() diff --git a/eth/filters/metrics.go b/eth/filters/metrics.go new file mode 100644 index 000000000000..47f37da34753 --- /dev/null +++ b/eth/filters/metrics.go @@ -0,0 +1,30 @@ +package filters + +import ( + "github.com/ethereum/go-ethereum/metrics" +) + +var ( + metricsPendingTxsNew = metrics.NewRegisteredCounter("stream/pending_txs/new", nil) + metricsPendingTxsEnd = metrics.NewRegisteredCounter("stream/pending_txs/end", nil) + metricsPendingTxsReceived = metrics.NewRegisteredCounter("stream/pending_txs/received", nil) + metricsPendingTxsGasTooLow = metrics.NewRegisteredCounter("stream/pending_txs/gas_too_low", nil) + metricsPendingTxsTraceSuccess = metrics.NewRegisteredCounter("stream/pending_txs/trace_success", nil) + metricsPendingTxsTraceFailed = metrics.NewRegisteredCounter("stream/pending_txs/trace_failed", nil) + metricsPendingTxsSent = metrics.NewRegisteredCounter("stream/pending_txs/sent", nil) + + metricsBlocksNew = metrics.NewRegisteredCounter("stream/blocks/new", nil) + metricsBlocksEnd = metrics.NewRegisteredCounter("stream/blocks/end", nil) + metricsBlocksReceived = metrics.NewRegisteredCounter("stream/blocks/received", nil) + metricsBlocksTraceSuccess = metrics.NewRegisteredCounter("stream/blocks/trace_success", nil) + metricsBlocksTraceFailed = metrics.NewRegisteredCounter("stream/blocks/trace_failed", nil) + metricsBlocksSent = metrics.NewRegisteredCounter("stream/blocks/sent", nil) + + metricsDroppedTxsNew = metrics.NewRegisteredCounter("stream/dropped_txs/new", nil) + metricsDroppedTxsEnd = metrics.NewRegisteredCounter("stream/dropped_txs/end", nil) + metricsDroppedTxsReceived = metrics.NewRegisteredCounter("stream/dropped_txs/received", nil) + metricsDroppedTxsSent = metrics.NewRegisteredCounter("stream/dropped_txs/sent", nil) + + metricsTracePendingTxTimer = metrics.NewRegisteredHistogram("stream/pending_txs/trace_duration", nil, metrics.NewExpDecaySample(1028, 0.015)) + metricsTraceBlockTimer = metrics.NewRegisteredHistogram("stream/blocks/trace_duration", nil, metrics.NewExpDecaySample(1028, 0.015)) +) diff --git a/eth/filters/trace_api.go b/eth/filters/trace_api.go index 57a44b23948f..1b09eaa454fe 100644 --- a/eth/filters/trace_api.go +++ b/eth/filters/trace_api.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "math/big" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -58,6 +59,17 @@ func (api *FilterAPI) NewPendingTransactionsWithTrace(ctx context.Context, trace msg *core.Message ) + metricsPendingTxsNew.Inc(1) + defer metricsPendingTxsEnd.Inc(1) + + // Recover from any panics. Should be the last deferred call so it runs + // first. + defer func() { + if r := recover(); r != nil { + log.Error("pending_txs_stream panic:", r) + } + }() + for { select { case txs := <-txs: @@ -85,6 +97,8 @@ func (api *FilterAPI) NewPendingTransactionsWithTrace(ctx context.Context, trace header.ExcessBlobGas = &ex } + metricsPendingTxsReceived.Inc(int64(len(txs))) + for _, tx := range txs { rpcTx := newRPCPendingTransaction(tx) if rpcTx == nil { @@ -124,14 +138,23 @@ func (api *FilterAPI) NewPendingTransactionsWithTrace(ctx context.Context, trace msg.GasFeeCap = common.Big0 // skip the check of ErrFeeCapTooLow msg.GasTipCap = common.Big0 // skip the check of ErrFeeCapTooLow + startTime := time.Now() + traceCtx.TxHash = tx.Hash tx.Trace, err = traceTx(msg, traceCtx, blockCtx, chainConfig, sDB.Copy(), tracerOpts) if err != nil { log.Info("failed to trace tx", "err", err, "tx", tx.Hash) + metricsPendingTxsTraceFailed.Inc(1) + continue } + + metricsTracePendingTxTimer.Update(time.Since(startTime).Milliseconds()) + metricsPendingTxsTraceSuccess.Inc(1) + } notifier.Notify(rpcSub.ID, tracedTxs) + metricsPendingTxsSent.Inc(int64(len(tracedTxs))) case <-rpcSub.Err(): return @@ -167,6 +190,17 @@ func (api *FilterAPI) NewFullBlocksWithTrace(ctx context.Context, tracerOptsJSON return } + metricsBlocksNew.Inc(1) + defer metricsBlocksEnd.Inc(1) + + // Recover from any panics. Should be the last deferred call so it runs + // first. + defer func() { + if r := recover(); r != nil { + log.Error("block_stream panic:", r) + } + }() + var hashes []common.Hash for { select { @@ -187,6 +221,7 @@ func (api *FilterAPI) NewFullBlocksWithTrace(ctx context.Context, tracerOptsJSON return } + metricsBlocksReceived.Inc(int64(len(hashes))) for _, hash := range hashes { block, err := api.sys.backend.BlockByHash(ctx, hash) if err != nil { @@ -203,7 +238,9 @@ func (api *FilterAPI) NewFullBlocksWithTrace(ctx context.Context, tracerOptsJSON trace, err := traceBlock(block, chainConfig, api.sys.chain, tracerOpts) if err != nil { log.Info("failure in block trace", "err", err, "hash", hash, "block", block.Number()) + metricsBlocksTraceFailed.Inc(1) } + metricsBlocksTraceSuccess.Inc(1) marshalBlock["trace"] = trace marshalReceipts := make(map[common.Hash]map[string]interface{}) @@ -237,6 +274,7 @@ func (api *FilterAPI) NewFullBlocksWithTrace(ctx context.Context, tracerOptsJSON marshalBlock["receipts"] = marshalReceipts notifier.Notify(rpcSub.ID, marshalBlock) + metricsBlocksSent.Inc(1) } } }() @@ -268,6 +306,7 @@ func traceTx(message *core.Message, txCtx *tracers.Context, vmctx vm.BlockContex func traceBlockTx(message *core.Message, txCtx *tracers.Context, vmctx vm.BlockContext, chainConfig *params.ChainConfig, statedb *state.StateDB, tracerOpts blocknative.TracerOpts) (*core.ExecutionResult, *blocknative.Trace, error) { tracerOpts.DisableBlockContext = false + tracerOpts.PerHashLogs = true tracer, err := blocknative.NewTracerWithOpts(tracerOpts) if err != nil { return nil, nil, err @@ -275,6 +314,7 @@ func traceBlockTx(message *core.Message, txCtx *tracers.Context, vmctx vm.BlockC vmenv := vm.NewEVM(vmctx, core.NewEVMTxContext(message), statedb, chainConfig, vm.Config{Tracer: tracer, NoBaseFee: false}) statedb.SetTxContext(txCtx.TxHash, txCtx.TxIndex) + tracer.SetTxContext(txCtx.TxHash, txCtx.TxIndex) result, err := core.ApplyMessage(vmenv, message, new(core.GasPool).AddGas(message.GasLimit)) if err != nil { @@ -310,6 +350,7 @@ func traceBlock(block *types.Block, chainConfig *params.ChainConfig, chain *core results2 = make([]*core.ExecutionResult, len(txs)) ) + startTime := time.Now() for i, tx := range txs { msg, err := core.TransactionToMessage(tx, signer, blockCtx.BaseFee) if err != nil { @@ -341,6 +382,7 @@ func traceBlock(block *types.Block, chainConfig *params.ChainConfig, chain *core } statedb.Finalise(is158) } + metricsTraceBlockTimer.Update(time.Since(startTime).Milliseconds()) return results, nil } diff --git a/eth/tracers/blocknative/blocknative.go b/eth/tracers/blocknative/blocknative.go index c67fe0ba8894..15467ba5e3f9 100644 --- a/eth/tracers/blocknative/blocknative.go +++ b/eth/tracers/blocknative/blocknative.go @@ -14,6 +14,7 @@ import ( // resulting Trace object directly. type Tracer interface { vm.EVMLogger + SetTxContext(thash common.Hash, ti int) GetTrace() (*Trace, error) GetResult() (json.RawMessage, error) Stop(err error) @@ -21,7 +22,10 @@ type Tracer interface { // TracerOpts configure the tracer to save or ignore various aspects of a transaction execution. type TracerOpts struct { - Logs bool `json:"logs"` + Logs bool `json:"logs"` + // Get per tx hash logs + PerHashLogs bool `json:"per_hash_logs"` + Decode bool `json:"decode"` BalanceChanges bool `json:"balanceChanges"` diff --git a/eth/tracers/blocknative/decoder/asset.go b/eth/tracers/blocknative/decoder/asset.go index 8f9f043ea427..0d864b565176 100644 --- a/eth/tracers/blocknative/decoder/asset.go +++ b/eth/tracers/blocknative/decoder/asset.go @@ -70,8 +70,11 @@ func decodeERC721Metadata(evmCall evmCallFn, addr common.Address, tokenID *big.I if metadata.Symbol, err = evmCallMethodSymbol(evmCall, addr); err != nil { log.Trace("failed to decode ERC721 symbol", "err", err) } - if metadata.URI, err = evmCallMethodTokenURI(evmCall, addr, tokenID); err != nil { - log.Trace("failed to decode ERC721 tokenURI", "err", err) + + if tokenID != nil { + if metadata.URI, err = evmCallMethodTokenURI(evmCall, addr, tokenID); err != nil { + log.Trace("failed to decode ERC721 tokenURI", "err", err) + } } return metadata @@ -82,8 +85,10 @@ func decodeERC1155Metadata(evmCall evmCallFn, addr common.Address, tokenID *big. var err error metadata := AssetMetadata{Type: AssetTypeERC1155} - if metadata.URI, err = evmCallMethodURI(evmCall, addr, tokenID); err != nil { - log.Trace("failed to decode ERC1155 URI", "err", err) + if tokenID != nil { + if metadata.URI, err = evmCallMethodURI(evmCall, addr, tokenID); err != nil { + log.Trace("failed to decode ERC1155 URI", "err", err) + } } return metadata diff --git a/eth/tracers/blocknative/decoder/balances.go b/eth/tracers/blocknative/decoder/balances.go index a153487e3527..02a00a347536 100644 --- a/eth/tracers/blocknative/decoder/balances.go +++ b/eth/tracers/blocknative/decoder/balances.go @@ -85,6 +85,12 @@ func (changeMap balanceChangeByOwnerByAsset) addAssetTransfer(asset *Asset, from // accountAssetChange adds a single side change of an asset transfer to the balanceChangeByOwnerByAsset map. func (changeMap balanceChangeByOwnerByAsset) accountAssetChange(owner common.Address, counterparty common.Address, asset *Asset, delta *Amount) { + // We can get Transfer events that actually sent 0. We show these in the + // decoding and logs but don't add them to the balance changes. + if delta.ToInt().Sign() == 0 { + return + } + // If this is the first time we've seen this owner then create a new // balanceByAsset map. if _, ok := changeMap[owner]; !ok { diff --git a/eth/tracers/blocknative/tracer.go b/eth/tracers/blocknative/tracer.go index 6e4658d8b184..69c53e0184ee 100644 --- a/eth/tracers/blocknative/tracer.go +++ b/eth/tracers/blocknative/tracer.go @@ -27,6 +27,9 @@ type tracer struct { evm *vm.EVM decoder *decoder.Decoder + thash common.Hash // transaction has + txIndex int // transaction index + trace Trace startTime time.Time callStack []CallFrame @@ -63,7 +66,11 @@ func NewTracerWithOpts(opts TracerOpts) (Tracer, error) { } return &t, nil +} +func (t *tracer) SetTxContext(thash common.Hash, ti int) { + t.thash = thash + t.txIndex = ti } // SetStateRoot implements core.stateRootSetter and stores the given root in the @@ -128,24 +135,34 @@ func (t *tracer) CaptureEnd(output []byte, gasUsed uint64, err error) { log.Error("failed to finalize call frame", "err", err) } + // Add gas payments to balance changes iff the tx succeeded. + if err == nil && t.opts.Decode { + t.decoder.CaptureGas(t.evm.TxContext.Origin, t.evm.Context.Coinbase, gasUsed, t.evm.TxContext.GasPrice, t.evm.Context.BaseFee) + } + // If the user wants the logs, grab them from the state if t.opts.Logs { - for _, stateLog := range t.evm.StateDB.Logs() { - t.trace.Logs = append(t.trace.Logs, CallLog{ - Address: stateLog.Address, - Data: stateLog.Data, - Topics: stateLog.Topics, - }) + if t.opts.PerHashLogs { + for _, stateLog := range t.evm.StateDB.GetLogs(t.thash, 0, common.Hash{}) { + t.trace.Logs = append(t.trace.Logs, CallLog{ + Address: stateLog.Address, + Data: stateLog.Data, + Topics: stateLog.Topics, + }) + } + } else { + for _, stateLog := range t.evm.StateDB.Logs() { + t.trace.Logs = append(t.trace.Logs, CallLog{ + Address: stateLog.Address, + Data: stateLog.Data, + Topics: stateLog.Topics, + }) + } } } - // Add gas payments to balance changes - if t.opts.Decode { - t.decoder.CaptureGas(t.evm.TxContext.Origin, t.evm.Context.Coinbase, gasUsed, t.evm.TxContext.GasPrice, t.evm.Context.BaseFee) - } - // Add total time duration for this trace request - t.trace.Time = time.Now().Sub(t.startTime).Nanoseconds() + t.trace.Time = time.Since(t.startTime).Nanoseconds() } // CaptureEnter is called before any new sub-call starts. diff --git a/go.mod b/go.mod index cb64e829cc8a..16ea39b14ce5 100644 --- a/go.mod +++ b/go.mod @@ -55,6 +55,7 @@ require ( github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416 github.com/olekukonko/tablewriter v0.0.5 github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7 + github.com/prometheus/client_golang v1.16.0 github.com/protolambda/bls12-381-util v0.0.0-20220416220906-d8552aa452c7 github.com/rs/cors v1.7.0 github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible @@ -129,7 +130,6 @@ require ( github.com/opentracing/opentracing-go v1.1.0 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/prometheus/client_golang v1.16.0 // indirect github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.42.0 // indirect github.com/prometheus/procfs v0.10.1 // indirect