-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BCF-3269] - Contract Reader Batch Call #13635
Merged
Merged
Changes from all commits
Commits
Show all changes
17 commits
Select commit
Hold shift + click to select a range
3411a85
Chain Reader Batch Call inital commit
ilija42 92a0ed1
Add batchCaller tests
ilija42 060e1b2
Minor code improvement for ChainReader addDecoderDef and addEncoderDef
ilija42 ff9b73a
Use common types for Chain Reader BatchGetLatestValue
ilija42 f9a7801
CR BatchGetLatestValue fixes and setup evm chain reader tester
ilija42 8ce1f52
Run lint, generate and update common ref
ilija42 916ea17
Add changeset
ilija42 d1d3371
Update common ref, rename BatchGetLatestValue to BatchGetLatestValues
ilija42 4b2b3e6
Import new common changes
ilija42 fee2277
Return an error if BatchGetLatestValues received an event in the req
ilija42 a843b27
Bump common
ilija42 1cf07d5
Merge remote-tracking branch 'origin/develop' into BCF-3269-batch-get…
ilija42 c67a5eb
fix minor merge issues
ilija42 14ade15
Add common codec mock for batch_caller_test
ilija42 251bf83
Bump feeds
ilija42 d2bd281
Bump Solana
ilija42 e6de266
Merge branch 'develop' into BCF-3269-batch-getlatestvalue
ilija42 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
--- | ||
"chainlink": minor | ||
--- | ||
|
||
#internal Add BatchGetLatestValues to ChainReader |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,312 @@ | ||
package evm | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"math/big" | ||
|
||
"github.com/ethereum/go-ethereum/common" | ||
"github.com/ethereum/go-ethereum/common/hexutil" | ||
"github.com/ethereum/go-ethereum/rpc" | ||
"github.com/pkg/errors" | ||
"golang.org/x/sync/errgroup" | ||
|
||
"github.com/smartcontractkit/chainlink-common/pkg/types" | ||
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" | ||
"github.com/smartcontractkit/chainlink/v2/core/logger" | ||
) | ||
|
||
var errEmptyOutput = errors.New("rpc call output is empty (make sure that the contract method exists and rpc is healthy)") | ||
|
||
const ( | ||
// DefaultRpcBatchSizeLimit defines the maximum number of rpc requests to be included in a batch. | ||
DefaultRpcBatchSizeLimit = 100 | ||
|
||
// DefaultRpcBatchBackOffMultiplier defines the rate of reducing the batch size limit for retried calls. | ||
// For example if limit is 20 and multiplier is 4: | ||
// 1. 20 | ||
// 2. 20/4 = 5 | ||
// 3. 5/4 = 1 | ||
DefaultRpcBatchBackOffMultiplier = 5 | ||
|
||
// DefaultMaxParallelRpcCalls defines the default maximum number of individual in-parallel rpc calls. | ||
DefaultMaxParallelRpcCalls = 10 | ||
) | ||
|
||
// BatchResult is organised by contracts names, key is contract name. | ||
type BatchResult map[string]ContractResults | ||
type ContractResults []MethodCallResult | ||
type MethodCallResult struct { | ||
MethodName string | ||
ReturnValue any | ||
Err error | ||
} | ||
|
||
type BatchCall []Call | ||
type Call struct { | ||
ContractAddress common.Address | ||
ContractName, MethodName string | ||
Params, ReturnVal any | ||
} | ||
|
||
func (c BatchCall) String() string { | ||
callString := "" | ||
for _, call := range c { | ||
callString += fmt.Sprintf("%s\n", call.String()) | ||
} | ||
return callString | ||
} | ||
|
||
// Implement the String method for the Call struct | ||
func (c Call) String() string { | ||
return fmt.Sprintf("contractAddress: %s, contractName: %s, method: %s, params: %+v returnValType: %T", | ||
c.ContractAddress.Hex(), c.ContractName, c.MethodName, c.Params, c.ReturnVal) | ||
} | ||
|
||
//go:generate mockery --quiet --name BatchCaller --output ./rpclibmocks --outpkg rpclibmocks --filename batch_caller.go --case=underscore | ||
type BatchCaller interface { | ||
// BatchCall executes all the provided BatchRequest and returns the results in the same order | ||
// of the calls. Pass blockNumber=0 to use the latest block. | ||
BatchCall(ctx context.Context, blockNumber uint64, batchRequests BatchCall) (BatchResult, error) | ||
} | ||
|
||
// dynamicLimitedBatchCaller makes batched rpc calls and perform retries by reducing the batch size on each retry. | ||
type dynamicLimitedBatchCaller struct { | ||
bc *defaultEvmBatchCaller | ||
} | ||
|
||
func NewDynamicLimitedBatchCaller(lggr logger.Logger, codec types.Codec, evmClient client.Client, batchSizeLimit, backOffMultiplier, parallelRpcCallsLimit uint) BatchCaller { | ||
return &dynamicLimitedBatchCaller{ | ||
bc: newDefaultEvmBatchCaller(lggr, evmClient, codec, batchSizeLimit, backOffMultiplier, parallelRpcCallsLimit), | ||
} | ||
} | ||
|
||
func (c *dynamicLimitedBatchCaller) BatchCall(ctx context.Context, blockNumber uint64, reqs BatchCall) (BatchResult, error) { | ||
return c.bc.batchCallDynamicLimitRetries(ctx, blockNumber, reqs) | ||
} | ||
|
||
type defaultEvmBatchCaller struct { | ||
lggr logger.Logger | ||
evmClient client.Client | ||
codec types.Codec | ||
batchSizeLimit uint | ||
parallelRpcCallsLimit uint | ||
backOffMultiplier uint | ||
} | ||
|
||
// NewDefaultEvmBatchCaller returns a new batch caller instance. | ||
// batchCallLimit defines the maximum number of calls for BatchCallLimit method, pass 0 to keep the default. | ||
// backOffMultiplier defines the back-off strategy for retries on BatchCallDynamicLimitRetries method, pass 0 to keep the default. | ||
func newDefaultEvmBatchCaller( | ||
lggr logger.Logger, evmClient client.Client, codec types.Codec, batchSizeLimit, backOffMultiplier, parallelRpcCallsLimit uint, | ||
) *defaultEvmBatchCaller { | ||
batchSize := uint(DefaultRpcBatchSizeLimit) | ||
if batchSizeLimit > 0 { | ||
batchSize = batchSizeLimit | ||
} | ||
|
||
multiplier := uint(DefaultRpcBatchBackOffMultiplier) | ||
if backOffMultiplier > 0 { | ||
multiplier = backOffMultiplier | ||
} | ||
|
||
parallelRpcCalls := uint(DefaultMaxParallelRpcCalls) | ||
if parallelRpcCallsLimit > 0 { | ||
parallelRpcCalls = parallelRpcCallsLimit | ||
} | ||
|
||
return &defaultEvmBatchCaller{ | ||
lggr: lggr, | ||
evmClient: evmClient, | ||
codec: codec, | ||
batchSizeLimit: batchSize, | ||
parallelRpcCallsLimit: parallelRpcCalls, | ||
backOffMultiplier: multiplier, | ||
} | ||
} | ||
|
||
func (c *defaultEvmBatchCaller) batchCall(ctx context.Context, blockNumber uint64, batchCall BatchCall) ([]dataAndErr, error) { | ||
if len(batchCall) == 0 { | ||
return nil, nil | ||
} | ||
|
||
packedOutputs := make([]string, len(batchCall)) | ||
rpcBatchCalls := make([]rpc.BatchElem, len(batchCall)) | ||
for i, call := range batchCall { | ||
data, err := c.codec.Encode(ctx, call.Params, WrapItemType(call.ContractName, call.MethodName, true)) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
blockNumStr := "latest" | ||
if blockNumber > 0 { | ||
blockNumStr = hexutil.EncodeBig(big.NewInt(0).SetUint64(blockNumber)) | ||
} | ||
|
||
rpcBatchCalls[i] = rpc.BatchElem{ | ||
Method: "eth_call", | ||
Args: []any{ | ||
map[string]interface{}{ | ||
"from": common.Address{}, | ||
"to": call.ContractAddress, | ||
"data": data, | ||
}, | ||
blockNumStr, | ||
}, | ||
Result: &packedOutputs[i], | ||
} | ||
} | ||
|
||
if err := c.evmClient.BatchCallContext(ctx, rpcBatchCalls); err != nil { | ||
return nil, fmt.Errorf("batch call context: %w", err) | ||
} | ||
|
||
results := make([]dataAndErr, len(batchCall)) | ||
for i, call := range batchCall { | ||
results[i] = dataAndErr{ | ||
contractName: call.ContractName, | ||
methodName: call.MethodName, | ||
returnVal: call.ReturnVal, | ||
} | ||
|
||
if rpcBatchCalls[i].Error != nil { | ||
results[i].err = rpcBatchCalls[i].Error | ||
continue | ||
} | ||
|
||
if packedOutputs[i] == "" { | ||
// Some RPCs instead of returning "0x" are returning an empty string. | ||
// We are overriding this behaviour for consistent handling of this scenario. | ||
packedOutputs[i] = "0x" | ||
} | ||
|
||
b, err := hexutil.Decode(packedOutputs[i]) | ||
if err != nil { | ||
return nil, fmt.Errorf("decode result %s: packedOutputs %s: %w", call, packedOutputs[i], err) | ||
} | ||
|
||
if err = c.codec.Decode(ctx, b, call.ReturnVal, WrapItemType(call.ContractName, call.MethodName, false)); err != nil { | ||
if len(b) == 0 { | ||
results[i].err = fmt.Errorf("unpack result %s: %s: %w", call, err.Error(), errEmptyOutput) | ||
} else { | ||
results[i].err = fmt.Errorf("unpack result %s: %w", call, err) | ||
} | ||
continue | ||
} | ||
results[i].returnVal = call.ReturnVal | ||
} | ||
|
||
return results, nil | ||
} | ||
|
||
func (c *defaultEvmBatchCaller) batchCallDynamicLimitRetries(ctx context.Context, blockNumber uint64, calls BatchCall) (BatchResult, error) { | ||
lim := c.batchSizeLimit | ||
// Limit the batch size to the number of calls | ||
if uint(len(calls)) < lim { | ||
lim = uint(len(calls)) | ||
} | ||
for { | ||
results, err := c.batchCallLimit(ctx, blockNumber, calls, lim) | ||
if err == nil { | ||
return results, nil | ||
} | ||
|
||
if lim <= 1 { | ||
return nil, errors.Wrapf(err, "calls %+v", calls) | ||
} | ||
|
||
newLim := lim / c.backOffMultiplier | ||
if newLim == 0 || newLim == lim { | ||
newLim = 1 | ||
} | ||
lim = newLim | ||
c.lggr.Errorf("retrying batch call with %d calls and %d limit that failed with error=%s", | ||
len(calls), lim, err) | ||
} | ||
} | ||
|
||
type dataAndErr struct { | ||
contractName, methodName string | ||
returnVal any | ||
err error | ||
} | ||
|
||
func (c *defaultEvmBatchCaller) batchCallLimit(ctx context.Context, blockNumber uint64, calls BatchCall, batchSizeLimit uint) (BatchResult, error) { | ||
if batchSizeLimit <= 0 { | ||
res, err := c.batchCall(ctx, blockNumber, calls) | ||
return convertToBatchResult(res), err | ||
} | ||
|
||
type job struct { | ||
blockNumber uint64 | ||
calls BatchCall | ||
results []dataAndErr | ||
} | ||
|
||
jobs := make([]job, 0) | ||
for i := 0; i < len(calls); i += int(batchSizeLimit) { | ||
idxFrom := i | ||
idxTo := idxFrom + int(batchSizeLimit) | ||
if idxTo > len(calls) { | ||
idxTo = len(calls) | ||
} | ||
jobs = append(jobs, job{blockNumber: blockNumber, calls: calls[idxFrom:idxTo], results: nil}) | ||
} | ||
|
||
if c.parallelRpcCallsLimit > 1 { | ||
eg := new(errgroup.Group) | ||
eg.SetLimit(int(c.parallelRpcCallsLimit)) | ||
for jobIdx := range jobs { | ||
jobIdx := jobIdx | ||
eg.Go(func() error { | ||
res, err := c.batchCall(ctx, jobs[jobIdx].blockNumber, jobs[jobIdx].calls) | ||
if err != nil { | ||
return err | ||
} | ||
jobs[jobIdx].results = res | ||
return nil | ||
}) | ||
} | ||
if err := eg.Wait(); err != nil { | ||
return nil, err | ||
} | ||
} else { | ||
var err error | ||
for jobIdx := range jobs { | ||
jobs[jobIdx].results, err = c.batchCall(ctx, jobs[jobIdx].blockNumber, jobs[jobIdx].calls) | ||
if err != nil { | ||
return nil, err | ||
} | ||
} | ||
} | ||
|
||
var results []dataAndErr | ||
for _, jb := range jobs { | ||
results = append(results, jb.results...) | ||
} | ||
|
||
return convertToBatchResult(results), nil | ||
} | ||
|
||
func convertToBatchResult(data []dataAndErr) BatchResult { | ||
if data == nil { | ||
return nil | ||
} | ||
|
||
batchResult := make(BatchResult) | ||
for _, d := range data { | ||
methodCall := MethodCallResult{ | ||
MethodName: d.methodName, | ||
ReturnValue: d.returnVal, | ||
Err: d.err, | ||
} | ||
|
||
if _, exists := batchResult[d.contractName]; !exists { | ||
batchResult[d.contractName] = ContractResults{} | ||
} | ||
|
||
batchResult[d.contractName] = append(batchResult[d.contractName], methodCall) | ||
} | ||
|
||
return batchResult | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
batchCaller and its tests are identical to what CCIP is currently using, with some adjustments to make it work better with Chain Reader interface
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This parallel batch caller is something now used by at least
ChainReader
,CCIP
,Automation
, andAtlas
with various minor differences. We should make a common component to do this with various post-processing hooks for the batch results. Then we wouldn't have so much code duplication.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that the work to do this is way greater than maintaining two batchCallers since batchCaller isn't something that is frequently updated