Skip to content

Commit

Permalink
node: implement observation requests via gossip
Browse files Browse the repository at this point in the history
Limitations:

- Only supported for Solana and for confirmation level Finalized,
  which the token/NFT bridges use. Need to take a close look before
  enabling it for both (since we're bypassing the tx fetcher and would
  fetch and process accounts of the "wrong" confirmation levels).

- Rate limiting not implemented yet, will be done in a future release
  when things are not currently on fire.

Test: https://gist.github.com/leoluk/bab3a18e922057109facea1cf1f26b2f

commit-id:6a0d4c32
  • Loading branch information
Leo authored and leoluk committed Jan 26, 2022
1 parent 79f489b commit 73a5b72
Show file tree
Hide file tree
Showing 9 changed files with 230 additions and 16 deletions.
3 changes: 3 additions & 0 deletions DEVELOP.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ To Solana as CPI instruction:

kubectl exec solana-devnet-0 -c setup -- client post-message --proxy CP1co2QMMoDPbsmV7PGcUTLFwyhgCgTXt25gLQ5LewE1 Bridge1p5gheXUvJ6jGWGeCsgPKgnE3YgdGKRVCMY9o 1 confirmed ffff

### Observation Requests

kubectl exec -it guardian-0 -- /guardiand admin send-observation-request --socket /tmp/admin.sock 1 4636d8f7593c78a5092bed13dec765cc705752653db5eb1498168c92345cd389

### IntelliJ Protobuf Autocompletion

Expand Down
41 changes: 41 additions & 0 deletions node/cmd/guardiand/adminclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/hex"
"fmt"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
publicrpcv1 "github.com/certusone/wormhole/node/pkg/proto/publicrpc/v1"
"github.com/certusone/wormhole/node/pkg/vaa"
"github.com/davecgh/go-spew/spew"
Expand Down Expand Up @@ -43,12 +44,14 @@ func init() {
AdminClientFindMissingMessagesCmd.Flags().AddFlagSet(pf)
AdminClientListNodes.Flags().AddFlagSet(pf)
DumpVAAByMessageID.Flags().AddFlagSet(pf)
SendObservationRequest.Flags().AddFlagSet(pf)

AdminCmd.AddCommand(AdminClientInjectGuardianSetUpdateCmd)
AdminCmd.AddCommand(AdminClientFindMissingMessagesCmd)
AdminCmd.AddCommand(AdminClientGovernanceVAAVerifyCmd)
AdminCmd.AddCommand(AdminClientListNodes)
AdminCmd.AddCommand(DumpVAAByMessageID)
AdminCmd.AddCommand(SendObservationRequest)
}

var AdminCmd = &cobra.Command{
Expand Down Expand Up @@ -77,6 +80,13 @@ var DumpVAAByMessageID = &cobra.Command{
Args: cobra.ExactArgs(1),
}

var SendObservationRequest = &cobra.Command{
Use: "send-observation-request [CHAIN_ID] [TX_HASH_HEX]",
Short: "Broadcast an observation request for the given chain ID and chain-specific tx_hash",
Run: runSendObservationRequest,
Args: cobra.ExactArgs(2),
}

func getAdminClient(ctx context.Context, addr string) (*grpc.ClientConn, error, nodev1.NodePrivilegedServiceClient) {
conn, err := grpc.DialContext(ctx, fmt.Sprintf("unix:///%s", addr), grpc.WithInsecure())

Expand Down Expand Up @@ -213,3 +223,34 @@ func runDumpVAAByMessageID(cmd *cobra.Command, args []string) {
log.Printf("VAA with digest %s: %+v\n", v.HexDigest(), spew.Sdump(v))
fmt.Printf("Bytes:\n%s\n", hex.EncodeToString(resp.VaaBytes))
}

func runSendObservationRequest(cmd *cobra.Command, args []string) {
chainID, err := strconv.Atoi(args[0])
if err != nil {
log.Fatalf("invalid chain ID: %v", err)
}

txHash, err := hex.DecodeString(args[1])
if err != nil {
log.Fatalf("invalid transaction hash: %v", err)
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

conn, err, c := getAdminClient(ctx, *clientSocketPath)
defer conn.Close()
if err != nil {
log.Fatalf("failed to get admin client: %v", err)
}

_, err = c.SendObservationRequest(ctx, &nodev1.SendObservationRequestRequest{
ObservationRequest: &gossipv1.ObservationRequest{
ChainId: uint32(chainID),
TxHash: txHash,
},
})
if err != nil {
log.Fatalf("failed to send observation request: %v", err)
}
}
26 changes: 17 additions & 9 deletions node/cmd/guardiand/adminserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ import (

type nodePrivilegedService struct {
nodev1.UnimplementedNodePrivilegedServiceServer
db *db.Database
injectC chan<- *vaa.VAA
logger *zap.Logger
signedInC chan *gossipv1.SignedVAAWithQuorum
db *db.Database
injectC chan<- *vaa.VAA
obsvReqSendC chan *gossipv1.ObservationRequest
logger *zap.Logger
signedInC chan *gossipv1.SignedVAAWithQuorum
}

// adminGuardianSetUpdateToVAA converts a nodev1.GuardianSetUpdate message to its canonical VAA representation.
Expand Down Expand Up @@ -342,7 +343,7 @@ func (s *nodePrivilegedService) FindMissingMessages(ctx context.Context, req *no
}, nil
}

func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- *vaa.VAA, signedInC chan *gossipv1.SignedVAAWithQuorum, db *db.Database, gst *common.GuardianSetState) (supervisor.Runnable, error) {
func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- *vaa.VAA, signedInC chan *gossipv1.SignedVAAWithQuorum, obsvReqSendC chan *gossipv1.ObservationRequest, db *db.Database, gst *common.GuardianSetState) (supervisor.Runnable, error) {
// Delete existing UNIX socket, if present.
fi, err := os.Stat(socketPath)
if err == nil {
Expand Down Expand Up @@ -375,10 +376,11 @@ func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<-
logger.Info("admin server listening on", zap.String("path", socketPath))

nodeService := &nodePrivilegedService{
injectC: injectC,
db: db,
logger: logger.Named("adminservice"),
signedInC: signedInC,
injectC: injectC,
obsvReqSendC: obsvReqSendC,
db: db,
logger: logger.Named("adminservice"),
signedInC: signedInC,
}

publicrpcService := publicrpc.NewPublicrpcServer(logger, db, gst)
Expand All @@ -388,3 +390,9 @@ func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<-
publicrpcv1.RegisterPublicRPCServiceServer(grpcServer, publicrpcService)
return supervisor.GRPCServer(grpcServer, l, false), nil
}

func (s *nodePrivilegedService) SendObservationRequest(ctx context.Context, req *nodev1.SendObservationRequestRequest) (*nodev1.SendObservationRequestResponse, error) {
s.obsvReqSendC <- req.ObservationRequest
s.logger.Info("sent observation request", zap.Any("request", req.ObservationRequest))
return &nodev1.SendObservationRequestResponse{}, nil
}
14 changes: 10 additions & 4 deletions node/cmd/guardiand/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,12 @@ func runNode(cmd *cobra.Command, args []string) {
// Inbound signed VAAs
signedInC := make(chan *gossipv1.SignedVAAWithQuorum, 50)

// Inbound observation requests
obsvReqC := make(chan *gossipv1.ObservationRequest, 50)

// Outbound observation requests
obsvReqSendC := make(chan *gossipv1.ObservationRequest)

// Injected VAAs (manually generated rather than created via observation)
injectC := make(chan *vaa.VAA)

Expand Down Expand Up @@ -599,7 +605,7 @@ func runNode(cmd *cobra.Command, args []string) {
}

// local admin service socket
adminService, err := adminServiceRunnable(logger, *adminSocketPath, injectC, signedInC, db, gst)
adminService, err := adminServiceRunnable(logger, *adminSocketPath, injectC, signedInC, obsvReqSendC, db, gst)
if err != nil {
logger.Fatal("failed to create admin service socket", zap.Error(err))
}
Expand All @@ -613,7 +619,7 @@ func runNode(cmd *cobra.Command, args []string) {
// Run supervisor.
supervisor.New(rootCtx, logger, func(ctx context.Context) error {
if err := supervisor.Run(ctx, "p2p", p2p.Run(
obsvC, sendC, signedInC, priv, gk, gst, *p2pPort, *p2pNetworkID, *p2pBootstrap, *nodeName, *disableHeartbeatVerify, rootCtxCancel)); err != nil {
obsvC, obsvReqC, obsvReqSendC, sendC, signedInC, priv, gk, gst, *p2pPort, *p2pNetworkID, *p2pBootstrap, *nodeName, *disableHeartbeatVerify, rootCtxCancel)); err != nil {
return err
}

Expand Down Expand Up @@ -669,12 +675,12 @@ func runNode(cmd *cobra.Command, args []string) {
}

if err := supervisor.Run(ctx, "solwatch-confirmed",
solana.NewSolanaWatcher(*solanaWsRPC, *solanaRPC, solAddress, lockC, rpc.CommitmentConfirmed).Run); err != nil {
solana.NewSolanaWatcher(*solanaWsRPC, *solanaRPC, solAddress, lockC, nil, rpc.CommitmentConfirmed).Run); err != nil {
return err
}

if err := supervisor.Run(ctx, "solwatch-finalized",
solana.NewSolanaWatcher(*solanaWsRPC, *solanaRPC, solAddress, lockC, rpc.CommitmentFinalized).Run); err != nil {
solana.NewSolanaWatcher(*solanaWsRPC, *solanaRPC, solAddress, lockC, obsvReqC, rpc.CommitmentFinalized).Run); err != nil {
return err
}

Expand Down
3 changes: 1 addition & 2 deletions node/cmd/spy/spy.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,7 @@ func runSpy(cmd *cobra.Command, args []string) {

// Run supervisor.
supervisor.New(rootCtx, logger, func(ctx context.Context) error {
if err := supervisor.Run(ctx, "p2p", p2p.Run(
obsvC, sendC, signedInC, priv, nil, gst, *p2pPort, *p2pNetworkID, *p2pBootstrap, "", false, rootCtxCancel)); err != nil {
if err := supervisor.Run(ctx, "p2p", p2p.Run(obsvC, nil, nil, sendC, signedInC, priv, nil, gst, *p2pPort, *p2pNetworkID, *p2pBootstrap, "", false, rootCtxCancel)); err != nil {
return err
}

Expand Down
111 changes: 110 additions & 1 deletion node/pkg/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,17 @@ var (

var heartbeatMessagePrefix = []byte("heartbeat|")

var signedObservationRequestPrefix = []byte("signed_observation_request|")

func heartbeatDigest(b []byte) common.Hash {
return ethcrypto.Keccak256Hash(append(heartbeatMessagePrefix, b...))
}

func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, signedInC chan *gossipv1.SignedVAAWithQuorum, priv crypto.PrivKey, gk *ecdsa.PrivateKey, gst *node_common.GuardianSetState, port uint, networkID string, bootstrapPeers string, nodeName string, disableHeartbeatVerify bool, rootCtxCancel context.CancelFunc) func(ctx context.Context) error {
func signedObservationRequestDigest(b []byte) common.Hash {
return ethcrypto.Keccak256Hash(append(signedObservationRequestPrefix, b...))
}

func Run(obsvC chan *gossipv1.SignedObservation, obsvReqC chan *gossipv1.ObservationRequest, obsvReqSendC chan *gossipv1.ObservationRequest, sendC chan []byte, signedInC chan *gossipv1.SignedVAAWithQuorum, priv crypto.PrivKey, gk *ecdsa.PrivateKey, gst *node_common.GuardianSetState, port uint, networkID string, bootstrapPeers string, nodeName string, disableHeartbeatVerify bool, rootCtxCancel context.CancelFunc) func(ctx context.Context) error {
return func(ctx context.Context) (re error) {
logger := supervisor.Logger(ctx)

Expand Down Expand Up @@ -279,6 +285,44 @@ func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, signedInC ch
if err != nil {
logger.Error("failed to publish message from queue", zap.Error(err))
}
case msg := <-obsvReqSendC:
b, err := proto.Marshal(msg)
if err != nil {
panic(err)
}

// Sign the observation request using our node's guardian key.
digest := signedObservationRequestDigest(b)
sig, err := ethcrypto.Sign(digest.Bytes(), gk)
if err != nil {
panic(err)
}

sReq := &gossipv1.SignedObservationRequest{
ObservationRequest: b,
Signature: sig,
GuardianAddr: ethcrypto.PubkeyToAddress(gk.PublicKey).Bytes(),
}

envelope := &gossipv1.GossipMessage{
Message: &gossipv1.GossipMessage_SignedObservationRequest{
SignedObservationRequest: sReq}}

b, err = proto.Marshal(envelope)
if err != nil {
panic(err)
}

// Send to local observation request queue (the loopback message is ignored)
obsvReqC <- msg

err = th.Publish(ctx, b)
p2pMessagesSent.Inc()
if err != nil {
logger.Error("failed to publish observation request", zap.Error(err))
} else {
logger.Info("published signed observation request", zap.Any("signed_observation_request", sReq))
}
}
}
}()
Expand Down Expand Up @@ -342,6 +386,32 @@ func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, signedInC ch
case *gossipv1.GossipMessage_SignedVaaWithQuorum:
signedInC <- m.SignedVaaWithQuorum
p2pMessagesReceived.WithLabelValues("signed_vaa_with_quorum").Inc()
case *gossipv1.GossipMessage_SignedObservationRequest:
s := m.SignedObservationRequest
gs := gst.Get()
if gs == nil {
logger.Debug("dropping SignedObservationRequest - no guardian set",
zap.Any("value", s),
zap.String("from", envelope.GetFrom().String()))
break
}
r, err := processSignedObservationRequest(s, gs)
if err != nil {
p2pMessagesReceived.WithLabelValues("invalid_signed_observation_request").Inc()
logger.Debug("invalid signed observation request received",
zap.Error(err),
zap.Any("payload", msg.Message),
zap.Any("value", s),
zap.Binary("raw", envelope.Data),
zap.String("from", envelope.GetFrom().String()))
} else {
p2pMessagesReceived.WithLabelValues("signed_observation_request").Inc()
logger.Info("valid signed observation request received",
zap.Any("value", r),
zap.String("from", envelope.GetFrom().String()))

obsvReqC <- r
}
default:
p2pMessagesReceived.WithLabelValues("unknown").Inc()
logger.Warn("received unknown message type (running outdated software?)",
Expand Down Expand Up @@ -392,3 +462,42 @@ func processSignedHeartbeat(from peer.ID, s *gossipv1.SignedHeartbeat, gs *node_

return &h, nil
}

func processSignedObservationRequest(s *gossipv1.SignedObservationRequest, gs *node_common.GuardianSet) (*gossipv1.ObservationRequest, error) {
envelopeAddr := common.BytesToAddress(s.GuardianAddr)
idx, ok := gs.KeyIndex(envelopeAddr)
var pk common.Address
if !ok {
return nil, fmt.Errorf("invalid message: %s not in guardian set", envelopeAddr)
} else {
pk = gs.Keys[idx]
}

digest := signedObservationRequestDigest(s.ObservationRequest)

pubKey, err := ethcrypto.Ecrecover(digest.Bytes(), s.Signature)
if err != nil {
return nil, errors.New("failed to recover public key")
}

signerAddr := common.BytesToAddress(ethcrypto.Keccak256(pubKey[1:])[12:])
if pk != signerAddr {
return nil, fmt.Errorf("invalid signer: %v", signerAddr)
}

var h gossipv1.ObservationRequest
err = proto.Unmarshal(s.ObservationRequest, &h)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal observation request: %w", err)
}

// For now, this supports Solana only. Once we add more chains, we'll have to add a
// multiplexer/router in node.go.
if h.ChainId != uint32(vaa.ChainIDSolana) {
return nil, fmt.Errorf("unsupported chain id: %d", h.ChainId)
}

// TODO: implement per-guardian rate limiting

return &h, nil
}
14 changes: 14 additions & 0 deletions node/pkg/solana/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type SolanaWatcher struct {
rpcUrl string
commitment rpc.CommitmentType
messageEvent chan *common.MessagePublication
obsvReqC chan *gossipv1.ObservationRequest
rpcClient *rpc.Client
}

Expand Down Expand Up @@ -101,11 +102,13 @@ func NewSolanaWatcher(
wsUrl, rpcUrl string,
contractAddress solana.PublicKey,
messageEvents chan *common.MessagePublication,
obsvReqC chan *gossipv1.ObservationRequest,
commitment rpc.CommitmentType) *SolanaWatcher {
return &SolanaWatcher{
contract: contractAddress,
wsUrl: wsUrl, rpcUrl: rpcUrl,
messageEvent: messageEvents,
obsvReqC: obsvReqC,
commitment: commitment,
rpcClient: rpc.New(rpcUrl),
}
Expand Down Expand Up @@ -155,6 +158,17 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
for _, acc := range accs {
s.fetchMessageAccount(rCtx, logger, solana.MustPublicKeyFromBase58(acc), 0)
}
case m := <-s.obsvReqC:
if m.ChainId != uint32(vaa.ChainIDSolana) {
panic("unexpected chain id")
}

acc := solana.PublicKeyFromBytes(m.TxHash)
logger.Info("received observation request", zap.String("account", acc.String()))

rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
s.fetchMessageAccount(rCtx, logger, acc, 0)
cancel()
case <-timer.C:
// Get current slot height
rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
Expand Down
Loading

0 comments on commit 73a5b72

Please sign in to comment.