Skip to content

Commit

Permalink
eth/filters: avoid block body retrieval when no matching logs (ethere…
Browse files Browse the repository at this point in the history
…um#25199)

Logs stored on disk have minimal information. Contextual information such as block
number, index of log in block, index of transaction in block are filled in upon request.
We can fill in all these fields only having the block header and list of receipts.
But determining the transaction hash of a log requires the block body.

The goal of this PR is postponing this retrieval until we are sure we the transaction hash.
It happens often that the header bloom filter signals there might be matches in a block,
but after actually checking them reveals the logs do not match. We want to avoid fetching
the body in this case.

Note that this changes the semantics of Backend.GetLogs. Downstream callers of
GetLogs now assume log context fields have not been derived, and need to call
DeriveFields on the logs if necessary.
  • Loading branch information
s1na authored Feb 13, 2023
1 parent 241cf62 commit 2def62b
Show file tree
Hide file tree
Showing 11 changed files with 283 additions and 97 deletions.
7 changes: 7 additions & 0 deletions accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,13 @@ func (fb *filterBackend) HeaderByHash(ctx context.Context, hash common.Hash) (*t
return fb.bc.GetHeaderByHash(hash), nil
}

func (fb *filterBackend) GetBody(ctx context.Context, hash common.Hash, number rpc.BlockNumber) (*types.Body, error) {
if body := fb.bc.GetBody(hash); body != nil {
return body, nil
}
return nil, errors.New("block body not found")
}

func (fb *filterBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts) {
return fb.backend.pendingBlock, fb.backend.pendingReceipts
}
Expand Down
15 changes: 3 additions & 12 deletions core/rawdb/accessors_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,9 +714,9 @@ func deriveLogFields(receipts []*receiptLogs, hash common.Hash, number uint64, t
return nil
}

// ReadLogs retrieves the logs for all transactions in a block. The log fields
// are populated with metadata. In case the receipts or the block body
// are not found, a nil is returned.
// ReadLogs retrieves the logs for all transactions in a block. In case
// receipts is not found, a nil is returned.
// Note: ReadLogs does not derive unstored log fields.
func ReadLogs(db ethdb.Reader, hash common.Hash, number uint64, config *params.ChainConfig) [][]*types.Log {
// Retrieve the flattened receipt slice
data := ReadReceiptsRLP(db, hash, number)
Expand All @@ -729,15 +729,6 @@ func ReadLogs(db ethdb.Reader, hash common.Hash, number uint64, config *params.C
return nil
}

body := ReadBody(db, hash, number)
if body == nil {
log.Error("Missing body but have receipt", "hash", hash, "number", number)
return nil
}
if err := deriveLogFields(receipts, hash, number, body.Transactions); err != nil {
log.Error("Failed to derive block receipts fields", "hash", hash, "number", number, "err", err)
return nil
}
logs := make([][]*types.Log, len(receipts))
for i, receipt := range receipts {
logs[i] = receipt.Logs
Expand Down
4 changes: 0 additions & 4 deletions core/rawdb/accessors_chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,10 +750,6 @@ func TestReadLogs(t *testing.T) {
t.Fatalf("unexpected number of logs[1] returned, have %d want %d", have, want)
}

// Fill in log fields so we can compare their rlp encoding
if err := types.Receipts(receipts).DeriveFields(params.TestChainConfig, hash, 0, body.Transactions); err != nil {
t.Fatal(err)
}
for i, pr := range receipts {
for j, pl := range pr.Logs {
rlpHave, err := rlp.EncodeToBytes(newFullLogRLP(logs[i][j]))
Expand Down
11 changes: 11 additions & 0 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,17 @@ func (b *EthAPIBackend) BlockByHash(ctx context.Context, hash common.Hash) (*typ
return b.eth.blockchain.GetBlockByHash(hash), nil
}

// GetBody returns body of a block. It does not resolve special block numbers.
func (b *EthAPIBackend) GetBody(ctx context.Context, hash common.Hash, number rpc.BlockNumber) (*types.Body, error) {
if number < 0 || hash == (common.Hash{}) {
return nil, errors.New("invalid arguments; expect hash and no special block numbers")
}
if body := b.eth.blockchain.GetBody(hash); body != nil {
return body, nil
}
return nil, errors.New("block body not found")
}

func (b *EthAPIBackend) BlockByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*types.Block, error) {
if blockNr, ok := blockNrOrHash.Number(); ok {
return b.BlockByNumber(ctx, blockNr)
Expand Down
68 changes: 30 additions & 38 deletions eth/filters/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
if header == nil {
return nil, errors.New("unknown block")
}
return f.blockLogs(ctx, header, false)
return f.blockLogs(ctx, header)
}
// Short-cut if all we care about is pending logs
if f.begin == rpc.PendingBlockNumber.Int64() {
Expand Down Expand Up @@ -216,7 +216,7 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, err
if header == nil || err != nil {
return logs, err
}
found, err := f.blockLogs(ctx, header, true)
found, err := f.checkMatches(ctx, header)
if err != nil {
return logs, err
}
Expand All @@ -241,7 +241,7 @@ func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, e
if header == nil || err != nil {
return logs, err
}
found, err := f.blockLogs(ctx, header, false)
found, err := f.blockLogs(ctx, header)
if err != nil {
return logs, err
}
Expand All @@ -251,46 +251,46 @@ func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, e
}

// blockLogs returns the logs matching the filter criteria within a single block.
func (f *Filter) blockLogs(ctx context.Context, header *types.Header, skipBloom bool) ([]*types.Log, error) {
// Fast track: no filtering criteria
if len(f.addresses) == 0 && len(f.topics) == 0 {
list, err := f.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64())
if err != nil {
return nil, err
}
return flatten(list), nil
} else if skipBloom || bloomFilter(header.Bloom, f.addresses, f.topics) {
func (f *Filter) blockLogs(ctx context.Context, header *types.Header) ([]*types.Log, error) {
if bloomFilter(header.Bloom, f.addresses, f.topics) {
return f.checkMatches(ctx, header)
}
return nil, nil
}

// checkMatches checks if the receipts belonging to the given header contain any log events that
// match the filter criteria. This function is called when the bloom filter signals a potential match.
// skipFilter signals all logs of the given block are requested.
func (f *Filter) checkMatches(ctx context.Context, header *types.Header) ([]*types.Log, error) {
logsList, err := f.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64())
hash := header.Hash()
// Logs in cache are partially filled with context data
// such as tx index, block hash, etc.
// Notably tx hash is NOT filled in because it needs
// access to block body data.
cached, err := f.sys.cachedLogElem(ctx, hash, header.Number.Uint64())
if err != nil {
return nil, err
}

unfiltered := flatten(logsList)
logs := filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
if len(logs) > 0 {
// We have matching logs, check if we need to resolve full logs via the light client
if logs[0].TxHash == (common.Hash{}) {
receipts, err := f.sys.backend.GetReceipts(ctx, header.Hash())
if err != nil {
return nil, err
}
unfiltered = unfiltered[:0]
for _, receipt := range receipts {
unfiltered = append(unfiltered, receipt.Logs...)
}
logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
}
logs := filterLogs(cached.logs, nil, nil, f.addresses, f.topics)
if len(logs) == 0 {
return nil, nil
}
// Most backends will deliver un-derived logs, but check nevertheless.
if len(logs) > 0 && logs[0].TxHash != (common.Hash{}) {
return logs, nil
}
return nil, nil

body, err := f.sys.cachedGetBody(ctx, cached, hash, header.Number.Uint64())
if err != nil {
return nil, err
}
for i, log := range logs {
// Copy log not to modify cache elements
logcopy := *log
logcopy.TxHash = body.Transactions[logcopy.TxIndex].Hash()
logs[i] = &logcopy
}
return logs, nil
}

// pendingLogs returns the logs matching the filter criteria within the pending block.
Expand Down Expand Up @@ -380,11 +380,3 @@ func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]commo
}
return true
}

func flatten(list [][]*types.Log) []*types.Log {
var flat []*types.Log
for _, logs := range list {
flat = append(flat, logs...)
}
return flat
}
117 changes: 77 additions & 40 deletions eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum"
Expand Down Expand Up @@ -58,6 +59,7 @@ type Backend interface {
ChainDb() ethdb.Database
HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error)
GetBody(ctx context.Context, hash common.Hash, number rpc.BlockNumber) (*types.Body, error)
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error)
PendingBlockAndReceipts() (*types.Block, types.Receipts)
Expand All @@ -77,7 +79,7 @@ type Backend interface {
// FilterSystem holds resources shared by all filters.
type FilterSystem struct {
backend Backend
logsCache *lru.Cache[common.Hash, [][]*types.Log]
logsCache *lru.Cache[common.Hash, *logCacheElem]
cfg *Config
}

Expand All @@ -86,13 +88,18 @@ func NewFilterSystem(backend Backend, config Config) *FilterSystem {
config = config.withDefaults()
return &FilterSystem{
backend: backend,
logsCache: lru.NewCache[common.Hash, [][]*types.Log](config.LogCacheSize),
logsCache: lru.NewCache[common.Hash, *logCacheElem](config.LogCacheSize),
cfg: &config,
}
}

// cachedGetLogs loads block logs from the backend and caches the result.
func (sys *FilterSystem) cachedGetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error) {
type logCacheElem struct {
logs []*types.Log
body atomic.Pointer[types.Body]
}

// cachedLogElem loads block logs from the backend and caches the result.
func (sys *FilterSystem) cachedLogElem(ctx context.Context, blockHash common.Hash, number uint64) (*logCacheElem, error) {
cached, ok := sys.logsCache.Get(blockHash)
if ok {
return cached, nil
Expand All @@ -105,8 +112,35 @@ func (sys *FilterSystem) cachedGetLogs(ctx context.Context, blockHash common.Has
if logs == nil {
return nil, fmt.Errorf("failed to get logs for block #%d (0x%s)", number, blockHash.TerminalString())
}
sys.logsCache.Add(blockHash, logs)
return logs, nil
// Database logs are un-derived.
// Fill in whatever we can (txHash is inaccessible at this point).
flattened := make([]*types.Log, 0)
var logIdx uint
for i, txLogs := range logs {
for _, log := range txLogs {
log.BlockHash = blockHash
log.BlockNumber = number
log.TxIndex = uint(i)
log.Index = logIdx
logIdx++
flattened = append(flattened, log)
}
}
elem := &logCacheElem{logs: flattened}
sys.logsCache.Add(blockHash, elem)
return elem, nil
}

func (sys *FilterSystem) cachedGetBody(ctx context.Context, elem *logCacheElem, hash common.Hash, number uint64) (*types.Body, error) {
if body := elem.body.Load(); body != nil {
return body, nil
}
body, err := sys.backend.GetBody(ctx, hash, rpc.BlockNumber(number))
if err != nil {
return nil, err
}
elem.body.Store(body)
return body, nil
}

// Type determines the kind of filter and is used to put the filter in to
Expand Down Expand Up @@ -431,6 +465,12 @@ func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent)
if es.lightMode && len(filters[LogsSubscription]) > 0 {
es.lightFilterNewHead(ev.Block.Header(), func(header *types.Header, remove bool) {
for _, f := range filters[LogsSubscription] {
if f.logsCrit.FromBlock != nil && header.Number.Cmp(f.logsCrit.FromBlock) < 0 {
continue
}
if f.logsCrit.ToBlock != nil && header.Number.Cmp(f.logsCrit.ToBlock) > 0 {
continue
}
if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
Expand Down Expand Up @@ -474,42 +514,39 @@ func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func

// filter logs of a single header in light client mode
func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.Address, topics [][]common.Hash, remove bool) []*types.Log {
if bloomFilter(header.Bloom, addresses, topics) {
// Get the logs of the block
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
logsList, err := es.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64())
if err != nil {
return nil
}
var unfiltered []*types.Log
for _, logs := range logsList {
for _, log := range logs {
logcopy := *log
logcopy.Removed = remove
unfiltered = append(unfiltered, &logcopy)
}
}
logs := filterLogs(unfiltered, nil, nil, addresses, topics)
if len(logs) > 0 && logs[0].TxHash == (common.Hash{}) {
// We have matching but non-derived logs
receipts, err := es.backend.GetReceipts(ctx, header.Hash())
if err != nil {
return nil
}
unfiltered = unfiltered[:0]
for _, receipt := range receipts {
for _, log := range receipt.Logs {
logcopy := *log
logcopy.Removed = remove
unfiltered = append(unfiltered, &logcopy)
}
}
logs = filterLogs(unfiltered, nil, nil, addresses, topics)
}
if !bloomFilter(header.Bloom, addresses, topics) {
return nil
}
// Get the logs of the block
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
cached, err := es.sys.cachedLogElem(ctx, header.Hash(), header.Number.Uint64())
if err != nil {
return nil
}
unfiltered := append([]*types.Log{}, cached.logs...)
for i, log := range unfiltered {
// Don't modify in-cache elements
logcopy := *log
logcopy.Removed = remove
// Swap copy in-place
unfiltered[i] = &logcopy
}
logs := filterLogs(unfiltered, nil, nil, addresses, topics)
// Txhash is already resolved
if len(logs) > 0 && logs[0].TxHash != (common.Hash{}) {
return logs
}
return nil
// Resolve txhash
body, err := es.sys.cachedGetBody(ctx, cached, header.Hash(), header.Number.Uint64())
if err != nil {
return nil
}
for _, log := range logs {
// logs are already copied, safe to modify
log.TxHash = body.Transactions[log.TxIndex].Hash()
}
return logs
}

// eventLoop (un)installs filters and processes mux events.
Expand Down
Loading

0 comments on commit 2def62b

Please sign in to comment.