Skip to content

Commit

Permalink
[Feature] Reorg feed (#43)
Browse files Browse the repository at this point in the history
* Add trace results to NewFullBlocksWithPeers feed

This commit runs the Go Call Tracer during the block import phase,
making those results available to the NewFullBlocksWithPeers feed.

The trace results are included with the transaction receipts. Even
transactions which are simple ETH transfers will have a trace
included.

The Go Call Tracer had to be moved from the core/tracers/ package to
core/tracers/custom/ to avoid import loops, but its functionality
has not changed.

* Add reorg feed

This adds a reorg feed, capturing reorgs in the block processor
and making them available to a reorgFeed subscription.

The response object include:

* common ancestor hash
* common ancestor number
* added block hashes
* removed block hashes

* Add json formatting

Co-authored-by: Austin Roberts <austin.roberts@openrelay.xyz>
  • Loading branch information
dmarzzz and Austin Roberts authored Feb 1, 2022
1 parent b6a51d7 commit 2c990ef
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 74 deletions.
88 changes: 57 additions & 31 deletions core/revert_cache.go
Original file line number Diff line number Diff line change
@@ -1,44 +1,70 @@
package core

import (
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/accounts/abi"
lru "github.com/hashicorp/golang-lru"
"fmt"

"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
lru "github.com/hashicorp/golang-lru"
)

var (
revertCache *lru.Cache
reorgFeed event.Feed
revertCache *lru.Cache
reorgFeed event.Feed
traceCache *lru.Cache
)

func CacheRevertReason(h, blockHash common.Hash, reason []byte) {
if revertCache == nil { revertCache, _ = lru.New(10000) }
if reason != nil {
key := [64]byte{}
copy(key[:32], blockHash[:])
copy(key[32:], h[:])
if reasonString, err := abi.UnpackRevert(reason); err == nil {
revertCache.Add(key, reasonString)
} else {
revertCache.Add(key, fmt.Sprintf("%#x", reason))
}
}
if revertCache == nil {
revertCache, _ = lru.New(10000)
}
if reason != nil {
key := [64]byte{}
copy(key[:32], blockHash[:])
copy(key[32:], h[:])
if reasonString, err := abi.UnpackRevert(reason); err == nil {
revertCache.Add(key, reasonString)
} else {
revertCache.Add(key, fmt.Sprintf("%#x", reason))
}
}
}

func GetRevertReason(h, blockHash common.Hash) (string, bool) {
if revertCache == nil { revertCache, _ = lru.New(10000) }
key := [64]byte{}
copy(key[:32], blockHash[:])
copy(key[32:], h[:])
if v, ok := revertCache.Get(key); ok {
return v.(string), true
}
return "", false
if revertCache == nil {
revertCache, _ = lru.New(10000)
}
key := [64]byte{}
copy(key[:32], blockHash[:])
copy(key[32:], h[:])
if v, ok := revertCache.Get(key); ok {
return v.(string), true
}
return "", false
}
func CacheTrace(h, blockHash common.Hash, traceResult interface{}) {
if traceCache == nil {
traceCache, _ = lru.New(10000)
}
key := [64]byte{}
copy(key[:32], blockHash[:])
copy(key[32:], h[:])
traceCache.Add(key, traceResult)
}

func GetTrace(h, blockHash common.Hash) (interface{}, bool) {
if traceCache == nil {
traceCache, _ = lru.New(10000)
}
key := [64]byte{}
copy(key[:32], blockHash[:])
copy(key[32:], h[:])
return traceCache.Get(key)
}

type Reorg struct {
Common common.Hash `json:"common"`
Number hexutil.Uint64 `json:"number"`
Expand All @@ -48,10 +74,10 @@ type Reorg struct {

func sendReorg(commonAncestor *types.Block, removed, added types.Blocks) {
reorg := &Reorg{
Common: commonAncestor.Hash(),
Number: hexutil.Uint64(commonAncestor.NumberU64()),
Common: commonAncestor.Hash(),
Number: hexutil.Uint64(commonAncestor.NumberU64()),
Removed: make([]common.Hash, len(removed)),
Added: make([]common.Hash, len(added)),
Added: make([]common.Hash, len(added)),
}
for i, block := range removed {
reorg.Removed[i] = block.Hash()
Expand Down
12 changes: 12 additions & 0 deletions core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth/tracers/custom"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/log"
)

// StateProcessor is a basic Processor, which takes care of transitioning
Expand Down Expand Up @@ -71,6 +73,11 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
misc.ApplyDAOHardFork(statedb)
}
blockContext := NewEVMBlockContext(header, p.bc, nil)
oldDebug, oldTracer := cfg.Debug, cfg.Tracer
defer func() { cfg.Debug, cfg.Tracer = oldDebug, oldTracer }()
tracer := custom.NewCallTracer(statedb)
cfg.Debug = true
cfg.Tracer = tracer
vmenv := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg)
// Iterate over and process the individual transactions
for i, tx := range block.Transactions() {
Expand All @@ -85,6 +92,11 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
}
receipts = append(receipts, receipt)
allLogs = append(allLogs, receipt.Logs...)
if result, err := tracer.GetResult(); err == nil {
CacheTrace(tx.Hash(), blockHash, result)
} else {
log.Warn("Failed to get result from racer", "block", blockHash, "num", blockNumber, "tx", tx.Hash())
}
}
// Finalize the block, applying any consensus engine specific extras (e.g. block rewards)
p.engine.Finalize(p.bc, header, statedb, block.Transactions(), block.Uncles())
Expand Down
118 changes: 78 additions & 40 deletions eth/filters/peers_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,49 +16,57 @@ import (

"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"

// "github.com/ethereum/go-ethereum/node"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc"
lru "github.com/hashicorp/golang-lru"
"sync"
"time"
)

var (
blockPeerMap *lru.Cache
txPeerMap *lru.Cache
tsMap *lru.Cache
peerIDMap *sync.Map
txPeerMap *lru.Cache
tsMap *lru.Cache
peerIDMap *sync.Map
)

type peerInfo struct {
Enode string `json:"enode"`
ID string `json:"id"`
ID string `json:"id"`
}


// SetBlockPeer is called when a block is received from a peer to track which
// peer was the first to provide a given block
func SetBlockPeer(hash common.Hash, peer string) {
log.Debug("Recording block peer", "hash", hash, "peer", peer)
if blockPeerMap == nil { blockPeerMap, _ = lru.New(250) }
if tsMap == nil { tsMap, _ = lru.New(100000) }
if blockPeerMap == nil {
blockPeerMap, _ = lru.New(250)
}
if tsMap == nil {
tsMap, _ = lru.New(100000)
}
if _, ok := blockPeerMap.Get(hash); !ok {
blockPeerMap.Add(hash, peer)
tsMap.Add(hash, time.Now().UnixNano())
}
}


// SetTxPeer is called when a transaction is received from a peer to track
// which peer was the first to provide a given transaction
func SetTxPeer(hash common.Hash, peer string) {
if txPeerMap == nil { txPeerMap, _ = lru.New(100000) }
if tsMap == nil { tsMap, _ = lru.New(100000) }
if txPeerMap == nil {
txPeerMap, _ = lru.New(100000)
}
if tsMap == nil {
tsMap, _ = lru.New(100000)
}
if _, ok := txPeerMap.Get(hash); !ok {
txPeerMap.Add(hash, peer)
tsMap.Add(hash, time.Now().UnixNano())
Expand Down Expand Up @@ -98,34 +106,43 @@ func SubscribePeerIDs(srv *p2p.Server) {
// messages we will get later will only include the truncated peer id, so the
// full id and enode must be tracked based on connect / drop messages.
func setPeerID(peerid, enode string) {
if peerIDMap == nil { peerIDMap = &sync.Map{} }
if peerIDMap == nil {
peerIDMap = &sync.Map{}
}
if _, ok := peerIDMap.Load(peerid); !ok {
peerIDMap.Store(peerid, peerInfo{ID: peerid, Enode: enode})
}
}

// dropPeerID cleans up records when a peer drops
func dropPeerID(peerid string) {
if peerIDMap == nil { return }
if peerIDMap == nil {
return
}
peerIDMap.Delete(peerid)
}


// withPeer is a generic wrapper for different types of values distributed with
// peer information.
type withPeer struct {
Value interface{} `json:"value"`
Peer interface{} `json:"peer"`
Time int64 `json:"ts"`
Value interface{} `json:"value"`
Peer interface{} `json:"peer"`
Time int64 `json:"ts"`
P2PTime interface{} `json:"p2pts,omitempty"`
}

// NewHeadsWithPeers send a notification each time a new (header) block is
// appended to the chain, and includes the peer that first provided the block
func (api *PublicFilterAPI) NewHeadsWithPeers(ctx context.Context) (*rpc.Subscription, error) {
if blockPeerMap == nil { blockPeerMap, _ = lru.New(250) }
if peerIDMap == nil { peerIDMap = &sync.Map{} }
if tsMap == nil { tsMap, _ = lru.New(100000) }
if blockPeerMap == nil {
blockPeerMap, _ = lru.New(250)
}
if peerIDMap == nil {
peerIDMap = &sync.Map{}
}
if tsMap == nil {
tsMap, _ = lru.New(100000)
}
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
Expand All @@ -144,7 +161,7 @@ func (api *PublicFilterAPI) NewHeadsWithPeers(ctx context.Context) (*rpc.Subscri
p2pts, _ := tsMap.Get(h.Hash())
peer, _ := peerIDMap.Load(peerid)
log.Debug("NewHeadsWithPeers", "hash", h.Hash(), "peer", peerid, "peer", peer)
notifier.Notify(rpcSub.ID, withPeer{Value: h, Peer: peer, Time: time.Now().UnixNano(), P2PTime: p2pts} )
notifier.Notify(rpcSub.ID, withPeer{Value: h, Peer: peer, Time: time.Now().UnixNano(), P2PTime: p2pts})
case <-rpcSub.Err():
headersSub.Unsubscribe()
return
Expand All @@ -162,9 +179,15 @@ func (api *PublicFilterAPI) NewHeadsWithPeers(ctx context.Context) (*rpc.Subscri
// transactions and receipts is appended to the chain, and includes the peer
// that first provided the block
func (api *PublicFilterAPI) NewFullBlocksWithPeers(ctx context.Context) (*rpc.Subscription, error) {
if blockPeerMap == nil { blockPeerMap, _ = lru.New(250) }
if peerIDMap == nil { peerIDMap = &sync.Map{} }
if tsMap == nil { tsMap, _ = lru.New(100000) }
if blockPeerMap == nil {
blockPeerMap, _ = lru.New(250)
}
if peerIDMap == nil {
peerIDMap = &sync.Map{}
}
if tsMap == nil {
tsMap, _ = lru.New(100000)
}
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
Expand All @@ -186,7 +209,7 @@ func (api *PublicFilterAPI) NewFullBlocksWithPeers(ctx context.Context) (*rpc.Su
case r := <-reorgs:
// Reverse the added blocks in the reorgs, excluding the latest block
// as it will be emitted on the newHeads channels.
hashes = make([]common.Hash, 0, len(r.Added) - 1)
hashes = make([]common.Hash, 0, len(r.Added)-1)
for i := len(r.Added) - 1; i > 0; i-- {
hashes = append(hashes, r.Added[i])
}
Expand All @@ -203,9 +226,13 @@ func (api *PublicFilterAPI) NewFullBlocksWithPeers(ctx context.Context) (*rpc.Su
peerid, _ := blockPeerMap.Get(hash)

block, err := api.backend.BlockByHash(ctx, hash)
if err != nil { continue }
if err != nil {
continue
}
marshalBlock, err := ethapi.RPCMarshalBlock(block, true, true, api.backend.ChainConfig())
if err != nil { continue }
if err != nil {
continue
}

marshalReceipts := make(map[common.Hash]map[string]interface{})
receipts, err := api.backend.GetReceipts(ctx, hash)
Expand All @@ -232,15 +259,17 @@ func (api *PublicFilterAPI) NewFullBlocksWithPeers(ctx context.Context) (*rpc.Su
if reason, ok := core.GetRevertReason(receipt.TxHash, hash); ok {
fields["revertReason"] = reason
}
if trace, ok := core.GetTrace(receipt.TxHash, hash); ok {
fields["callTrace"] = trace
}
marshalReceipts[receipt.TxHash] = fields
}
marshalBlock["receipts"] = marshalReceipts


p2pts, _ := tsMap.Get(hash)
peer, _ := peerIDMap.Load(peerid)
log.Debug("NewFullBlocksWithPeers", "hash", hash, "peer", peerid, "peer", peer)
notifier.Notify(rpcSub.ID, withPeer{Value: marshalBlock, Peer: peer, Time: time.Now().UnixNano(), P2PTime: p2pts} )
notifier.Notify(rpcSub.ID, withPeer{Value: marshalBlock, Peer: peer, Time: time.Now().UnixNano(), P2PTime: p2pts})
}
}
}()
Expand All @@ -252,9 +281,15 @@ func (api *PublicFilterAPI) NewFullBlocksWithPeers(ctx context.Context) (*rpc.Su
// each time a transaction enters the transaction pool, and includes the peer
// that first provided the transaction
func (api *PublicFilterAPI) NewPendingTransactionsWithPeers(ctx context.Context) (*rpc.Subscription, error) {
if txPeerMap == nil { txPeerMap, _ = lru.New(100000) }
if peerIDMap == nil { peerIDMap = &sync.Map{} }
if tsMap == nil { tsMap, _ = lru.New(100000) }
if txPeerMap == nil {
txPeerMap, _ = lru.New(100000)
}
if peerIDMap == nil {
peerIDMap = &sync.Map{}
}
if tsMap == nil {
tsMap, _ = lru.New(100000)
}
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
Expand Down Expand Up @@ -288,12 +323,15 @@ func (api *PublicFilterAPI) NewPendingTransactionsWithPeers(ctx context.Context)
return rpcSub, nil
}


// NewTransactionReceipts creates a subscription that is triggered for each
// receipt in a newly confirmed block.
func (api *PublicFilterAPI) NewTransactionReceipts(ctx context.Context) (*rpc.Subscription, error) {
if blockPeerMap == nil { blockPeerMap, _ = lru.New(250) }
if peerIDMap == nil { peerIDMap = &sync.Map{} }
if blockPeerMap == nil {
blockPeerMap, _ = lru.New(250)
}
if peerIDMap == nil {
peerIDMap = &sync.Map{}
}
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
Expand All @@ -310,7 +348,7 @@ func (api *PublicFilterAPI) NewTransactionReceipts(ctx context.Context) (*rpc.Su
case h := <-headers:
receipts, _ := api.backend.GetReceipts(ctx, h.Hash())
for _, receipt := range receipts {
notifier.Notify(rpcSub.ID, receipt )
notifier.Notify(rpcSub.ID, receipt)
}
case <-rpcSub.Err():
headersSub.Unsubscribe()
Expand Down
Loading

0 comments on commit 2c990ef

Please sign in to comment.