Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Warp API support #345

Merged
merged 15 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ The Fuji and Mainnet [public API nodes](https://docs.avax.network/tooling/rpc-pr

### Peer-to-Peer Connections

- The AWM relayer implementation gathers BLS signatures from the validators of the source Subnet via peer-to-peer `AppRequest` messages. Validator nodes need to be configured to accept incoming peer connections. Otherwise, the relayer will fail to gather Warp message signatures. For example, networking rules may need to be adjusted to allow traffic on the default AvalancheGo P2P port (9651), or the public IP may need to be manually set in the [node configuration](https://docs.avax.network/nodes/configure/avalanchego-config-flags#public-ip).
- By default, the AWM relayer implementation gathers BLS signatures from the validators of the source Subnet via peer-to-peer `AppRequest` messages. Validator nodes need to be configured to accept incoming peer connections. Otherwise, the relayer will fail to gather Warp message signatures. For example, networking rules may need to be adjusted to allow traffic on the default AvalancheGo P2P port (9651), or the public IP may need to be manually set in the [node configuration](https://docs.avax.network/nodes/configure/avalanchego-config-flags#public-ip).
- If configured to use the Warp API (see `warp-api-endpoint` in [Configuration](#configuration)) then aggregate signatures are fetched via a single RPC request, rather than `AppRequests` to individual validators. Note that the Warp API is disabled on the public API.

### Private Key Management

Expand Down Expand Up @@ -252,6 +253,10 @@ The relayer is configured via a JSON file, the path to which is passed in via th

- List of addresses on this source blockchain to relay Warp messages from. The sending address is defined by the message protocol. For example, it could be defined as the EOA that initiates the transaction, or the address that calls the message protocol contract. If empty, then all addresses are allowed.

`"warp-api-endpoint": APIConfig`

- The RPC endpoint configuration for the Warp API, which is used to fetch Warp aggregate signatures. If omitted, then signatures are fetched via AppRequest instead.

`"destination-blockchains": []DestinationBlockchains`

- The list of destination blockchains to support. Each `DestinationBlockchain` has the following configuration:
Expand Down
2 changes: 1 addition & 1 deletion config/destination_blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (s *DestinationBlockchain) initializeWarpQuorum() error {
return fmt.Errorf("invalid subnetID in configuration. error: %w", err)
}

client, err := utils.DialWithConfig(context.Background(), s.RPCEndpoint.BaseURL, s.RPCEndpoint.HTTPHeaders, s.RPCEndpoint.QueryParams)
client, err := utils.NewEthClientWithConfig(context.Background(), s.RPCEndpoint.BaseURL, s.RPCEndpoint.HTTPHeaders, s.RPCEndpoint.QueryParams)
if err != nil {
return fmt.Errorf("failed to dial destination blockchain %s: %w", blockchainID, err)
}
Expand Down
14 changes: 14 additions & 0 deletions config/source_blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ type SourceBlockchain struct {
SupportedDestinations []*SupportedDestination `mapstructure:"supported-destinations" json:"supported-destinations"`
ProcessHistoricalBlocksFromHeight uint64 `mapstructure:"process-historical-blocks-from-height" json:"process-historical-blocks-from-height"`
AllowedOriginSenderAddresses []string `mapstructure:"allowed-origin-sender-addresses" json:"allowed-origin-sender-addresses"`
WarpAPIEndpoint APIConfig `mapstructure:"warp-api-endpoint" json:"warp-api-endpoint"`

// convenience fields to access parsed data after initialization
subnetID ids.ID
blockchainID ids.ID
allowedOriginSenderAddresses []common.Address
useAppRequestNetwork bool
}

// Validates the source subnet configuration, including verifying that the supported destinations are present in destinationBlockchainIDs
Expand All @@ -47,6 +49,14 @@ func (s *SourceBlockchain) Validate(destinationBlockchainIDs *set.Set[string]) e
if err := s.WSEndpoint.Validate(); err != nil {
return fmt.Errorf("invalid ws-endpoint in source subnet configuration: %w", err)
}
// The Warp API endpoint is optional. If omitted, signatures are fetched from validators via app request.
if s.WarpAPIEndpoint.BaseURL != "" {
if err := s.WarpAPIEndpoint.Validate(); err != nil {
return fmt.Errorf("invalid warp-api-endpoint in source subnet configuration: %w", err)
}
} else {
s.useAppRequestNetwork = true
}

// Validate the VM specific settings
switch ParseVM(s.VM) {
Expand Down Expand Up @@ -140,6 +150,10 @@ func (s *SourceBlockchain) GetAllowedOriginSenderAddresses() []common.Address {
return s.allowedOriginSenderAddresses
}

func (s *SourceBlockchain) UseAppRequestNetwork() bool {
return s.useAppRequestNetwork
}

// Specifies a supported destination blockchain and addresses for a source blockchain.
type SupportedDestination struct {
BlockchainID string `mapstructure:"blockchain-id" json:"blockchain-id"`
Expand Down
2 changes: 1 addition & 1 deletion main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func main() {
// errgroup will cancel the context when the first goroutine returns an error
errGroup.Go(func() error {
// Dial the eth client
ethClient, err := utils.DialWithConfig(
ethClient, err := utils.NewEthClientWithConfig(
context.Background(),
sourceBlockchain.RPCEndpoint.BaseURL,
sourceBlockchain.RPCEndpoint.HTTPHeaders,
Expand Down
98 changes: 58 additions & 40 deletions relayer/application_relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ import (
"github.com/ava-labs/awm-relayer/vms"
coreEthMsg "github.com/ava-labs/coreth/plugin/evm/message"
msg "github.com/ava-labs/subnet-evm/plugin/evm/message"
warpBackend "github.com/ava-labs/subnet-evm/warp"
"github.com/ava-labs/subnet-evm/rpc"
"github.com/ethereum/go-ethereum/common/hexutil"
"golang.org/x/sync/errgroup"

"go.uber.org/zap"
Expand Down Expand Up @@ -58,18 +59,19 @@ var (
// to a specific destination address on a specific destination blockchain. This routing information is
// encapsulated in [relayerID], which also represents the database key for an ApplicationRelayer.
type ApplicationRelayer struct {
logger logging.Logger
metrics *ApplicationRelayerMetrics
network *peers.AppRequestNetwork
messageCreator message.Creator
sourceBlockchain config.SourceBlockchain
signingSubnetID ids.ID
destinationClient vms.DestinationClient
relayerID database.RelayerID
warpQuorum config.WarpQuorum
checkpointManager *checkpoint.CheckpointManager
currentRequestID uint32
lock *sync.RWMutex
logger logging.Logger
metrics *ApplicationRelayerMetrics
network *peers.AppRequestNetwork
messageCreator message.Creator
sourceBlockchain config.SourceBlockchain
signingSubnetID ids.ID
destinationClient vms.DestinationClient
relayerID database.RelayerID
warpQuorum config.WarpQuorum
checkpointManager *checkpoint.CheckpointManager
currentRequestID uint32
lock *sync.RWMutex
sourceWarpSignatureClient *rpc.Client
}

func NewApplicationRelayer(
Expand Down Expand Up @@ -108,19 +110,39 @@ func NewApplicationRelayer(
checkpointManager := checkpoint.NewCheckpointManager(logger, db, sub, relayerID, startingHeight)
checkpointManager.Run()

var warpClient *rpc.Client
if !sourceBlockchain.UseAppRequestNetwork() {
// The subnet-evm Warp API client does not support query parameters or HTTP headers, and expects the URI to be in a specific form.
// Instead, we invoke the Warp API directly via the RPC client.
warpClient, err = utils.DialWithConfig(
context.Background(),
sourceBlockchain.WarpAPIEndpoint.BaseURL,
sourceBlockchain.WarpAPIEndpoint.HTTPHeaders,
sourceBlockchain.WarpAPIEndpoint.QueryParams,
)
if err != nil {
logger.Error(
"Failed to create Warp API client",
zap.Error(err),
)
return nil, err
}
}

ar := ApplicationRelayer{
logger: logger,
metrics: metrics,
network: network,
messageCreator: messageCreator,
sourceBlockchain: sourceBlockchain,
destinationClient: destinationClient,
relayerID: relayerID,
signingSubnetID: signingSubnet,
warpQuorum: quorum,
checkpointManager: checkpointManager,
currentRequestID: rand.Uint32(), // TODONOW: pass via ctor
lock: &sync.RWMutex{},
logger: logger,
metrics: metrics,
network: network,
messageCreator: messageCreator,
sourceBlockchain: sourceBlockchain,
destinationClient: destinationClient,
relayerID: relayerID,
signingSubnetID: signingSubnet,
warpQuorum: quorum,
checkpointManager: checkpointManager,
currentRequestID: rand.Uint32(), // TODONOW: pass via ctor
lock: &sync.RWMutex{},
sourceWarpSignatureClient: warpClient,
}

return &ar, nil
Expand Down Expand Up @@ -170,7 +192,6 @@ func (r *ApplicationRelayer) ProcessMessage(handler messages.MessageHandler) err
err := r.relayMessage(
reqID,
handler,
true,
)

return err
Expand All @@ -183,7 +204,6 @@ func (r *ApplicationRelayer) RelayerID() database.RelayerID {
func (r *ApplicationRelayer) relayMessage(
requestID uint32,
handler messages.MessageHandler,
useAppRequestNetwork bool,
) error {
r.logger.Debug(
"Relaying message",
Expand All @@ -209,7 +229,9 @@ func (r *ApplicationRelayer) relayMessage(
startCreateSignedMessageTime := time.Now()
// Query nodes on the origin chain for signatures, and construct the signed warp message.
var signedMessage *avalancheWarp.Message
if useAppRequestNetwork {

// sourceWarpSignatureClient is nil iff the source blockchain is configured to fetch signatures via AppRequest
cam-schultz marked this conversation as resolved.
Show resolved Hide resolved
if r.sourceWarpSignatureClient == nil {
signedMessage, err = r.createSignedMessageAppRequest(unsignedMessage, requestID)
if err != nil {
r.logger.Error(
Expand Down Expand Up @@ -257,18 +279,11 @@ func (r *ApplicationRelayer) relayMessage(
// will need to be accounted for here.
func (r *ApplicationRelayer) createSignedMessage(unsignedMessage *avalancheWarp.UnsignedMessage) (*avalancheWarp.Message, error) {
r.logger.Info("Fetching aggregate signature from the source chain validators via API")
// TODO: To properly support this, we should provide a dedicated Warp API endpoint in the config
uri := utils.StripFromString(r.sourceBlockchain.RPCEndpoint.BaseURL, "/ext")
warpClient, err := warpBackend.NewClient(uri, r.sourceBlockchain.GetBlockchainID().String())
if err != nil {
r.logger.Error(
"Failed to create Warp API client",
zap.Error(err),
)
return nil, err
}

var signedWarpMessageBytes []byte
var (
signedWarpMessageBytes hexutil.Bytes
err error
)
for attempt := 1; attempt <= maxRelayerQueryAttempts; attempt++ {
r.logger.Debug(
"Relayer collecting signatures from peers.",
Expand All @@ -277,8 +292,11 @@ func (r *ApplicationRelayer) createSignedMessage(unsignedMessage *avalancheWarp.
zap.String("destinationBlockchainID", r.relayerID.DestinationBlockchainID.String()),
zap.String("signingSubnetID", r.signingSubnetID.String()),
)
signedWarpMessageBytes, err = warpClient.GetMessageAggregateSignature(

err = r.sourceWarpSignatureClient.CallContext(
context.Background(),
&signedWarpMessageBytes,
"warp_getMessageAggregateSignature",
unsignedMessage.ID(),
r.warpQuorum.QuorumNumerator,
r.signingSubnetID.String(),
Expand Down
2 changes: 1 addition & 1 deletion relayer/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func NewListener(
)
return nil, err
}
ethWSClient, err := utils.DialWithConfig(
ethWSClient, err := utils.NewEthClientWithConfig(
context.Background(),
sourceBlockchain.WSEndpoint.BaseURL,
sourceBlockchain.WSEndpoint.HTTPHeaders,
Expand Down
3 changes: 3 additions & 0 deletions tests/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,7 @@ var _ = ginkgo.Describe("[AWM Relayer Integration Tests", func() {
ginkgo.It("Batch Message", func() {
BatchRelay(localNetworkInstance)
})
ginkgo.It("Warp API", func() {
WarpAPIRelay(localNetworkInstance)
})
})
95 changes: 95 additions & 0 deletions tests/warp_api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package tests

import (
"context"
"time"

testUtils "github.com/ava-labs/awm-relayer/tests/utils"
"github.com/ava-labs/teleporter/tests/interfaces"
"github.com/ava-labs/teleporter/tests/utils"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
. "github.com/onsi/gomega"
)

// This tests the basic functionality of the relayer using the Warp API/, rather than app requests. Includes:
// - Relaying from Subnet A to Subnet B
// - Relaying from Subnet B to Subnet A
func WarpAPIRelay(network interfaces.LocalNetwork) {
subnetAInfo := network.GetPrimaryNetworkInfo()
subnetBInfo, _ := utils.GetTwoSubnets(network)
fundedAddress, fundedKey := network.GetFundedAccountInfo()
teleporterContractAddress := network.GetTeleporterContractAddress()
err := testUtils.ClearRelayerStorage()
Expect(err).Should(BeNil())

//
// Fund the relayer address on all subnets
//
ctx := context.Background()

log.Info("Funding relayer address on all subnets")
relayerKey, err := crypto.GenerateKey()
Expect(err).Should(BeNil())
testUtils.FundRelayers(ctx, []interfaces.SubnetTestInfo{subnetAInfo, subnetBInfo}, fundedKey, relayerKey)

//
// Set up relayer config
//
relayerConfig := testUtils.CreateDefaultRelayerConfig(
[]interfaces.SubnetTestInfo{subnetAInfo, subnetBInfo},
[]interfaces.SubnetTestInfo{subnetAInfo, subnetBInfo},
teleporterContractAddress,
fundedAddress,
relayerKey,
)
// Enable the Warp API for all source blockchains
for _, subnet := range relayerConfig.SourceBlockchains {
subnet.WarpAPIEndpoint = subnet.RPCEndpoint
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any (easy) way to check that the API endpoint is actually used by this test? Not necessary right now, but if the configuration changed in the future this test may silently pass if it falls back to using the P2P requests.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm that's a good question that I don't immediately know the best answer to. We could add distinct metrics and monitor those from the test. Those metrics would be useful in a deployment as well. I'll look into this.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added those metrics in and now check them in the test.

}

relayerConfigPath := testUtils.WriteRelayerConfig(relayerConfig, testUtils.DefaultRelayerCfgFname)

//
// Test Relaying from Subnet A to Subnet B
//
log.Info("Test Relaying from Subnet A to Subnet B")

log.Info("Starting the relayer")
relayerCleanup := testUtils.BuildAndRunRelayerExecutable(ctx, relayerConfigPath)
defer relayerCleanup()

// Sleep for some time to make sure relayer has started up and subscribed.
log.Info("Waiting for the relayer to start up")
time.Sleep(15 * time.Second)

log.Info("Sending transaction from Subnet A to Subnet B")
testUtils.RelayBasicMessage(
ctx,
subnetAInfo,
subnetBInfo,
teleporterContractAddress,
fundedKey,
fundedAddress,
)

//
// Test Relaying from Subnet B to Subnet A
//
log.Info("Test Relaying from Subnet B to Subnet A")
testUtils.RelayBasicMessage(
ctx,
subnetBInfo,
subnetAInfo,
teleporterContractAddress,
fundedKey,
fundedAddress,
)

log.Info("Finished sending warp message, closing down output channel")
// Cancel the command and stop the relayer
relayerCleanup()
}
15 changes: 12 additions & 3 deletions utils/client_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,17 @@ import (

var ErrInvalidEndpoint = errors.New("invalid rpc endpoint")

// DialWithContext returns an ethclient.Client with the internal RPC client configured with the provided options.
func DialWithConfig(ctx context.Context, baseURL string, httpHeaders, queryParams map[string]string) (ethclient.Client, error) {
// NewEthClientWithConfig returns an ethclient.Client with the internal RPC client configured with the provided options.
func NewEthClientWithConfig(ctx context.Context, baseURL string, httpHeaders, queryParams map[string]string) (ethclient.Client, error) {
client, err := DialWithConfig(ctx, baseURL, httpHeaders, queryParams)
if err != nil {
return nil, err
}
return ethclient.NewClient(client), nil
}

// DialWithConfig dials the provided baseURL with the provided httpHeaders and queryParams
func DialWithConfig(ctx context.Context, baseURL string, httpHeaders, queryParams map[string]string) (*rpc.Client, error) {
url, err := addQueryParams(baseURL, queryParams)
if err != nil {
return nil, err
Expand All @@ -25,7 +34,7 @@ func DialWithConfig(ctx context.Context, baseURL string, httpHeaders, queryParam
if err != nil {
return nil, err
}
return ethclient.NewClient(client), nil
return client, nil
}

// addQueryParams adds the query parameters to the url
Expand Down
2 changes: 1 addition & 1 deletion vms/evm/destination_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func NewDestinationClient(
destinationBlockchain *config.DestinationBlockchain,
) (*destinationClient, error) {
// Dial the destination RPC endpoint
client, err := utils.DialWithConfig(
client, err := utils.NewEthClientWithConfig(
context.Background(),
destinationBlockchain.RPCEndpoint.BaseURL,
destinationBlockchain.RPCEndpoint.HTTPHeaders,
Expand Down