Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BCF-3269] - Contract Reader Batch Call #13635

Merged
merged 17 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/pink-papayas-swim.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#internal Add BatchGetLatestValues to ChainReader
30 changes: 30 additions & 0 deletions core/capabilities/targets/mocks/chain_reader.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/prometheus/client_golang v1.17.0
github.com/shopspring/decimal v1.3.1
github.com/smartcontractkit/chainlink-automation v1.0.4
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240710165532-ade916a95858
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240712101200-5b11e6cc6e86
github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20240702141926-063ceef8c42e
github.com/spf13/cobra v1.8.0
Expand Down Expand Up @@ -273,7 +273,7 @@ require (
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240710121324-3ed288aa9b45 // indirect
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240702145022-37a2c3a742d1 // indirect
github.com/smartcontractkit/chainlink-feeds v0.0.0-20240710170203-5b41615da827 // indirect
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20240710170818-eccca28888e5 // indirect
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20240712132946-267a37c5ac6e // indirect
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20240709043547-03612098f799 // indirect
github.com/smartcontractkit/tdh2/go/ocr2/decryptionplugin v0.0.0-20230906073235-9e478e5e19f1 // indirect
github.com/smartcontractkit/tdh2/go/tdh2 v0.0.0-20230906073235-9e478e5e19f1 // indirect
Expand Down
8 changes: 4 additions & 4 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1178,16 +1178,16 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq
github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE=
github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8umfIfVVlwC7+n5izbLSFgjw8=
github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240710165532-ade916a95858 h1:nwAe0iA4JN7/oEFz/N2lkTpNh6rxlzbK7g8Els/dDew=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240710165532-ade916a95858/go.mod h1:fh9eBbrReCmv31bfz52ENCAMa7nTKQbdhb2B3+S2VGo=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240712101200-5b11e6cc6e86 h1:TYALsn6Jue7xCIcXMel+Ow0SuudVfOUAz6iups946Yw=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240712101200-5b11e6cc6e86/go.mod h1:fh9eBbrReCmv31bfz52ENCAMa7nTKQbdhb2B3+S2VGo=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240710121324-3ed288aa9b45 h1:NBQLtqk8zsyY4qTJs+NElI3aDFTcAo83JHvqD04EvB0=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240710121324-3ed288aa9b45/go.mod h1:LV0h7QBQUpoC2UUi6TcUvcIFm1xjP/DtEcqV8+qeLUs=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240702145022-37a2c3a742d1 h1:dsTmitRaVizHxoYFoGz4+y/zVa8XnvKUiTaZdx+6t9M=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240702145022-37a2c3a742d1/go.mod h1:6DgCnHMGdBaIh0bLs1dK0MtdeMZfeNhc/nvBUN6KIUg=
github.com/smartcontractkit/chainlink-feeds v0.0.0-20240710170203-5b41615da827 h1:BCHu4pNP6arrcHLEWx61XjLaonOd2coQNyL0NTUcaMc=
github.com/smartcontractkit/chainlink-feeds v0.0.0-20240710170203-5b41615da827/go.mod h1:OPX+wC2TWQsyLNpR7daMt2vMpmsNcoBxbZyGTHr6tiA=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20240710170818-eccca28888e5 h1:gktRCdvNp0tczyqb79JaQOloa/elDS6t33qjAS9SrEU=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20240710170818-eccca28888e5/go.mod h1:aJUY4hdo1g942mhlPX9Z4FWe5ldEyWvsWSNf7frh7yU=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20240712132946-267a37c5ac6e h1:PzwzlHNv1YbJ6ZIdl/pIFRoOuOS4V4WLvjZvFUnZFL4=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20240712132946-267a37c5ac6e/go.mod h1:hsFhop+SlQHKD+DEFjZrMJmbauT1A/wvtZIeeo4PxFU=
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20240709043547-03612098f799 h1:HyLTySm7BR+oNfZqDTkVJ25wnmcTtxBBD31UkFL+kEM=
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20240709043547-03612098f799/go.mod h1:UVFRacRkP7O7TQAzFmR52v5mUlxf+G1ovMlCQAB/cHU=
github.com/smartcontractkit/go-plugin v0.0.0-20240208201424-b3b91517de16 h1:TFe+FvzxClblt6qRfqEhUfa4kFQx5UobuoFGO2W4mMo=
Expand Down
312 changes: 312 additions & 0 deletions core/services/relay/evm/batch_caller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,312 @@
package evm

import (
"context"
"fmt"
"math/big"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/rpc"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"

"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

var errEmptyOutput = errors.New("rpc call output is empty (make sure that the contract method exists and rpc is healthy)")

const (
// DefaultRpcBatchSizeLimit defines the maximum number of rpc requests to be included in a batch.
DefaultRpcBatchSizeLimit = 100

// DefaultRpcBatchBackOffMultiplier defines the rate of reducing the batch size limit for retried calls.
// For example if limit is 20 and multiplier is 4:
// 1. 20
// 2. 20/4 = 5
// 3. 5/4 = 1
DefaultRpcBatchBackOffMultiplier = 5

// DefaultMaxParallelRpcCalls defines the default maximum number of individual in-parallel rpc calls.
DefaultMaxParallelRpcCalls = 10
)

// BatchResult is organised by contracts names, key is contract name.
type BatchResult map[string]ContractResults
type ContractResults []MethodCallResult
type MethodCallResult struct {
MethodName string
ReturnValue any
Err error
}

type BatchCall []Call
type Call struct {
ContractAddress common.Address
ContractName, MethodName string
Params, ReturnVal any
}

func (c BatchCall) String() string {
callString := ""
for _, call := range c {
callString += fmt.Sprintf("%s\n", call.String())
}
return callString
}

// Implement the String method for the Call struct
func (c Call) String() string {
return fmt.Sprintf("contractAddress: %s, contractName: %s, method: %s, params: %+v returnValType: %T",
c.ContractAddress.Hex(), c.ContractName, c.MethodName, c.Params, c.ReturnVal)
}

//go:generate mockery --quiet --name BatchCaller --output ./rpclibmocks --outpkg rpclibmocks --filename batch_caller.go --case=underscore
type BatchCaller interface {
// BatchCall executes all the provided BatchRequest and returns the results in the same order
// of the calls. Pass blockNumber=0 to use the latest block.
BatchCall(ctx context.Context, blockNumber uint64, batchRequests BatchCall) (BatchResult, error)
}

// dynamicLimitedBatchCaller makes batched rpc calls and perform retries by reducing the batch size on each retry.
type dynamicLimitedBatchCaller struct {
bc *defaultEvmBatchCaller
}

func NewDynamicLimitedBatchCaller(lggr logger.Logger, codec types.Codec, evmClient client.Client, batchSizeLimit, backOffMultiplier, parallelRpcCallsLimit uint) BatchCaller {
return &dynamicLimitedBatchCaller{
bc: newDefaultEvmBatchCaller(lggr, evmClient, codec, batchSizeLimit, backOffMultiplier, parallelRpcCallsLimit),
}
}

func (c *dynamicLimitedBatchCaller) BatchCall(ctx context.Context, blockNumber uint64, reqs BatchCall) (BatchResult, error) {
return c.bc.batchCallDynamicLimitRetries(ctx, blockNumber, reqs)
}

type defaultEvmBatchCaller struct {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

batchCaller and its tests are identical to what CCIP is currently using, with some adjustments to make it work better with Chain Reader interface

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This parallel batch caller is something now used by at least ChainReader, CCIP, Automation, and Atlas with various minor differences. We should make a common component to do this with various post-processing hooks for the batch results. Then we wouldn't have so much code duplication.

Copy link
Contributor Author

@ilija42 ilija42 Jul 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that the work to do this is way greater than maintaining two batchCallers since batchCaller isn't something that is frequently updated

lggr logger.Logger
evmClient client.Client
codec types.Codec
batchSizeLimit uint
parallelRpcCallsLimit uint
backOffMultiplier uint
}

// NewDefaultEvmBatchCaller returns a new batch caller instance.
// batchCallLimit defines the maximum number of calls for BatchCallLimit method, pass 0 to keep the default.
// backOffMultiplier defines the back-off strategy for retries on BatchCallDynamicLimitRetries method, pass 0 to keep the default.
func newDefaultEvmBatchCaller(
lggr logger.Logger, evmClient client.Client, codec types.Codec, batchSizeLimit, backOffMultiplier, parallelRpcCallsLimit uint,
) *defaultEvmBatchCaller {
batchSize := uint(DefaultRpcBatchSizeLimit)
if batchSizeLimit > 0 {
batchSize = batchSizeLimit
}

multiplier := uint(DefaultRpcBatchBackOffMultiplier)
if backOffMultiplier > 0 {
multiplier = backOffMultiplier
}

parallelRpcCalls := uint(DefaultMaxParallelRpcCalls)
if parallelRpcCallsLimit > 0 {
parallelRpcCalls = parallelRpcCallsLimit
}

return &defaultEvmBatchCaller{
lggr: lggr,
evmClient: evmClient,
codec: codec,
batchSizeLimit: batchSize,
parallelRpcCallsLimit: parallelRpcCalls,
backOffMultiplier: multiplier,
}
}

func (c *defaultEvmBatchCaller) batchCall(ctx context.Context, blockNumber uint64, batchCall BatchCall) ([]dataAndErr, error) {
if len(batchCall) == 0 {
return nil, nil
}

packedOutputs := make([]string, len(batchCall))
rpcBatchCalls := make([]rpc.BatchElem, len(batchCall))
for i, call := range batchCall {
data, err := c.codec.Encode(ctx, call.Params, WrapItemType(call.ContractName, call.MethodName, true))
if err != nil {
return nil, err
}

blockNumStr := "latest"
if blockNumber > 0 {
blockNumStr = hexutil.EncodeBig(big.NewInt(0).SetUint64(blockNumber))
}

rpcBatchCalls[i] = rpc.BatchElem{
Method: "eth_call",
Args: []any{
map[string]interface{}{
"from": common.Address{},
"to": call.ContractAddress,
"data": data,
},
blockNumStr,
},
Result: &packedOutputs[i],
}
}

if err := c.evmClient.BatchCallContext(ctx, rpcBatchCalls); err != nil {
return nil, fmt.Errorf("batch call context: %w", err)
}

results := make([]dataAndErr, len(batchCall))
for i, call := range batchCall {
results[i] = dataAndErr{
contractName: call.ContractName,
methodName: call.MethodName,
returnVal: call.ReturnVal,
}

if rpcBatchCalls[i].Error != nil {
results[i].err = rpcBatchCalls[i].Error
continue
}

if packedOutputs[i] == "" {
// Some RPCs instead of returning "0x" are returning an empty string.
// We are overriding this behaviour for consistent handling of this scenario.
packedOutputs[i] = "0x"
}

b, err := hexutil.Decode(packedOutputs[i])
if err != nil {
return nil, fmt.Errorf("decode result %s: packedOutputs %s: %w", call, packedOutputs[i], err)
}

if err = c.codec.Decode(ctx, b, call.ReturnVal, WrapItemType(call.ContractName, call.MethodName, false)); err != nil {
if len(b) == 0 {
results[i].err = fmt.Errorf("unpack result %s: %s: %w", call, err.Error(), errEmptyOutput)
} else {
results[i].err = fmt.Errorf("unpack result %s: %w", call, err)
}
continue
}
results[i].returnVal = call.ReturnVal
}

return results, nil
}

func (c *defaultEvmBatchCaller) batchCallDynamicLimitRetries(ctx context.Context, blockNumber uint64, calls BatchCall) (BatchResult, error) {
lim := c.batchSizeLimit
// Limit the batch size to the number of calls
if uint(len(calls)) < lim {
lim = uint(len(calls))
}
for {
results, err := c.batchCallLimit(ctx, blockNumber, calls, lim)
if err == nil {
return results, nil
}

if lim <= 1 {
return nil, errors.Wrapf(err, "calls %+v", calls)
}

newLim := lim / c.backOffMultiplier
if newLim == 0 || newLim == lim {
newLim = 1
}
lim = newLim
c.lggr.Errorf("retrying batch call with %d calls and %d limit that failed with error=%s",
len(calls), lim, err)
}
}

type dataAndErr struct {
contractName, methodName string
returnVal any
err error
}

func (c *defaultEvmBatchCaller) batchCallLimit(ctx context.Context, blockNumber uint64, calls BatchCall, batchSizeLimit uint) (BatchResult, error) {
if batchSizeLimit <= 0 {
res, err := c.batchCall(ctx, blockNumber, calls)
return convertToBatchResult(res), err
}

type job struct {
blockNumber uint64
calls BatchCall
results []dataAndErr
}

jobs := make([]job, 0)
for i := 0; i < len(calls); i += int(batchSizeLimit) {
idxFrom := i
idxTo := idxFrom + int(batchSizeLimit)
if idxTo > len(calls) {
idxTo = len(calls)
}
jobs = append(jobs, job{blockNumber: blockNumber, calls: calls[idxFrom:idxTo], results: nil})
}

if c.parallelRpcCallsLimit > 1 {
eg := new(errgroup.Group)
eg.SetLimit(int(c.parallelRpcCallsLimit))
for jobIdx := range jobs {
jobIdx := jobIdx
eg.Go(func() error {
res, err := c.batchCall(ctx, jobs[jobIdx].blockNumber, jobs[jobIdx].calls)
if err != nil {
return err
}
jobs[jobIdx].results = res
return nil
})
}
if err := eg.Wait(); err != nil {
return nil, err
}
} else {
var err error
for jobIdx := range jobs {
jobs[jobIdx].results, err = c.batchCall(ctx, jobs[jobIdx].blockNumber, jobs[jobIdx].calls)
if err != nil {
return nil, err
}
}
}

var results []dataAndErr
for _, jb := range jobs {
results = append(results, jb.results...)
}

return convertToBatchResult(results), nil
}

func convertToBatchResult(data []dataAndErr) BatchResult {
if data == nil {
return nil
}

batchResult := make(BatchResult)
for _, d := range data {
methodCall := MethodCallResult{
MethodName: d.methodName,
ReturnValue: d.returnVal,
Err: d.err,
}

if _, exists := batchResult[d.contractName]; !exists {
batchResult[d.contractName] = ContractResults{}
}

batchResult[d.contractName] = append(batchResult[d.contractName], methodCall)
}

return batchResult
}
Loading
Loading