Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve package eth/filter #491

Merged
merged 7 commits into from
Mar 18, 2024
32 changes: 19 additions & 13 deletions accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,18 +415,24 @@ func (b *SimulatedBackend) SendTransaction(ctx context.Context, tx *types.Transa
//
// TODO(karalabe): Deprecate when the subscription one can return past data too.
func (b *SimulatedBackend) FilterLogs(ctx context.Context, query XDPoSChain.FilterQuery) ([]types.Log, error) {
// Initialize unset filter boundaried to run from genesis to chain head
from := int64(0)
if query.FromBlock != nil {
from = query.FromBlock.Int64()
}
to := int64(-1)
if query.ToBlock != nil {
to = query.ToBlock.Int64()
var filter *filters.Filter
if query.BlockHash != nil {
// Block filter requested, construct a single-shot filter
filter = filters.NewBlockFilter(&filterBackend{b.database, b.blockchain}, *query.BlockHash, query.Addresses, query.Topics)
} else {
// Initialize unset filter boundaried to run from genesis to chain head
from := int64(0)
if query.FromBlock != nil {
from = query.FromBlock.Int64()
}
to := int64(-1)
if query.ToBlock != nil {
to = query.ToBlock.Int64()
}
// Construct the range filter
filter = filters.NewRangeFilter(&filterBackend{b.database, b.blockchain}, from, to, query.Addresses, query.Topics)
}
// Construct and execute the filter
filter := filters.New(&filterBackend{b.database, b.blockchain}, from, to, query.Addresses, query.Topics)

// Run the filter and return all the logs
logs, err := filter.Logs(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -527,8 +533,8 @@ func (fb *filterBackend) HeaderByNumber(ctx context.Context, block rpc.BlockNumb
return fb.bc.GetHeaderByNumber(uint64(block.Int64())), nil
}

func (fb *filterBackend) HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error) {
return fb.bc.GetHeaderByHash(blockHash), nil
func (fb *filterBackend) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) {
return fb.bc.GetHeaderByHash(hash), nil
}

func (fb *filterBackend) GetReceipts(ctx context.Context, hash common.Hash) (types.Receipts, error) {
Expand Down
70 changes: 31 additions & 39 deletions eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,39 +332,24 @@ func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([
return nil, errExceedMaxTopics
}

var (
fromBlock int64
toBlock int64
)

var filter *Filter
if crit.BlockHash != nil {
// look up block number from block hash
header, err := api.backend.HeaderByHash(ctx, *crit.BlockHash)
if err != nil {
return nil, err
}
if header == nil {
return nil, errors.New("unknown block")
}
fromBlock = int64(header.Number.Int64())
toBlock = fromBlock
// Block filter requested, construct a single-shot filter
filter = NewBlockFilter(api.backend, *crit.BlockHash, crit.Addresses, crit.Topics)
} else {
// Convert the RPC block numbers into internal representations
if crit.FromBlock == nil {
fromBlock = int64(rpc.LatestBlockNumber)
} else {
fromBlock = crit.FromBlock.Int64()
begin := rpc.LatestBlockNumber.Int64()
if crit.FromBlock != nil {
begin = crit.FromBlock.Int64()
}
if crit.ToBlock == nil {
toBlock = int64(rpc.LatestBlockNumber)
} else {
toBlock = crit.ToBlock.Int64()
end := rpc.LatestBlockNumber.Int64()
if crit.ToBlock != nil {
end = crit.ToBlock.Int64()
}
// Construct the range filter
filter = NewRangeFilter(api.backend, begin, end, crit.Addresses, crit.Topics)
}

// Create and run the filter to get all the logs
filter := New(api.backend, fromBlock, toBlock, crit.Addresses, crit.Topics)

// Run the filter and return all the logs
logs, err := filter.Logs(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -402,17 +387,24 @@ func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*ty
return nil, fmt.Errorf("filter not found")
}

begin := rpc.LatestBlockNumber.Int64()
if f.crit.FromBlock != nil {
begin = f.crit.FromBlock.Int64()
}
end := rpc.LatestBlockNumber.Int64()
if f.crit.ToBlock != nil {
end = f.crit.ToBlock.Int64()
var filter *Filter
if f.crit.BlockHash != nil {
// Block filter requested, construct a single-shot filter
filter = NewBlockFilter(api.backend, *f.crit.BlockHash, f.crit.Addresses, f.crit.Topics)
} else {
// Convert the RPC block numbers into internal representations
begin := rpc.LatestBlockNumber.Int64()
if f.crit.FromBlock != nil {
begin = f.crit.FromBlock.Int64()
}
end := rpc.LatestBlockNumber.Int64()
if f.crit.ToBlock != nil {
end = f.crit.ToBlock.Int64()
}
// Construct the range filter
filter = NewRangeFilter(api.backend, begin, end, f.crit.Addresses, f.crit.Topics)
}
// Create and run the filter to get all the logs
filter := New(api.backend, begin, end, f.crit.Addresses, f.crit.Topics)

// Run the filter and return all the logs
logs, err := filter.Logs(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -587,15 +579,15 @@ func decodeAddress(s string) (common.Address, error) {
}
b, err := hexutil.Decode(s)
if err == nil && len(b) != common.AddressLength {
err = fmt.Errorf("hex has invalid length %d after decoding", len(b))
err = fmt.Errorf("hex has invalid length %d after decoding; expected %d for address", len(b), common.AddressLength)
}
return common.BytesToAddress(b), err
}

func decodeTopic(s string) (common.Hash, error) {
b, err := hexutil.Decode(s)
if err == nil && len(b) != common.HashLength {
err = fmt.Errorf("hex has invalid length %d after decoding", len(b))
err = fmt.Errorf("hex has invalid length %d after decoding; expected %d for topic", len(b), common.HashLength)
}
return common.BytesToHash(b), err
}
32 changes: 16 additions & 16 deletions eth/filters/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ import (
"bytes"
"context"
"fmt"
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
"testing"
"time"

"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/bitutil"
"github.com/XinFinOrg/XDPoSChain/core"
"github.com/XinFinOrg/XDPoSChain/core/bloombits"
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/ethdb"
"github.com/XinFinOrg/XDPoSChain/event"
Expand Down Expand Up @@ -66,7 +66,7 @@ const benchFilterCnt = 2000

func benchmarkBloomBits(b *testing.B, sectionSize uint64) {
benchDataDir := node.DefaultDataDir() + "/geth/chaindata"
fmt.Println("Running bloombits benchmark section size:", sectionSize)
b.Log("Running bloombits benchmark section size:", sectionSize)

db, err := rawdb.NewLevelDBDatabase(benchDataDir, 128, 1024, "")
if err != nil {
Expand All @@ -78,7 +78,7 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64) {
}

clearBloomBits(db)
fmt.Println("Generating bloombits data...")
b.Log("Generating bloombits data...")
headNum := core.GetBlockNumber(db, head)
if headNum < sectionSize+512 {
b.Fatalf("not enough blocks for running a benchmark")
Expand Down Expand Up @@ -113,16 +113,16 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64) {
core.WriteBloomBits(db, uint(i), sectionIdx, sectionHead, comp)
}
//if sectionIdx%50 == 0 {
// fmt.Println(" section", sectionIdx, "/", cnt)
// b.Log(" section", sectionIdx, "/", cnt)
//}
}

d := time.Since(start)
fmt.Println("Finished generating bloombits data")
fmt.Println(" ", d, "total ", d/time.Duration(cnt*sectionSize), "per block")
fmt.Println(" data size:", dataSize, " compressed size:", compSize, " compression ratio:", float64(compSize)/float64(dataSize))
b.Log("Finished generating bloombits data")
b.Log(" ", d, "total ", d/time.Duration(cnt*sectionSize), "per block")
b.Log(" data size:", dataSize, " compressed size:", compSize, " compression ratio:", float64(compSize)/float64(dataSize))

fmt.Println("Running filter benchmarks...")
b.Log("Running filter benchmarks...")
start = time.Now()
mux := new(event.TypeMux)
var backend *testBackend
Expand All @@ -136,14 +136,14 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64) {
var addr common.Address
addr[0] = byte(i)
addr[1] = byte(i / 256)
filter := New(backend, 0, int64(cnt*sectionSize-1), []common.Address{addr}, nil)
filter := NewRangeFilter(backend, 0, int64(cnt*sectionSize-1), []common.Address{addr}, nil)
if _, err := filter.Logs(context.Background()); err != nil {
b.Error("filter.Find error:", err)
}
}
d = time.Since(start)
fmt.Println("Finished running filter benchmarks")
fmt.Println(" ", d, "total ", d/time.Duration(benchFilterCnt), "per address", d*time.Duration(1000000)/time.Duration(benchFilterCnt*cnt*sectionSize), "per million blocks")
b.Log("Finished running filter benchmarks")
b.Log(" ", d, "total ", d/time.Duration(benchFilterCnt), "per address", d*time.Duration(1000000)/time.Duration(benchFilterCnt*cnt*sectionSize), "per million blocks")
db.Close()
}

Expand Down Expand Up @@ -175,7 +175,7 @@ func clearBloomBits(db ethdb.Database) {

func BenchmarkNoBloomBits(b *testing.B) {
benchDataDir := node.DefaultDataDir() + "/geth/chaindata"
fmt.Println("Running benchmark without bloombits")
b.Log("Running benchmark without bloombits")
db, err := rawdb.NewLevelDBDatabase(benchDataDir, 128, 1024, "")
if err != nil {
b.Fatalf("error opening database at %v: %v", benchDataDir, err)
Expand All @@ -188,14 +188,14 @@ func BenchmarkNoBloomBits(b *testing.B) {

clearBloomBits(db)

fmt.Println("Running filter benchmarks...")
b.Log("Running filter benchmarks...")
start := time.Now()
mux := new(event.TypeMux)
backend := &testBackend{mux, db, 0, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)}
filter := New(backend, 0, int64(headNum), []common.Address{{}}, nil)
filter := NewRangeFilter(backend, 0, int64(headNum), []common.Address{{}}, nil)
filter.Logs(context.Background())
d := time.Since(start)
fmt.Println("Finished running filter benchmarks")
fmt.Println(" ", d, "total ", d*time.Duration(1000000)/time.Duration(headNum+1), "per million blocks")
b.Log("Finished running filter benchmarks")
b.Log(" ", d, "total ", d*time.Duration(1000000)/time.Duration(headNum+1), "per million blocks")
db.Close()
}
82 changes: 62 additions & 20 deletions eth/filters/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package filters

import (
"context"
"errors"
"math/big"

"github.com/XinFinOrg/XDPoSChain/common"
Expand Down Expand Up @@ -50,17 +51,19 @@ type Backend interface {
type Filter struct {
backend Backend

db ethdb.Database
begin, end int64
addresses []common.Address
topics [][]common.Hash
db ethdb.Database
addresses []common.Address
topics [][]common.Hash

block common.Hash // Block hash if filtering a single block
begin, end int64 // Range interval if filtering multiple blocks

matcher *bloombits.Matcher
}

// New creates a new filter which uses a bloom filter on blocks to figure out whether
// a particular block is interesting or not.
func New(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter {
// NewRangeFilter creates a new filter which uses a bloom filter on blocks to
// figure out whether a particular block is interesting or not.
func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter {
// Flatten the address and topic filter clauses into a single bloombits filter
// system. Since the bloombits are not positional, nil topics are permitted,
// which get flattened into a nil byte slice.
Expand All @@ -79,23 +82,52 @@ func New(backend Backend, begin, end int64, addresses []common.Address, topics [
}
filters = append(filters, filter)
}
// Assemble and return the filter
size, _ := backend.BloomStatus()

// Create a generic filter and convert it into a range filter
filter := newFilter(backend, addresses, topics)

filter.matcher = bloombits.NewMatcher(size, filters)
filter.begin = begin
filter.end = end

return filter
}

// NewBlockFilter creates a new filter which directly inspects the contents of
// a block to figure out whether it is interesting or not.
func NewBlockFilter(backend Backend, block common.Hash, addresses []common.Address, topics [][]common.Hash) *Filter {
// Create a generic filter and convert it into a block filter
filter := newFilter(backend, addresses, topics)
filter.block = block
return filter
}

// newFilter creates a generic filter that can either filter based on a block hash,
// or based on range queries. The search criteria needs to be explicitly set.
func newFilter(backend Backend, addresses []common.Address, topics [][]common.Hash) *Filter {
return &Filter{
backend: backend,
begin: begin,
end: end,
addresses: addresses,
topics: topics,
db: backend.ChainDb(),
matcher: bloombits.NewMatcher(size, filters),
}
}

// Logs searches the blockchain for matching log entries, returning all from the
// first block that contains matches, updating the start of the filter accordingly.
func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
// If we're doing singleton block filtering, execute and return
if f.block != (common.Hash{}) {
header, err := f.backend.HeaderByHash(ctx, f.block)
if err != nil {
return nil, err
}
if header == nil {
return nil, errors.New("unknown block")
}
return f.blockLogs(ctx, header)
}
// Figure out the limits of the filter range
header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
if header == nil {
Expand Down Expand Up @@ -188,13 +220,23 @@ func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, e
if header == nil || err != nil {
return logs, err
}
if bloomFilter(header.Bloom, f.addresses, f.topics) {
found, err := f.checkMatches(ctx, header)
if err != nil {
return logs, err
}
logs = append(logs, found...)
found, err := f.blockLogs(ctx, header)
if err != nil {
return logs, err
}
logs = append(logs, found...)
}
return logs, nil
}

// blockLogs returns the logs matching the filter criteria within a single block.
func (f *Filter) blockLogs(ctx context.Context, header *types.Header) (logs []*types.Log, err error) {
if bloomFilter(header.Bloom, f.addresses, f.topics) {
found, err := f.checkMatches(ctx, header)
if err != nil {
return logs, err
}
logs = append(logs, found...)
}
return logs, nil
}
Expand Down Expand Up @@ -259,9 +301,9 @@ Logs:
if len(topics) > len(log.Topics) {
continue Logs
}
for i, topics := range topics {
match := len(topics) == 0 // empty rule set == wildcard
for _, topic := range topics {
for i, sub := range topics {
match := len(sub) == 0 // empty rule set == wildcard
for _, topic := range sub {
if log.Topics[i] == topic {
match = true
break
Expand Down
Loading