Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Commit

Permalink
Handle errors returned from Wait() if context is cancelled/exceeded
Browse files Browse the repository at this point in the history
  • Loading branch information
fabioberger committed Nov 1, 2019
1 parent 7f694a2 commit 81a6d7c
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 11 deletions.
28 changes: 20 additions & 8 deletions ethereum/blockwatch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type RpcClient struct {
rpcClient *rpc.Client
client *ethclient.Client
requestTimeout time.Duration
rateLimiter *ratelimit.RateLimiter
rateLimiter *ratelimit.RateLimiter
}

// NewRpcClient returns a new Client for fetching Ethereum blocks using the given
Expand All @@ -45,10 +45,10 @@ func NewRpcClient(rpcURL string, requestTimeout time.Duration, rateLimiter *rate
return nil, err
}
return &RpcClient{
rpcClient: rpcClient,
client: ethClient,
rpcClient: rpcClient,
client: ethClient,
requestTimeout: requestTimeout,
rateLimiter: rateLimiter,
rateLimiter: rateLimiter,
}, nil
}

Expand All @@ -61,7 +61,11 @@ type GetBlockByNumberResponse struct {
// HeaderByNumber fetches a block header by its number. If no `number` is supplied, it will return the latest
// block header. If no block exists with this number it will return a `ethereum.NotFound` error.
func (rc *RpcClient) HeaderByNumber(number *big.Int) (*miniheader.MiniHeader, error) {
rc.rateLimiter.Wait(context.Background())
err := rc.rateLimiter.Wait(context.Background())
if err != nil {
// Context cancelled or deadline exceeded
return nil, err
}

ctx, cancel := context.WithTimeout(context.Background(), rc.requestTimeout)
defer cancel()
Expand All @@ -80,7 +84,7 @@ func (rc *RpcClient) HeaderByNumber(number *big.Int) (*miniheader.MiniHeader, er
// RPC response rather than re-compute it from the block header.
// Source: https://github.com/ethereum/go-ethereum/pull/18166
var header GetBlockByNumberResponse
err := rc.rpcClient.CallContext(ctx, &header, "eth_getBlockByNumber", blockParam, shouldIncludeTransactions)
err = rc.rpcClient.CallContext(ctx, &header, "eth_getBlockByNumber", blockParam, shouldIncludeTransactions)
if err != nil {
return nil, err
}
Expand All @@ -104,7 +108,11 @@ func (rc *RpcClient) HeaderByNumber(number *big.Int) (*miniheader.MiniHeader, er
// HeaderByHash fetches a block header by its block hash. If no block exists with this number it will return
// a `ethereum.NotFound` error.
func (rc *RpcClient) HeaderByHash(hash common.Hash) (*miniheader.MiniHeader, error) {
rc.rateLimiter.Wait(context.Background())
err := rc.rateLimiter.Wait(context.Background())
if err != nil {
// Context cancelled or deadline exceeded
return nil, err
}

ctx, cancel := context.WithTimeout(context.Background(), rc.requestTimeout)
defer cancel()
Expand All @@ -122,7 +130,11 @@ func (rc *RpcClient) HeaderByHash(hash common.Hash) (*miniheader.MiniHeader, err

// FilterLogs returns the logs that satisfy the supplied filter query.
func (rc *RpcClient) FilterLogs(q ethereum.FilterQuery) ([]types.Log, error) {
rc.rateLimiter.Wait(context.Background())
err := rc.rateLimiter.Wait(context.Background())
if err != nil {
// Context cancelled or deadline exceeded
return nil, err
}

ctx, cancel := context.WithTimeout(context.Background(), rc.requestTimeout)
defer cancel()
Expand Down
6 changes: 5 additions & 1 deletion ethereum/eth_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,11 @@ func (e *ETHWatcher) getBalances(addresses []common.Address) (map[common.Address
go func(chunk []common.Address) {
defer wg.Done()

e.rateLimiter.Wait(context.Background())
err := e.rateLimiter.Wait(context.Background())
if err != nil {
// Context cancelled or deadline exceeded
return // Give up
}

// Pass a context with a 20 second timeout to `GetEthBalances` in order to avoid
// any one request from taking longer then 20 seconds and as a consequence, hold
Expand Down
12 changes: 10 additions & 2 deletions zeroex/ordervalidator/order_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,11 @@ func (o *OrderValidator) BatchValidate(rawSignedOrders []*zeroex.SignedOrder, ar
}

for {
o.rateLimiter.Wait(context.Background())
err := o.rateLimiter.Wait(context.Background())
if err != nil {
// Context cancelled or deadline exceeded
return // Give up
}

// Pass a context with a 15 second timeout to `GetOrderRelevantStates` in order to avoid
// any one request from taking longer then 15 seconds
Expand Down Expand Up @@ -473,7 +477,11 @@ func (o *OrderValidator) batchValidateSoftCancelled(signedOrders []*zeroex.Signe
}
endpoint, ok := o.cachedFeeRecipientToEndpoint[signedOrder.FeeRecipientAddress]
if !ok {
o.rateLimiter.Wait(context.Background())
err = o.rateLimiter.Wait(context.Background())
if err != nil {
// Context cancelled or deadline exceeded
continue // Give up
}

ctx, cancel := context.WithTimeout(context.Background(), getCoordinatorEndpointTimeout)
defer cancel()
Expand Down

0 comments on commit 81a6d7c

Please sign in to comment.