Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CCIP-2882 Implementing attestation API #162

Merged
merged 7 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions execute/tokendata/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func NewConfigBasedCompositeObservers(
// e.g. observers[i] := config.CreateTokenDataObserver()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NewConfigBasedCompositeObservers is an exported function that returns an unexported type. In general it's not a good approach. Either return interface or exported struct.

switch {
case c.USDCCCTPObserverConfig != nil:
observer, err1 := createUSDCTokenObserver(lggr, destChainSelector, c.USDCCCTPObserverConfig.Tokens, readers)
observer, err1 := createUSDCTokenObserver(lggr, destChainSelector, *c.USDCCCTPObserverConfig, readers)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why err1 and not err ?

if err1 != nil {
return nil, err1
}
Expand All @@ -75,11 +75,11 @@ func NewConfigBasedCompositeObservers(
func createUSDCTokenObserver(
lggr logger.Logger,
destChainSelector cciptypes.ChainSelector,
tokensConfig map[cciptypes.ChainSelector]pluginconfig.USDCCCTPTokenConfig,
cctpConfig pluginconfig.USDCCCTPObserverConfig,
readers map[cciptypes.ChainSelector]contractreader.ContractReaderFacade,
) (TokenDataObserver, error) {
usdcReader, err := reader.NewUSDCMessageReader(
tokensConfig,
cctpConfig.Tokens,
readers,
)
if err != nil {
Expand All @@ -89,9 +89,9 @@ func createUSDCTokenObserver(
return usdc.NewTokenDataObserver(
lggr,
destChainSelector,
tokensConfig,
cctpConfig.Tokens,
usdcReader,
nil,
usdc.NewAttestationClient(cctpConfig),
), nil
}

Expand Down
71 changes: 63 additions & 8 deletions execute/tokendata/usdc/attestation.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,34 @@ import (
"context"
"errors"

"github.com/smartcontractkit/chainlink-common/pkg/hashutil"
cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3"

"github.com/smartcontractkit/chainlink-ccip/execute/exectypes"
"github.com/smartcontractkit/chainlink-ccip/pkg/reader"
"github.com/smartcontractkit/chainlink-ccip/pluginconfig"
)

var (
ErrDataMissing = errors.New("token data missing")
ErrNotReady = errors.New("token data not ready")
ErrRateLimit = errors.New("token data API is being rate limited")
ErrTimeout = errors.New("token data API timed out")
ErrDataMissing = errors.New("token data missing")
ErrNotReady = errors.New("token data not ready")
ErrRateLimit = errors.New("token data API is being rate limited")
ErrTimeout = errors.New("token data API timed out")
ErrUnknownResponse = errors.New("unexpected response from attestation API")
)

type AttestationStatus struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice to have comment for AttestationStatus.
e.g.

// AttestationStatus is returned from CCTP api / is used internally to keep track of / ...

Error error
Data [32]byte
MessageHash []byte
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can replace []byte with cciptypes.Bytes, that way you also get cool hex encoding out of the box in logs.

Attestation []byte
Error error
}

func SuccessAttestationStatus(messageHash []byte, attestation []byte) AttestationStatus {
return AttestationStatus{MessageHash: messageHash, Attestation: attestation}
}

func ErrorAttestationStatus(err error) AttestationStatus {
return AttestationStatus{Error: err}
}

// AttestationClient is an interface for fetching attestation data from the Circle API.
Expand All @@ -30,15 +42,58 @@ type AttestationStatus struct {
// Ethereum ->
//
// 12 ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean for 12 there is no entry in the map?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's nested map :D So 12 is a chainSelector, next line contains padded messageTokenIds

// 0 -> AttestationStatus{Error: nil, Data: [32]byte{attestation_data}}
// 2 -> AttestationStatus{Error: ErrNotRead, Data: nil}
// 0 -> AttestationStatus{Error: nil, Attestation: "ABCDEF", MessageHash: bytes}
// 2 -> AttestationStatus{Error: ErrNotRead, Attestation: nil, MessageHash: nil}
type AttestationClient interface {
Attestations(
ctx context.Context,
msgs map[cciptypes.ChainSelector]map[exectypes.MessageTokenID]reader.MessageHash,
) (map[cciptypes.ChainSelector]map[exectypes.MessageTokenID]AttestationStatus, error)
}

type sequentialAttestationClient struct {
client HTTPClient
hasher hashutil.Hasher[[32]byte]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can replace [32]byte with cciptypes.Bytes32

}

//nolint:revive
func NewAttestationClient(config pluginconfig.USDCCCTPObserverConfig) *sequentialAttestationClient {
return &sequentialAttestationClient{
client: NewHTTPClient(
config.AttestationAPI,
config.AttestationAPIInterval.Duration(),
config.AttestationAPITimeout.Duration(),
),
hasher: hashutil.NewKeccak(),
}
}

func (s *sequentialAttestationClient) Attestations(
ctx context.Context,
msgs map[cciptypes.ChainSelector]map[exectypes.MessageTokenID]reader.MessageHash,
) (map[cciptypes.ChainSelector]map[exectypes.MessageTokenID]AttestationStatus, error) {
outcome := make(map[cciptypes.ChainSelector]map[exectypes.MessageTokenID]AttestationStatus)

for chainSelector, hashes := range msgs {
outcome[chainSelector] = make(map[exectypes.MessageTokenID]AttestationStatus)

for tokenID, messageHash := range hashes {
// TODO sequential processing
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Next PR, I'm optimizing for having working usdc flow asap. We can optimize the code by caching or parallel processing later

outcome[chainSelector][tokenID] = s.fetchSingleMessage(ctx, messageHash)
}
}
return outcome, nil
}

func (s *sequentialAttestationClient) fetchSingleMessage(ctx context.Context, messageHash []byte) AttestationStatus {
response, _, err := s.client.Get(ctx, s.hasher.Hash(messageHash))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hash(messageHash) this is a bit confusing, maybe a comment to explain?

if err != nil {
return ErrorAttestationStatus(err)
}

return SuccessAttestationStatus(messageHash, response)
}

type FakeAttestationClient struct {
Data map[string]AttestationStatus
}
Expand Down
203 changes: 203 additions & 0 deletions execute/tokendata/usdc/attestation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
package usdc

import (
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"

commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config"
cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-ccip/execute/exectypes"
"github.com/smartcontractkit/chainlink-ccip/pkg/reader"
"github.com/smartcontractkit/chainlink-ccip/pluginconfig"
)

func createHandler(t *testing.T, success []string, pending []string) http.HandlerFunc {
successes := make(map[string]string)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function could be optimized a bit.

  • dedup "/v1/attestations/0x" + hash
  • response := writeResponse(status="complete", attestation="...")

for _, hash := range success {
successes["/v1/attestations/0x"+hash] = hash
}

pendings := make(map[string]string)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

successes -> success ?
pendings -> pending ?

for _, hash := range pending {
pendings["/v1/attestations/0x"+hash] = hash
}

return func(w http.ResponseWriter, r *http.Request) {
if hash, ok := successes[r.URL.String()]; ok {
response := fmt.Sprintf(`
{
"status": "complete",
"attestation": "%s"
}`, hash)
_, err := w.Write([]byte(response))
require.NoError(t, err)
} else if hash1, ok1 := pendings[r.URL.String()]; ok1 {
response := fmt.Sprintf(`
{
"status": "pending_confirmations",
"attestation": "%s"
}`, hash1)
_, err := w.Write([]byte(response))
require.NoError(t, err)
} else {
w.WriteHeader(http.StatusInternalServerError)
}
}
}

func Test_AttestationClient(t *testing.T) {
type example struct {
hash []byte
keecak string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo keccak

}

messageA := example{
hash: []byte{0xA},
keecak: "0ef9d8f8804d174666011a394cab7901679a8944d24249fd148a6a36071151f8",
}

messageB := example{
hash: []byte{0xB},
keecak: "60811857dd566889ff6255277d82526f2d9b3bbcb96076be22a5860765ac3d06",
}

messageC := example{
hash: []byte{0xC},
keecak: "4de0e96b0a8886e42a2c35b57df8a9d58a93b5bff655bc37a30e2ab8e29dc066",
}

tt := []struct {
name string
success []string
pending []string
input map[cciptypes.ChainSelector]map[exectypes.MessageTokenID]reader.MessageHash
expected map[cciptypes.ChainSelector]map[exectypes.MessageTokenID]AttestationStatus
}{
{
name: "empty input",
input: map[cciptypes.ChainSelector]map[exectypes.MessageTokenID]reader.MessageHash{},
expected: map[cciptypes.ChainSelector]map[exectypes.MessageTokenID]AttestationStatus{},
},
{
name: "single success",
success: []string{messageA.keecak},
input: map[cciptypes.ChainSelector]map[exectypes.MessageTokenID]reader.MessageHash{
cciptypes.ChainSelector(1): {
exectypes.NewMessageTokenID(1, 1): messageA.hash,
},
},
expected: map[cciptypes.ChainSelector]map[exectypes.MessageTokenID]AttestationStatus{
cciptypes.ChainSelector(1): {
exectypes.NewMessageTokenID(1, 1): SuccessAttestationStatus(messageA.hash, mustDecode(messageA.keecak)),
},
},
},
{
name: "single pending",
pending: []string{messageA.keecak},
input: map[cciptypes.ChainSelector]map[exectypes.MessageTokenID]reader.MessageHash{
cciptypes.ChainSelector(1): {
exectypes.NewMessageTokenID(1, 1): messageA.hash,
},
},
expected: map[cciptypes.ChainSelector]map[exectypes.MessageTokenID]AttestationStatus{
cciptypes.ChainSelector(1): {
exectypes.NewMessageTokenID(1, 1): ErrorAttestationStatus(ErrNotReady),
},
},
},
{
name: "multiple success",
success: []string{messageA.keecak, messageB.keecak, messageC.keecak},
input: map[cciptypes.ChainSelector]map[exectypes.MessageTokenID]reader.MessageHash{
cciptypes.ChainSelector(1): {
exectypes.NewMessageTokenID(1, 1): messageA.hash,
exectypes.NewMessageTokenID(1, 2): messageB.hash,
},
cciptypes.ChainSelector(2): {
exectypes.NewMessageTokenID(2, 1): messageC.hash,
},
},
expected: map[cciptypes.ChainSelector]map[exectypes.MessageTokenID]AttestationStatus{
cciptypes.ChainSelector(1): {
exectypes.NewMessageTokenID(1, 1): SuccessAttestationStatus(messageA.hash, mustDecode(messageA.keecak)),
exectypes.NewMessageTokenID(1, 2): SuccessAttestationStatus(messageB.hash, mustDecode(messageB.keecak)),
},
cciptypes.ChainSelector(2): {
exectypes.NewMessageTokenID(2, 1): SuccessAttestationStatus(messageC.hash, mustDecode(messageC.keecak)),
},
},
},
{
name: "multiple failures - A, C not ready but B internal error",
pending: []string{messageA.keecak, messageC.keecak},
input: map[cciptypes.ChainSelector]map[exectypes.MessageTokenID]reader.MessageHash{
cciptypes.ChainSelector(1): {
exectypes.NewMessageTokenID(1, 1): messageA.hash,
exectypes.NewMessageTokenID(1, 2): messageB.hash,
},
cciptypes.ChainSelector(2): {
exectypes.NewMessageTokenID(2, 1): messageC.hash,
},
},
expected: map[cciptypes.ChainSelector]map[exectypes.MessageTokenID]AttestationStatus{
cciptypes.ChainSelector(1): {
exectypes.NewMessageTokenID(1, 1): ErrorAttestationStatus(ErrNotReady),
exectypes.NewMessageTokenID(1, 2): ErrorAttestationStatus(ErrUnknownResponse),
},
cciptypes.ChainSelector(2): {
exectypes.NewMessageTokenID(2, 1): ErrorAttestationStatus(ErrNotReady),
},
},
},
{
name: "mixed success and failure",
success: []string{messageA.keecak, messageC.keecak},
pending: []string{messageB.keecak},
input: map[cciptypes.ChainSelector]map[exectypes.MessageTokenID]reader.MessageHash{
cciptypes.ChainSelector(1): {
exectypes.NewMessageTokenID(1, 1): messageA.hash,
},
cciptypes.ChainSelector(2): {
exectypes.NewMessageTokenID(2, 1): messageB.hash,
},
cciptypes.ChainSelector(3): {
exectypes.NewMessageTokenID(3, 1): messageC.hash,
},
},
expected: map[cciptypes.ChainSelector]map[exectypes.MessageTokenID]AttestationStatus{
cciptypes.ChainSelector(1): {
exectypes.NewMessageTokenID(1, 1): SuccessAttestationStatus(messageA.hash, mustDecode(messageA.keecak)),
},
cciptypes.ChainSelector(2): {
exectypes.NewMessageTokenID(2, 1): ErrorAttestationStatus(ErrNotReady),
},
cciptypes.ChainSelector(3): {
exectypes.NewMessageTokenID(3, 1): SuccessAttestationStatus(messageC.hash, mustDecode(messageC.keecak)),
},
},
},
}

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
server := httptest.NewServer(createHandler(t, tc.success, tc.pending))
defer server.Close()

client := NewAttestationClient(pluginconfig.USDCCCTPObserverConfig{
AttestationAPI: server.URL,
AttestationAPIInterval: commonconfig.MustNewDuration(1 * time.Millisecond),
AttestationAPITimeout: commonconfig.MustNewDuration(1 * time.Minute),
})
attestations, err := client.Attestations(tests.Context(t), tc.input)
require.NoError(t, err)
require.Equal(t, tc.expected, attestations)
})
}
}
Loading
Loading