Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CCIP-2219 USDC Support - Stubbing interfaces and new flow #117

Merged
merged 13 commits into from
Sep 16, 2024
4 changes: 1 addition & 3 deletions execute/exectypes/commit_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ type CommitData struct {
// ExecutedMessages are the messages in this report that have already been executed.
ExecutedMessages []cciptypes.SeqNum `json:"executedMessages"`

// The following values are cached for validation algorithms, serialization is not required for consensus.

// TokenData for each message.
TokenData [][][]byte `json:"-"`
MessageTokensData []MessageTokensData `json:"messageTokenData"`
mateusz-sekara marked this conversation as resolved.
Show resolved Hide resolved
}
73 changes: 72 additions & 1 deletion execute/exectypes/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,68 @@ type MessageObservations map[cciptypes.ChainSelector]map[cciptypes.SeqNum]ccipty
// must be encoding according to the destination chain requirements with typeconv.AddressBytesToString.
type NonceObservations map[cciptypes.ChainSelector]map[string]uint64

// TokenDataObservations contain token data for messages organized by source chain selector and sequence number.
// There could be multiple tokens per a single message, so MessageTokensData is a slice of TokenData.
// TokenDataObservations are populated during the Observation phase and depends on previously fetched
// MessageObservations details and the `tokenDataProcessors` configured in the ExecuteOffchainConfig.
// Content of the MessageTokensData is determined by the TokenDataProcessor implementations.
// - if Message doesn't have any tokens, TokenData slice will be empty.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suppose ChainSelector chain and SeqNum seq is a message that does not have token data. Is there no MessageTokenData object in the map? In that case tokenObs[chain][seq].IsReady() is false even though we're not waiting for data.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MessageTokenData has to be always there, that's gonna be a requirement here. Therefore client doesn't have to handle nils or other weird edge cases. We move this responsibility to the processor to init everything properly, even messages that don't have tokens. In this case, it should be populated with an empty MessageTokensData which is always IsReady=true in that case

func (mtd MessageTokensData) IsReady() bool {
	for _, td := range mtd.TokenData {
		if !td.IsReady() {
			return false
		}
	}
	return true
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think comment explains that, no? TokenData slice will be empty and for empty TokenData we return IsReady=true

// - if Message has tokens, but these tokens don't require any special treatment,
// TokenData slice will contain empty TokenData objects.
// - if Message has tokens and these tokens require additional processing defined in ExecuteOffchainConfig,
// specific TokenDataProcessor will be used to populate the TokenData slice.
type TokenDataObservations map[cciptypes.ChainSelector]map[cciptypes.SeqNum]MessageTokensData

type MessageTokensData struct {
TokenData []TokenData
}

func (mtd MessageTokensData) IsReady() bool {
for _, td := range mtd.TokenData {
if !td.IsReady() {
return false
}
}
return true
}

func (mtd MessageTokensData) Error() error {
for _, td := range mtd.TokenData {
if td.Error != nil {
return td.Error
}
}
return nil
}

func (mtd MessageTokensData) ToByteSlice() [][]byte {
out := make([][]byte, len(mtd.TokenData))
for i, td := range mtd.TokenData {
out[i] = td.Data
}
return out
}

// TokenData is the token data for a single token in a message.
// It contains the token data and a flag indicating if the data is ready.
type TokenData struct {
Ready bool `json:"ready"`
Data []byte `json:"data"`
Error error
mateusz-sekara marked this conversation as resolved.
Show resolved Hide resolved
}

func NewEmptyTokenData() TokenData {
return TokenData{
Ready: false,
Error: nil,
Data: nil,
}
}

func (td TokenData) IsReady() bool {
return td.Ready
}

// Observation is the observation of the ExecutePlugin.
// TODO: revisit observation types. The maps used here are more space efficient and easier to work
// with but require more transformations compared to the on-chain representations.
Expand All @@ -32,17 +94,26 @@ type Observation struct {
// execute report.
Messages MessageObservations `json:"messages"`

// TokenData are determined during the second phase of execute.
// It contains the token data for the messages identified in the same stage as Messages
TokenData TokenDataObservations `json:"tokenDataObservations"`

// Nonces are determined during the third phase of execute.
// It contains the nonces of senders who are being considered for the final report.
Nonces NonceObservations `json:"nonces"`
}

// NewObservation constructs a Observation object.
mateusz-sekara marked this conversation as resolved.
Show resolved Hide resolved
func NewObservation(
commitReports CommitObservations, messages MessageObservations, nonces NonceObservations) Observation {
commitReports CommitObservations,
messages MessageObservations,
tokenData TokenDataObservations,
nonces NonceObservations,
) Observation {
return Observation{
CommitReports: commitReports,
Messages: messages,
TokenData: tokenData,
Nonces: nonces,
}
}
Expand Down
42 changes: 21 additions & 21 deletions execute/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"

"github.com/smartcontractkit/chainlink-ccip/execute/exectypes"
"github.com/smartcontractkit/chainlink-ccip/execute/internal/gas"
"github.com/smartcontractkit/chainlink-ccip/execute/tokendata"
"github.com/smartcontractkit/chainlink-ccip/internal/reader"
readerpkg "github.com/smartcontractkit/chainlink-ccip/pkg/reader"
"github.com/smartcontractkit/chainlink-ccip/pluginconfig"
Expand Down Expand Up @@ -49,15 +49,15 @@ func (p PluginFactoryConstructor) NewValidationService(ctx context.Context) (cor

// PluginFactory implements common ReportingPluginFactory and is used for (re-)initializing commit plugin instances.
type PluginFactory struct {
lggr logger.Logger
ocrConfig reader.OCR3ConfigWithMeta
execCodec cciptypes.ExecutePluginCodec
msgHasher cciptypes.MessageHasher
homeChainReader reader.HomeChain
estimateProvider gas.EstimateProvider
tokenDataReader exectypes.TokenDataReader
contractReaders map[cciptypes.ChainSelector]types.ContractReader
chainWriters map[cciptypes.ChainSelector]types.ChainWriter
lggr logger.Logger
ocrConfig reader.OCR3ConfigWithMeta
execCodec cciptypes.ExecutePluginCodec
msgHasher cciptypes.MessageHasher
homeChainReader reader.HomeChain
estimateProvider gas.EstimateProvider
tokenDataProcessor tokendata.TokenDataProcessor
contractReaders map[cciptypes.ChainSelector]types.ContractReader
chainWriters map[cciptypes.ChainSelector]types.ChainWriter
}

func NewPluginFactory(
Expand All @@ -66,21 +66,21 @@ func NewPluginFactory(
execCodec cciptypes.ExecutePluginCodec,
msgHasher cciptypes.MessageHasher,
homeChainReader reader.HomeChain,
tokenDataReader exectypes.TokenDataReader,
tokenDataProcessor tokendata.TokenDataProcessor,
estimateProvider gas.EstimateProvider,
contractReaders map[cciptypes.ChainSelector]types.ContractReader,
chainWriters map[cciptypes.ChainSelector]types.ChainWriter,
) *PluginFactory {
return &PluginFactory{
lggr: lggr,
ocrConfig: ocrConfig,
execCodec: execCodec,
msgHasher: msgHasher,
homeChainReader: homeChainReader,
estimateProvider: estimateProvider,
contractReaders: contractReaders,
chainWriters: chainWriters,
tokenDataReader: tokenDataReader,
lggr: lggr,
ocrConfig: ocrConfig,
execCodec: execCodec,
msgHasher: msgHasher,
homeChainReader: homeChainReader,
estimateProvider: estimateProvider,
contractReaders: contractReaders,
chainWriters: chainWriters,
tokenDataProcessor: tokenDataProcessor,
}
}

Expand Down Expand Up @@ -120,7 +120,7 @@ func (p PluginFactory) NewReportingPlugin(
p.execCodec,
p.msgHasher,
p.homeChainReader,
p.tokenDataReader,
p.tokenDataProcessor,
p.estimateProvider,
p.lggr,
), ocr3types.ReportingPluginInfo{
Expand Down
54 changes: 32 additions & 22 deletions execute/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/smartcontractkit/chainlink-ccip/execute/exectypes"
"github.com/smartcontractkit/chainlink-ccip/execute/internal/gas"
"github.com/smartcontractkit/chainlink-ccip/execute/report"
"github.com/smartcontractkit/chainlink-ccip/execute/tokendata"
typeconv "github.com/smartcontractkit/chainlink-ccip/internal/libs/typeconv"
"github.com/smartcontractkit/chainlink-ccip/internal/plugincommon"
"github.com/smartcontractkit/chainlink-ccip/internal/reader"
Expand All @@ -41,10 +42,10 @@ type Plugin struct {
msgHasher cciptypes.MessageHasher
homeChain reader.HomeChain

oracleIDToP2pID map[commontypes.OracleID]libocrtypes.PeerID
tokenDataReader exectypes.TokenDataReader
estimateProvider gas.EstimateProvider
lggr logger.Logger
oracleIDToP2pID map[commontypes.OracleID]libocrtypes.PeerID
tokenDataProcessor tokendata.TokenDataProcessor
estimateProvider gas.EstimateProvider
lggr logger.Logger
}

func NewPlugin(
Expand All @@ -55,7 +56,7 @@ func NewPlugin(
reportCodec cciptypes.ExecutePluginCodec,
msgHasher cciptypes.MessageHasher,
homeChain reader.HomeChain,
tokenDataReader exectypes.TokenDataReader,
tokenDataProcessor tokendata.TokenDataProcessor,
estimateProvider gas.EstimateProvider,
lggr logger.Logger,
) *Plugin {
Expand All @@ -70,17 +71,17 @@ func NewPlugin(
}

return &Plugin{
reportingCfg: reportingCfg,
cfg: cfg,
oracleIDToP2pID: oracleIDToP2pID,
ccipReader: ccipReader,
readerSyncer: readerSyncer,
reportCodec: reportCodec,
msgHasher: msgHasher,
homeChain: homeChain,
tokenDataReader: tokenDataReader,
estimateProvider: estimateProvider,
lggr: lggr,
reportingCfg: reportingCfg,
cfg: cfg,
oracleIDToP2pID: oracleIDToP2pID,
ccipReader: ccipReader,
readerSyncer: readerSyncer,
reportCodec: reportCodec,
msgHasher: msgHasher,
homeChain: homeChain,
tokenDataProcessor: tokenDataProcessor,
estimateProvider: estimateProvider,
lggr: lggr,
}
}

Expand Down Expand Up @@ -188,7 +189,7 @@ func (p *Plugin) Observation(
}

// TODO: truncate grouped to a maximum observation size?
return exectypes.NewObservation(groupedCommits, nil, nil).Encode()
return exectypes.NewObservation(groupedCommits, nil, nil, nil).Encode()
}

// No observation for non-dest readers.
Expand Down Expand Up @@ -243,8 +244,12 @@ func (p *Plugin) Observation(
groupedCommits[report.SourceChain] = append(groupedCommits[report.SourceChain], report)
}

// TODO: Fire off messages for an attestation check service.
return exectypes.NewObservation(groupedCommits, messages, nil).Encode()
tkData, err1 := p.tokenDataProcessor.ProcessTokenData(ctx, messages)
if err1 != nil {
return types.Observation{}, fmt.Errorf("unable to process token data %w", err)
mateusz-sekara marked this conversation as resolved.
Show resolved Hide resolved
}

return exectypes.NewObservation(groupedCommits, messages, tkData, nil).Encode()

case exectypes.Filter:
// Phase 3: observe nonce for each unique source/sender pair.
Expand Down Expand Up @@ -274,7 +279,7 @@ func (p *Plugin) Observation(
nonceObservations[srcChain] = nonces
}

return exectypes.NewObservation(nil, nil, nonceObservations).Encode()
return exectypes.NewObservation(nil, nil, nil, nonceObservations).Encode()
default:
return types.Observation{}, fmt.Errorf("unknown state")
}
Expand Down Expand Up @@ -400,8 +405,13 @@ func (p *Plugin) Outcome(
if msg, ok := observation.Messages[report.SourceChain][i]; ok {
report.Messages = append(report.Messages, msg)
}

mateusz-sekara marked this conversation as resolved.
Show resolved Hide resolved
if tokenData, ok := observation.TokenData[report.SourceChain][i]; ok {
report.MessageTokensData = append(report.MessageTokensData, tokenData)
}
}
commitReports[i].Messages = report.Messages
commitReports[i].MessageTokensData = report.MessageTokensData
}

outcome = exectypes.NewOutcome(state, commitReports, cciptypes.ExecutePluginReport{})
Expand All @@ -413,13 +423,13 @@ func (p *Plugin) Outcome(
context.Background(),
p.lggr,
p.msgHasher,
p.tokenDataReader,
p.reportCodec,
p.estimateProvider,
observation.Nonces,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe Nonces should be added to selectReport instead of being part of the this constructor.

p.cfg.DestChain,
uint64(maxReportSizeBytes),
p.cfg.OffchainConfig.BatchGasLimit)
p.cfg.OffchainConfig.BatchGasLimit,
)
outcomeReports, commitReports, err := selectReport(
p.lggr,
commitReports,
Expand Down
14 changes: 7 additions & 7 deletions execute/plugin_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/smartcontractkit/chainlink-ccip/execute/exectypes"
"github.com/smartcontractkit/chainlink-ccip/execute/internal/gas/evm"
"github.com/smartcontractkit/chainlink-ccip/execute/report"
"github.com/smartcontractkit/chainlink-ccip/execute/tokendata"
"github.com/smartcontractkit/chainlink-ccip/internal/libs/slicelib"
"github.com/smartcontractkit/chainlink-ccip/internal/libs/testhelpers"
"github.com/smartcontractkit/chainlink-ccip/internal/mocks"
Expand Down Expand Up @@ -236,14 +237,13 @@ func setupSimpleTest(
err = homeChain.Start(ctx)
require.NoError(t, err, "failed to start home chain poller")

tokenDataReader := mock_types.NewMockTokenDataReader(t)
tokenDataReader.On("ReadTokenData", mock.Anything, mock.Anything, mock.Anything).Return([][]byte{}, nil)
tokenDataProcessor := &tokendata.NoopTokenProcessor{}

oracleIDToP2pID := GetP2pIDs(1, 2, 3)
nodes := []nodeSetup{
newNode(ctx, t, lggr, cfg, msgHasher, ccipReader, homeChain, tokenDataReader, oracleIDToP2pID, 1, 1),
newNode(ctx, t, lggr, cfg, msgHasher, ccipReader, homeChain, tokenDataReader, oracleIDToP2pID, 2, 1),
newNode(ctx, t, lggr, cfg, msgHasher, ccipReader, homeChain, tokenDataReader, oracleIDToP2pID, 3, 1),
newNode(ctx, t, lggr, cfg, msgHasher, ccipReader, homeChain, tokenDataProcessor, oracleIDToP2pID, 1, 1),
newNode(ctx, t, lggr, cfg, msgHasher, ccipReader, homeChain, tokenDataProcessor, oracleIDToP2pID, 2, 1),
newNode(ctx, t, lggr, cfg, msgHasher, ccipReader, homeChain, tokenDataProcessor, oracleIDToP2pID, 3, 1),
}

err = homeChain.Close()
Expand All @@ -261,7 +261,7 @@ func newNode(
msgHasher cciptypes.MessageHasher,
ccipReader readerpkg.CCIPReader,
homeChain reader.HomeChain,
tokenDataReader exectypes.TokenDataReader,
tokenDataProcessor tokendata.TokenDataProcessor,
oracleIDToP2pID map[commontypes.OracleID]libocrtypes.PeerID,
id int,
N int,
Expand All @@ -281,7 +281,7 @@ func newNode(
reportCodec,
msgHasher,
homeChain,
tokenDataReader,
tokenDataProcessor,
evm.EstimateProvider{},
lggr)

Expand Down
21 changes: 20 additions & 1 deletion execute/plugin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,17 @@ func mergeCommitObservations(
return results, nil
}

func mergeTokenObservations(
mateusz-sekara marked this conversation as resolved.
Show resolved Hide resolved
observations []decodedAttributedObservation,
_ map[cciptypes.ChainSelector]int,
) exectypes.TokenDataObservations {
// Return first one, dummy implementation to make tests passing
for _, ao := range observations {
return ao.Observation.TokenData
}
return nil
}

// mergeNonceObservations merges all observations which reach the fChain threshold into a single result.
// Any observations, or subsets of observations, which do not reach the threshold are ignored.
func mergeNonceObservations(
Expand Down Expand Up @@ -396,6 +407,12 @@ func getConsensusObservation(
"oracle", oracleID,
"mergedMessageObservations", mergedMessageObservations)

mergedTokenObservations := mergeTokenObservations(decodedObservations, fChain)
lggr.Debugw(
fmt.Sprintf("[oracle %d] exec outcome: merged token data observations", oracleID),
"oracle", oracleID,
"mergedTokenObservations", mergedTokenObservations)

mergedNonceObservations :=
mergeNonceObservations(decodedObservations, fChain[destChainSelector])
if err != nil {
Expand All @@ -409,7 +426,9 @@ func getConsensusObservation(
observation := exectypes.NewObservation(
mergedCommitObservations,
mergedMessageObservations,
mergedNonceObservations)
mergedTokenObservations,
mergedNonceObservations,
)

return observation, nil
}
Loading
Loading