Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Commit

Permalink
Light refactors of the blockwatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
jalextowle committed Jul 14, 2020
1 parent 9b26aba commit 63caae9
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 85 deletions.
5 changes: 1 addition & 4 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,10 +321,7 @@ func newWithPrivateConfig(ctx context.Context, config Config, pConfig privateCon
}

// Initialize block watcher (but don't start it yet).
blockWatcherClient, err := blockwatch.NewRpcClient(ethClient)
if err != nil {
return nil, err
}
blockWatcherClient := blockwatch.NewRpcClient(ethClient)

topics := orderwatch.GetRelevantTopics()
blockWatcherConfig := blockwatch.Config{
Expand Down
42 changes: 21 additions & 21 deletions ethereum/blockwatch/block_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (w *Watcher) FastSyncToLatestBlock(ctx context.Context) (blocksElapsed int,
return 0, nil
}

latestBlock, err := w.client.HeaderByNumber(nil)
latestBlock, err := w.client.HeaderByNumber(ctx, nil)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -180,7 +180,7 @@ func (w *Watcher) Watch(ctx context.Context) error {

// Sync immediately when `Watch()` is called instead of waiting for the
// first Ticker tick
if err := w.SyncToLatestBlock(); err != nil {
if err := w.SyncToLatestBlock(ctx); err != nil {
if err == db.ErrClosed {
// We can't continue if the database is closed. Stop the watcher and
// return an error.
Expand All @@ -201,7 +201,7 @@ func (w *Watcher) Watch(ctx context.Context) error {
ticker.Stop()
return nil
case <-ticker.C:
if err := w.SyncToLatestBlock(); err != nil {
if err := w.SyncToLatestBlock(ctx); err != nil {
if err == db.ErrClosed {
// We can't continue if the database is closed. Stop the watcher and
// return an error.
Expand Down Expand Up @@ -239,7 +239,7 @@ func (w *Watcher) Subscribe(sink chan<- []*Event) event.Subscription {

// SyncToLatestBlock syncs our local state of the chain to the latest block found via
// Ethereum RPC
func (w *Watcher) SyncToLatestBlock() error {
func (w *Watcher) SyncToLatestBlock(ctx context.Context) error {
w.syncToLatestBlockMu.Lock()
defer w.syncToLatestBlockMu.Unlock()

Expand All @@ -248,7 +248,7 @@ func (w *Watcher) SyncToLatestBlock() error {
return err
}

latestHeader, err := w.client.HeaderByNumber(nil)
latestHeader, err := w.client.HeaderByNumber(ctx, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -297,15 +297,15 @@ func (w *Watcher) SyncToLatestBlock() error {
break
}
nextBlockNumber := big.NewInt(0).Add(lastStoredHeader.Number, big.NewInt(1))
nextHeader, err = w.client.HeaderByNumber(nextBlockNumber)
nextHeader, err = w.client.HeaderByNumber(ctx, nextBlockNumber)
if err != nil {
syncErr = err
break
}
}

var events []*Event
events, err = w.buildCanonicalChain(nextHeader, events)
events, err = w.buildCanonicalChain(ctx, nextHeader, events)
allEvents = append(allEvents, events...)
if err != nil {
syncErr = err
Expand Down Expand Up @@ -345,14 +345,14 @@ func (w *Watcher) shouldRevertChanges(lastStoredHeader *types.MiniHeader, events
return newLatestHeader.Number.Cmp(lastStoredHeader.Number) <= 0
}

func (w *Watcher) buildCanonicalChain(nextHeader *types.MiniHeader, events []*Event) ([]*Event, error) {
func (w *Watcher) buildCanonicalChain(ctx context.Context, nextHeader *types.MiniHeader, events []*Event) ([]*Event, error) {
latestHeader, err := w.stack.Peek()
if err != nil {
return nil, err
}
// Is the stack empty or is it the next block?
if latestHeader == nil || nextHeader.Parent == latestHeader.Hash {
nextHeader, err := w.addLogs(nextHeader)
nextHeader, err := w.addLogs(ctx, nextHeader)
if err != nil {
return events, err
}
Expand All @@ -376,15 +376,15 @@ func (w *Watcher) buildCanonicalChain(nextHeader *types.MiniHeader, events []*Ev
BlockHeader: latestHeader,
})

nextParentHeader, err := w.client.HeaderByHash(nextHeader.Parent)
nextParentHeader, err := w.client.HeaderByHash(ctx, nextHeader.Parent)
if err != nil {
return events, err
}
events, err = w.buildCanonicalChain(nextParentHeader, events)
events, err = w.buildCanonicalChain(ctx, nextParentHeader, events)
if err != nil {
return events, err
}
nextHeader, err = w.addLogs(nextHeader)
nextHeader, err = w.addLogs(ctx, nextHeader)
if err != nil {
return events, err
}
Expand All @@ -400,11 +400,11 @@ func (w *Watcher) buildCanonicalChain(nextHeader *types.MiniHeader, events []*Ev
return events, nil
}

func (w *Watcher) addLogs(header *types.MiniHeader) (*types.MiniHeader, error) {
func (w *Watcher) addLogs(ctx context.Context, header *types.MiniHeader) (*types.MiniHeader, error) {
if !w.withLogs {
return header, nil
}
logs, err := w.client.FilterLogs(ethereum.FilterQuery{
logs, err := w.client.FilterLogs(ctx, ethereum.FilterQuery{
BlockHash: &header.Hash,
Topics: [][]common.Hash{w.topics},
})
Expand Down Expand Up @@ -433,7 +433,7 @@ func (w *Watcher) getMissedEventsToBackfill(ctx context.Context, blocksElapsed i
return events, err
}
// Add furthest block processed into the DB
latestHeader, err := w.client.HeaderByNumber(big.NewInt(int64(furthestBlockProcessed)))
latestHeader, err := w.client.HeaderByNumber(ctx, big.NewInt(int64(furthestBlockProcessed)))
if err != nil {
return events, err
}
Expand All @@ -454,7 +454,7 @@ func (w *Watcher) getMissedEventsToBackfill(ctx context.Context, blocksElapsed i
blockHeader, found := hashToBlockHeader[log.BlockHash]
if !found {
blockNumber := big.NewInt(0).SetUint64(log.BlockNumber)
header, err := w.client.HeaderByNumber(blockNumber)
header, err := w.client.HeaderByNumber(ctx, blockNumber)
if err != nil {
return events, err
}
Expand Down Expand Up @@ -554,7 +554,7 @@ func (w *Watcher) getLogsInBlockRange(ctx context.Context, from, to int) ([]etht
default:
}

logs, err := w.filterLogsRecurisively(b.FromBlock, b.ToBlock, []ethtypes.Log{})
logs, err := w.filterLogsRecursively(ctx, b.FromBlock, b.ToBlock, []ethtypes.Log{})
if err != nil {
log.WithFields(map[string]interface{}{
"error": err,
Expand Down Expand Up @@ -635,7 +635,7 @@ func (w *Watcher) getSubBlockRanges(from, to, rangeSize int) []*blockRange {

const infuraTooManyResultsErrMsg = "query returned more than 10000 results"

func (w *Watcher) filterLogsRecurisively(from, to int, allLogs []ethtypes.Log) ([]ethtypes.Log, error) {
func (w *Watcher) filterLogsRecursively(ctx context.Context, from, to int, allLogs []ethtypes.Log) ([]ethtypes.Log, error) {
log.WithFields(map[string]interface{}{
"from": from,
"to": to,
Expand All @@ -645,7 +645,7 @@ func (w *Watcher) filterLogsRecurisively(from, to int, allLogs []ethtypes.Log) (
if len(w.topics) > 0 {
topics = append(topics, w.topics)
}
logs, err := w.client.FilterLogs(ethereum.FilterQuery{
logs, err := w.client.FilterLogs(ctx, ethereum.FilterQuery{
FromBlock: big.NewInt(int64(from)),
ToBlock: big.NewInt(int64(to)),
Topics: topics,
Expand All @@ -672,11 +672,11 @@ func (w *Watcher) filterLogsRecurisively(from, to int, allLogs []ethtypes.Log) (

endFirstHalf := from + firstBatchSize
startSecondHalf := endFirstHalf + 1
allLogs, err := w.filterLogsRecurisively(from, endFirstHalf, allLogs)
allLogs, err := w.filterLogsRecursively(ctx, from, endFirstHalf, allLogs)
if err != nil {
return nil, err
}
allLogs, err = w.filterLogsRecurisively(startSecondHalf, to, allLogs)
allLogs, err = w.filterLogsRecursively(ctx, startSecondHalf, to, allLogs)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions ethereum/blockwatch/block_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestWatcher(t *testing.T) {
for i := 0; i < fakeClient.NumberOfTimesteps(); i++ {
scenarioLabel := fakeClient.GetScenarioLabel()

err := watcher.SyncToLatestBlock()
err := watcher.SyncToLatestBlock(ctx)
if strings.HasPrefix(scenarioLabel, "ERROR") {
require.Error(t, err)
} else {
Expand Down Expand Up @@ -390,7 +390,7 @@ func TestFilterLogsRecursively(t *testing.T) {
require.NoError(t, err)
watcher := setupOrderWatcher(t, ctx, fakeLogClient)

logs, err := watcher.filterLogsRecurisively(from, to, []ethtypes.Log{})
logs, err := watcher.filterLogsRecursively(ctx, from, to, []ethtypes.Log{})
require.Equal(t, testCase.Err, err, testCase.Label)
require.Equal(t, testCase.Logs, logs, testCase.Label)
assert.Equal(t, len(testCase.rangeToFilterLogsResponse), fakeLogClient.Count())
Expand Down
68 changes: 37 additions & 31 deletions ethereum/blockwatch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package blockwatch

import (
"context"
"errors"
"fmt"
"math/big"
"time"
Expand All @@ -17,30 +16,34 @@ import (
ethtypes "github.com/ethereum/go-ethereum/core/types"
)

var (
const (
// We give up on ETH RPC requests sent for the purpose of block watching after 10 seconds
requestTimeout = 10 * time.Second
requestTimeout = 10 * time.Second
bigIntParsingErrorString = "Failed to parse big.Int value from hex-encoded %s returned from %s"
)

// Client defines the methods needed to satisfy the client expected when
// instantiating a Watcher instance.
type Client interface {
HeaderByNumber(number *big.Int) (*types.MiniHeader, error)
HeaderByHash(hash common.Hash) (*types.MiniHeader, error)
FilterLogs(q ethereum.FilterQuery) ([]ethtypes.Log, error)
HeaderByNumber(ctx context.Context, number *big.Int) (*types.MiniHeader, error)
HeaderByHash(ctx context.Context, hash common.Hash) (*types.MiniHeader, error)
FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]ethtypes.Log, error)
}

// Ensure that RpcClient is compliant with the Client interface
var _ Client = &RpcClient{}

// RpcClient is a Client for fetching Ethereum blocks from a specific JSON-RPC endpoint.
type RpcClient struct {
ethRPCClient ethrpcclient.Client
}

// NewRpcClient returns a new Client for fetching Ethereum blocks using the given
// ethclient.Client.
func NewRpcClient(ethRPCClient ethrpcclient.Client) (*RpcClient, error) {
func NewRpcClient(ethRPCClient ethrpcclient.Client) *RpcClient {
return &RpcClient{
ethRPCClient: ethRPCClient,
}, nil
}
}

type GetBlockByNumberResponse struct {
Expand All @@ -61,9 +64,10 @@ func (e UnknownBlockNumberError) Error() string {
return fmt.Sprintf("%s: %s", e.Message, e.BlockNumber)
}

// HeaderByNumber fetches a block header by its number. If no `number` is supplied, it will return the latest
// block header. If no block exists with this number it will return a `ethereum.NotFound` error.
func (rc *RpcClient) HeaderByNumber(number *big.Int) (*types.MiniHeader, error) {
// HeaderByNumber fetches a block header by its number. If no `number` is supplied,
// it will return the latest block header. If no block exists with this number it
// will return a `ethereum.NotFound` error.
func (rc *RpcClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.MiniHeader, error) {
var blockParam string
if number == nil {
blockParam = "latest"
Expand All @@ -72,21 +76,23 @@ func (rc *RpcClient) HeaderByNumber(number *big.Int) (*types.MiniHeader, error)
}
shouldIncludeTransactions := false

// Note(fabio): We use a raw RPC call here instead of `EthClient`'s `BlockByNumber()` method because block
// hashes are computed differently on Kovan vs. mainnet, resulting in the wrong block hash being returned by
// `BlockByNumber` when using Kovan. By doing a raw RPC call, we can simply use the blockHash returned in the
// RPC response rather than re-compute it from the block header.
// Note(fabio): We use a raw RPC call here instead of `EthClient`'s
// `BlockByNumber()` method because block hashes are computed differently
// on Kovan vs. mainnet, resulting in the wrong block hash being returned
// by `BlockByNumber` when using Kovan. By doing a raw RPC call, we can
// simply use the blockHash returned in the RPC response rather than
// re-compute it from the block header.
// Source: https://github.com/ethereum/go-ethereum/pull/18166
var header GetBlockByNumberResponse
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
ctx, cancel := context.WithTimeout(ctx, requestTimeout)
defer cancel()
err := rc.ethRPCClient.CallContext(ctx, &header, "eth_getBlockByNumber", blockParam, shouldIncludeTransactions)
if err != nil {
return nil, err
}
// If it returned an empty struct
if header.Number == "" {
// Add blockHash to error so it gets logged
// Add block number to error so it gets logged
return nil, UnknownBlockNumberError{
Message: ethereum.NotFound.Error(),
BlockNumber: number,
Expand All @@ -95,23 +101,23 @@ func (rc *RpcClient) HeaderByNumber(number *big.Int) (*types.MiniHeader, error)

blockNum, ok := math.ParseBig256(header.Number)
if !ok {
return nil, errors.New("Failed to parse big.Int value from hex-encoded block number returned from eth_getBlockByNumber")
return nil, fmt.Errorf(bigIntParsingErrorString, "block timestamp", "eth_getBlockByNumber")
}
unixTimestamp, ok := math.ParseBig256(header.Timestamp)
blockTimestamp, ok := math.ParseBig256(header.Timestamp)
if !ok {
return nil, errors.New("Failed to parse big.Int value from hex-encoded block timestamp returned from eth_getBlockByNumber")
return nil, fmt.Errorf(bigIntParsingErrorString, "block timestamp", "eth_getBlockByNumber")
}
miniHeader := &types.MiniHeader{
Hash: header.Hash,
Parent: header.ParentHash,
Number: blockNum,
Timestamp: time.Unix(unixTimestamp.Int64(), 0),
Timestamp: time.Unix(blockTimestamp.Int64(), 0),
}
return miniHeader, nil
}

// UnknownBlockHashError is the error returned from a filter logs RPC call when the blockHash
// specified is not recognized
// UnknownBlockHashError is the error returned from a filter logs RPC call when
// the blockHash specified is not recognized
type UnknownBlockHashError struct {
BlockHash common.Hash
}
Expand All @@ -120,10 +126,10 @@ func (e UnknownBlockHashError) Error() string {
return fmt.Sprintf("%s: %s", ethereum.NotFound.Error(), e.BlockHash)
}

// HeaderByHash fetches a block header by its block hash. If no block exists with this number it will return
// a `ethereum.NotFound` error.
func (rc *RpcClient) HeaderByHash(hash common.Hash) (*types.MiniHeader, error) {
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
// HeaderByHash fetches a block header by its block hash. If no block exists with
// this hash it will return a `ethereum.NotFound` error.
func (rc *RpcClient) HeaderByHash(ctx context.Context, hash common.Hash) (*types.MiniHeader, error) {
ctx, cancel := context.WithTimeout(ctx, requestTimeout)
defer cancel()
header, err := rc.ethRPCClient.HeaderByHash(ctx, hash)
if err != nil {
Expand All @@ -144,8 +150,8 @@ func (rc *RpcClient) HeaderByHash(hash common.Hash) (*types.MiniHeader, error) {
return miniHeader, nil
}

// FilterUnknownBlockError is the error returned from a filter logs RPC call when the blockHash
// specified is not recognized
// FilterUnknownBlockError is the error returned from a filter logs RPC call when
// the blockHash specified is not recognized
type FilterUnknownBlockError struct {
Message string
FilterQuery ethereum.FilterQuery
Expand All @@ -156,8 +162,8 @@ func (e FilterUnknownBlockError) Error() string {
}

// FilterLogs returns the logs that satisfy the supplied filter query.
func (rc *RpcClient) FilterLogs(q ethereum.FilterQuery) ([]ethtypes.Log, error) {
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
func (rc *RpcClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]ethtypes.Log, error) {
ctx, cancel := context.WithTimeout(ctx, requestTimeout)
defer cancel()
logs, err := rc.ethRPCClient.FilterLogs(ctx, q)
if err != nil {
Expand Down
Loading

0 comments on commit 63caae9

Please sign in to comment.