Skip to content

Commit

Permalink
Merge pull request #172 from blocknative/TS_restore
Browse files Browse the repository at this point in the history
Restore lost commits
  • Loading branch information
lukanus authored Apr 2, 2024
2 parents 70684e9 + 971554f commit 755dd73
Show file tree
Hide file tree
Showing 10 changed files with 166 additions and 19 deletions.
33 changes: 33 additions & 0 deletions bn/prometheus/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package prometheus

import (
"net/http"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

type metrics struct {
registry *prometheus.Registry
gatherers prometheus.Gatherers
}

var Metrics metrics

func init() {
reg := prometheus.NewRegistry()
Metrics = metrics{registry: reg, gatherers: prometheus.Gatherers{reg}}
}

func (m *metrics) Register(collector prometheus.Collector) error {
return m.registry.Register(collector)
}

func (m *metrics) Handler() http.Handler {
return promhttp.HandlerFor(
m.gatherers[0],
promhttp.HandlerOpts{
EnableOpenMetrics: true,
},
)
}
1 change: 1 addition & 0 deletions core/vm/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type StateDB interface {

// BlockNative additions
Logs() []*types.Log
GetLogs(hash common.Hash, blockNumber uint64, blockHash common.Hash) []*types.Log
IntermediateRoot(bool) common.Hash
}

Expand Down
11 changes: 10 additions & 1 deletion eth/filters/dropped_tx_subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/tracers/blocknative"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
lru "github.com/hashicorp/golang-lru"
)
Expand Down Expand Up @@ -153,9 +154,13 @@ func (api *FilterAPI) DroppedTransactions(ctx context.Context) (*rpc.Subscriptio
dropped := make(chan core.DropTxsEvent)
droppedSub := api.sys.backend.SubscribeDropTxsEvent(dropped)

metricsDroppedTxsNew.Inc(1)
defer metricsDroppedTxsEnd.Inc(1)

for {
select {
case d := <-dropped:
metricsDroppedTxsReceived.Inc(int64(len(d.Txs)))
for _, tx := range d.Txs {
notification := &dropNotification{
Tx: newRPCPendingTransaction(tx),
Expand All @@ -167,7 +172,11 @@ func (api *FilterAPI) DroppedTransactions(ctx context.Context) (*rpc.Subscriptio
peerid, _ := txPeerMap.Get(tx.Hash())
notification.Peer, _ = peerIDMap.Load(peerid)
}
notifier.Notify(rpcSub.ID, notification)
metricsDroppedTxsSent.Inc(1)
if err := notifier.Notify(rpcSub.ID, notification); err != nil {
log.Error("dropped_txs_stream: failed to notify", "err", err)
return
}
}
case <-rpcSub.Err():
droppedSub.Unsubscribe()
Expand Down
30 changes: 30 additions & 0 deletions eth/filters/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package filters

import (
"github.com/ethereum/go-ethereum/metrics"
)

var (
metricsPendingTxsNew = metrics.NewRegisteredCounter("stream/pending_txs/new", nil)
metricsPendingTxsEnd = metrics.NewRegisteredCounter("stream/pending_txs/end", nil)
metricsPendingTxsReceived = metrics.NewRegisteredCounter("stream/pending_txs/received", nil)
metricsPendingTxsGasTooLow = metrics.NewRegisteredCounter("stream/pending_txs/gas_too_low", nil)
metricsPendingTxsTraceSuccess = metrics.NewRegisteredCounter("stream/pending_txs/trace_success", nil)
metricsPendingTxsTraceFailed = metrics.NewRegisteredCounter("stream/pending_txs/trace_failed", nil)
metricsPendingTxsSent = metrics.NewRegisteredCounter("stream/pending_txs/sent", nil)

metricsBlocksNew = metrics.NewRegisteredCounter("stream/blocks/new", nil)
metricsBlocksEnd = metrics.NewRegisteredCounter("stream/blocks/end", nil)
metricsBlocksReceived = metrics.NewRegisteredCounter("stream/blocks/received", nil)
metricsBlocksTraceSuccess = metrics.NewRegisteredCounter("stream/blocks/trace_success", nil)
metricsBlocksTraceFailed = metrics.NewRegisteredCounter("stream/blocks/trace_failed", nil)
metricsBlocksSent = metrics.NewRegisteredCounter("stream/blocks/sent", nil)

metricsDroppedTxsNew = metrics.NewRegisteredCounter("stream/dropped_txs/new", nil)
metricsDroppedTxsEnd = metrics.NewRegisteredCounter("stream/dropped_txs/end", nil)
metricsDroppedTxsReceived = metrics.NewRegisteredCounter("stream/dropped_txs/received", nil)
metricsDroppedTxsSent = metrics.NewRegisteredCounter("stream/dropped_txs/sent", nil)

metricsTracePendingTxTimer = metrics.NewRegisteredHistogram("stream/pending_txs/trace_duration", nil, metrics.NewExpDecaySample(1028, 0.015))
metricsTraceBlockTimer = metrics.NewRegisteredHistogram("stream/blocks/trace_duration", nil, metrics.NewExpDecaySample(1028, 0.015))
)
42 changes: 42 additions & 0 deletions eth/filters/trace_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"math/big"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
Expand Down Expand Up @@ -58,6 +59,17 @@ func (api *FilterAPI) NewPendingTransactionsWithTrace(ctx context.Context, trace
msg *core.Message
)

metricsPendingTxsNew.Inc(1)
defer metricsPendingTxsEnd.Inc(1)

// Recover from any panics. Should be the last deferred call so it runs
// first.
defer func() {
if r := recover(); r != nil {
log.Error("pending_txs_stream panic:", r)
}
}()

for {
select {
case txs := <-txs:
Expand Down Expand Up @@ -85,6 +97,8 @@ func (api *FilterAPI) NewPendingTransactionsWithTrace(ctx context.Context, trace
header.ExcessBlobGas = &ex
}

metricsPendingTxsReceived.Inc(int64(len(txs)))

for _, tx := range txs {
rpcTx := newRPCPendingTransaction(tx)
if rpcTx == nil {
Expand Down Expand Up @@ -124,14 +138,23 @@ func (api *FilterAPI) NewPendingTransactionsWithTrace(ctx context.Context, trace
msg.GasFeeCap = common.Big0 // skip the check of ErrFeeCapTooLow
msg.GasTipCap = common.Big0 // skip the check of ErrFeeCapTooLow

startTime := time.Now()

traceCtx.TxHash = tx.Hash
tx.Trace, err = traceTx(msg, traceCtx, blockCtx, chainConfig, sDB.Copy(), tracerOpts)
if err != nil {
log.Info("failed to trace tx", "err", err, "tx", tx.Hash)
metricsPendingTxsTraceFailed.Inc(1)
continue
}

metricsTracePendingTxTimer.Update(time.Since(startTime).Milliseconds())
metricsPendingTxsTraceSuccess.Inc(1)

}

notifier.Notify(rpcSub.ID, tracedTxs)
metricsPendingTxsSent.Inc(int64(len(tracedTxs)))

case <-rpcSub.Err():
return
Expand Down Expand Up @@ -167,6 +190,17 @@ func (api *FilterAPI) NewFullBlocksWithTrace(ctx context.Context, tracerOptsJSON
return
}

metricsBlocksNew.Inc(1)
defer metricsBlocksEnd.Inc(1)

// Recover from any panics. Should be the last deferred call so it runs
// first.
defer func() {
if r := recover(); r != nil {
log.Error("block_stream panic:", r)
}
}()

var hashes []common.Hash
for {
select {
Expand All @@ -187,6 +221,7 @@ func (api *FilterAPI) NewFullBlocksWithTrace(ctx context.Context, tracerOptsJSON
return
}

metricsBlocksReceived.Inc(int64(len(hashes)))
for _, hash := range hashes {
block, err := api.sys.backend.BlockByHash(ctx, hash)
if err != nil {
Expand All @@ -203,7 +238,9 @@ func (api *FilterAPI) NewFullBlocksWithTrace(ctx context.Context, tracerOptsJSON
trace, err := traceBlock(block, chainConfig, api.sys.chain, tracerOpts)
if err != nil {
log.Info("failure in block trace", "err", err, "hash", hash, "block", block.Number())
metricsBlocksTraceFailed.Inc(1)
}
metricsBlocksTraceSuccess.Inc(1)

marshalBlock["trace"] = trace
marshalReceipts := make(map[common.Hash]map[string]interface{})
Expand Down Expand Up @@ -237,6 +274,7 @@ func (api *FilterAPI) NewFullBlocksWithTrace(ctx context.Context, tracerOptsJSON
marshalBlock["receipts"] = marshalReceipts

notifier.Notify(rpcSub.ID, marshalBlock)
metricsBlocksSent.Inc(1)
}
}
}()
Expand Down Expand Up @@ -268,13 +306,15 @@ func traceTx(message *core.Message, txCtx *tracers.Context, vmctx vm.BlockContex
func traceBlockTx(message *core.Message, txCtx *tracers.Context, vmctx vm.BlockContext, chainConfig *params.ChainConfig, statedb *state.StateDB, tracerOpts blocknative.TracerOpts) (*core.ExecutionResult, *blocknative.Trace, error) {

tracerOpts.DisableBlockContext = false
tracerOpts.PerHashLogs = true
tracer, err := blocknative.NewTracerWithOpts(tracerOpts)
if err != nil {
return nil, nil, err
}

vmenv := vm.NewEVM(vmctx, core.NewEVMTxContext(message), statedb, chainConfig, vm.Config{Tracer: tracer, NoBaseFee: false})
statedb.SetTxContext(txCtx.TxHash, txCtx.TxIndex)
tracer.SetTxContext(txCtx.TxHash, txCtx.TxIndex)

result, err := core.ApplyMessage(vmenv, message, new(core.GasPool).AddGas(message.GasLimit))
if err != nil {
Expand Down Expand Up @@ -310,6 +350,7 @@ func traceBlock(block *types.Block, chainConfig *params.ChainConfig, chain *core
results2 = make([]*core.ExecutionResult, len(txs))
)

startTime := time.Now()
for i, tx := range txs {
msg, err := core.TransactionToMessage(tx, signer, blockCtx.BaseFee)
if err != nil {
Expand Down Expand Up @@ -341,6 +382,7 @@ func traceBlock(block *types.Block, chainConfig *params.ChainConfig, chain *core
}
statedb.Finalise(is158)
}
metricsTraceBlockTimer.Update(time.Since(startTime).Milliseconds())

return results, nil
}
Expand Down
6 changes: 5 additions & 1 deletion eth/tracers/blocknative/blocknative.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,18 @@ import (
// resulting Trace object directly.
type Tracer interface {
vm.EVMLogger
SetTxContext(thash common.Hash, ti int)
GetTrace() (*Trace, error)
GetResult() (json.RawMessage, error)
Stop(err error)
}

// TracerOpts configure the tracer to save or ignore various aspects of a transaction execution.
type TracerOpts struct {
Logs bool `json:"logs"`
Logs bool `json:"logs"`
// Get per tx hash logs
PerHashLogs bool `json:"per_hash_logs"`

Decode bool `json:"decode"`
BalanceChanges bool `json:"balanceChanges"`

Expand Down
13 changes: 9 additions & 4 deletions eth/tracers/blocknative/decoder/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,11 @@ func decodeERC721Metadata(evmCall evmCallFn, addr common.Address, tokenID *big.I
if metadata.Symbol, err = evmCallMethodSymbol(evmCall, addr); err != nil {
log.Trace("failed to decode ERC721 symbol", "err", err)
}
if metadata.URI, err = evmCallMethodTokenURI(evmCall, addr, tokenID); err != nil {
log.Trace("failed to decode ERC721 tokenURI", "err", err)

if tokenID != nil {
if metadata.URI, err = evmCallMethodTokenURI(evmCall, addr, tokenID); err != nil {
log.Trace("failed to decode ERC721 tokenURI", "err", err)
}
}

return metadata
Expand All @@ -82,8 +85,10 @@ func decodeERC1155Metadata(evmCall evmCallFn, addr common.Address, tokenID *big.
var err error
metadata := AssetMetadata{Type: AssetTypeERC1155}

if metadata.URI, err = evmCallMethodURI(evmCall, addr, tokenID); err != nil {
log.Trace("failed to decode ERC1155 URI", "err", err)
if tokenID != nil {
if metadata.URI, err = evmCallMethodURI(evmCall, addr, tokenID); err != nil {
log.Trace("failed to decode ERC1155 URI", "err", err)
}
}

return metadata
Expand Down
6 changes: 6 additions & 0 deletions eth/tracers/blocknative/decoder/balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ func (changeMap balanceChangeByOwnerByAsset) addAssetTransfer(asset *Asset, from

// accountAssetChange adds a single side change of an asset transfer to the balanceChangeByOwnerByAsset map.
func (changeMap balanceChangeByOwnerByAsset) accountAssetChange(owner common.Address, counterparty common.Address, asset *Asset, delta *Amount) {
// We can get Transfer events that actually sent 0. We show these in the
// decoding and logs but don't add them to the balance changes.
if delta.ToInt().Sign() == 0 {
return
}

// If this is the first time we've seen this owner then create a new
// balanceByAsset map.
if _, ok := changeMap[owner]; !ok {
Expand Down
41 changes: 29 additions & 12 deletions eth/tracers/blocknative/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ type tracer struct {
evm *vm.EVM
decoder *decoder.Decoder

thash common.Hash // transaction has
txIndex int // transaction index

trace Trace
startTime time.Time
callStack []CallFrame
Expand Down Expand Up @@ -63,7 +66,11 @@ func NewTracerWithOpts(opts TracerOpts) (Tracer, error) {
}

return &t, nil
}

func (t *tracer) SetTxContext(thash common.Hash, ti int) {
t.thash = thash
t.txIndex = ti
}

// SetStateRoot implements core.stateRootSetter and stores the given root in the
Expand Down Expand Up @@ -128,24 +135,34 @@ func (t *tracer) CaptureEnd(output []byte, gasUsed uint64, err error) {
log.Error("failed to finalize call frame", "err", err)
}

// Add gas payments to balance changes iff the tx succeeded.
if err == nil && t.opts.Decode {
t.decoder.CaptureGas(t.evm.TxContext.Origin, t.evm.Context.Coinbase, gasUsed, t.evm.TxContext.GasPrice, t.evm.Context.BaseFee)
}

// If the user wants the logs, grab them from the state
if t.opts.Logs {
for _, stateLog := range t.evm.StateDB.Logs() {
t.trace.Logs = append(t.trace.Logs, CallLog{
Address: stateLog.Address,
Data: stateLog.Data,
Topics: stateLog.Topics,
})
if t.opts.PerHashLogs {
for _, stateLog := range t.evm.StateDB.GetLogs(t.thash, 0, common.Hash{}) {
t.trace.Logs = append(t.trace.Logs, CallLog{
Address: stateLog.Address,
Data: stateLog.Data,
Topics: stateLog.Topics,
})
}
} else {
for _, stateLog := range t.evm.StateDB.Logs() {
t.trace.Logs = append(t.trace.Logs, CallLog{
Address: stateLog.Address,
Data: stateLog.Data,
Topics: stateLog.Topics,
})
}
}
}

// Add gas payments to balance changes
if t.opts.Decode {
t.decoder.CaptureGas(t.evm.TxContext.Origin, t.evm.Context.Coinbase, gasUsed, t.evm.TxContext.GasPrice, t.evm.Context.BaseFee)
}

// Add total time duration for this trace request
t.trace.Time = time.Now().Sub(t.startTime).Nanoseconds()
t.trace.Time = time.Since(t.startTime).Nanoseconds()
}

// CaptureEnter is called before any new sub-call starts.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ require (
github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416
github.com/olekukonko/tablewriter v0.0.5
github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7
github.com/prometheus/client_golang v1.16.0
github.com/protolambda/bls12-381-util v0.0.0-20220416220906-d8552aa452c7
github.com/rs/cors v1.7.0
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible
Expand Down Expand Up @@ -129,7 +130,6 @@ require (
github.com/opentracing/opentracing-go v1.1.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.16.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
Expand Down

0 comments on commit 755dd73

Please sign in to comment.