From a170ee9339fe41e1aeebbfc1662c51a9ed6d0bfb Mon Sep 17 00:00:00 2001 From: Vasyl Naumenko Date: Sun, 7 Jul 2024 09:04:53 +0300 Subject: [PATCH] gRPC endpoint for CosmWasm app (#43) * grpc for wasm app * wip rpc client * rpc client * cleanup * network name * block signature * logs * logs * fix error `codespace sdk code 2: tx parse error: unable to resolve type URL /cosmwasm.wasm.v1.MsgStoreCode`. interfaceRegistry in ClientContext should be taken from Wasm Application --------- Co-authored-by: ramil --- example/wasm/main.go | 167 ++++++++++++++++++++++++++++++++++++++++++- vm/rpc.go | 17 ++++- vm/types/config.go | 48 +++++++++++++ vm/vm.go | 26 +++++-- 4 files changed, 250 insertions(+), 8 deletions(-) create mode 100644 vm/types/config.go diff --git a/example/wasm/main.go b/example/wasm/main.go index 8066ca4..581bcd7 100644 --- a/example/wasm/main.go +++ b/example/wasm/main.go @@ -2,21 +2,36 @@ package main import ( "context" + "encoding/json" "fmt" "os" + "os/signal" + "syscall" "cosmossdk.io/log" "github.com/CosmWasm/wasmd/app" "github.com/CosmWasm/wasmd/x/wasm/keeper" wasmtypes "github.com/CosmWasm/wasmd/x/wasm/types" + rpchttp "github.com/cometbft/cometbft/rpc/client/http" dbm "github.com/cosmos/cosmos-db" "github.com/cosmos/cosmos-sdk/baseapp" + "github.com/cosmos/cosmos-sdk/client" + "github.com/cosmos/cosmos-sdk/codec" + cryptocodec "github.com/cosmos/cosmos-sdk/crypto/codec" "github.com/cosmos/cosmos-sdk/server" + srvconfig "github.com/cosmos/cosmos-sdk/server/config" + servergrpc "github.com/cosmos/cosmos-sdk/server/grpc" "github.com/cosmos/cosmos-sdk/testutil/sims" sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/cosmos/cosmos-sdk/x/auth/tx" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "github.com/consideritdone/landslidevm" + "github.com/consideritdone/landslidevm/utils/ids" "github.com/consideritdone/landslidevm/vm" + vmtypes "github.com/consideritdone/landslidevm/vm/types" ) func main() { @@ -40,8 +55,158 @@ func WasmCreator() vm.AppCreator { cfg.SetBech32PrefixForConsensusNode(app.Bech32PrefixConsAddr, app.Bech32PrefixConsPub) cfg.SetAddressVerifier(wasmtypes.VerifyAddressLen()) cfg.Seal() - wasmApp := app.NewWasmApp(logger, db, nil, true, sims.NewAppOptionsWithFlagHome(os.TempDir()), []keeper.Option{}, baseapp.SetChainID("landslide-test")) + + srvCfg := *srvconfig.DefaultConfig() + grpcCfg := srvCfg.GRPC + var vmCfg vmtypes.VmConfig + vmCfg.SetDefaults() + if len(config.ConfigBytes) > 0 { + if err := json.Unmarshal(config.ConfigBytes, &vmCfg); err != nil { + return nil, fmt.Errorf("failed to unmarshal config %s: %w", string(config.ConfigBytes), err) + } + // set the grpc port, if it is set to 0, disable gRPC + if vmCfg.GRPCPort > 0 { + grpcCfg.Address = fmt.Sprintf("127.0.0.1:%d", vmCfg.GRPCPort) + } else { + grpcCfg.Enable = false + } + } + + if err := vmCfg.Validate(); err != nil { + return nil, err + } + chainID := vmCfg.NetworkName + + var wasmApp = app.NewWasmApp( + logger, + db, + nil, + true, + sims.NewAppOptionsWithFlagHome(os.TempDir()), + []keeper.Option{}, + baseapp.SetChainID(chainID), + ) + + // early return if gRPC is disabled + if !grpcCfg.Enable { + return server.NewCometABCIWrapper(wasmApp), nil + } + + interfaceRegistry := wasmApp.InterfaceRegistry() + marshaller := codec.NewProtoCodec(interfaceRegistry) + clientCtx := client.Context{}. + WithCodec(marshaller). + WithLegacyAmino(makeCodec()). + WithTxConfig(tx.NewTxConfig(marshaller, tx.DefaultSignModes)). + WithInterfaceRegistry(interfaceRegistry). + WithChainID(chainID) + + avaChainID, err := ids.ToID(config.ChainId) + if err != nil { + return nil, err + } + + rpcURI := fmt.Sprintf( + "http://127.0.0.1:%d/ext/bc/%s/rpc", + vmCfg.RPCPort, + avaChainID, + ) + + clientCtx = clientCtx.WithNodeURI(rpcURI) + rpcclient, err := rpchttp.New(rpcURI, "/websocket") + if err != nil { + return nil, err + } + clientCtx = clientCtx.WithClient(rpcclient) + + // use the provided clientCtx to register the services + wasmApp.RegisterTxService(clientCtx) + wasmApp.RegisterTendermintService(clientCtx) + wasmApp.RegisterNodeService(clientCtx, srvconfig.Config{}) + + maxSendMsgSize := grpcCfg.MaxSendMsgSize + if maxSendMsgSize == 0 { + maxSendMsgSize = srvconfig.DefaultGRPCMaxSendMsgSize + } + + maxRecvMsgSize := grpcCfg.MaxRecvMsgSize + if maxRecvMsgSize == 0 { + maxRecvMsgSize = srvconfig.DefaultGRPCMaxRecvMsgSize + } + + // if gRPC is enabled, configure gRPC client for gRPC gateway + grpcClient, err := grpc.Dial( + grpcCfg.Address, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions( + grpc.ForceCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec()), + grpc.MaxCallRecvMsgSize(maxRecvMsgSize), + grpc.MaxCallSendMsgSize(maxSendMsgSize), + ), + ) + if err != nil { + return nil, err + } + + clientCtx = clientCtx.WithGRPCClient(grpcClient) + logger.Debug("gRPC client assigned to client context", "target", grpcCfg.Address) + + g, ctx := getCtx(logger, false) + + grpcSrv, err := servergrpc.NewGRPCServer(clientCtx, wasmApp, grpcCfg) + if err != nil { + return nil, err + } + + // Start the gRPC server in a goroutine. Note, the provided ctx will ensure + // that the server is gracefully shut down. + g.Go(func() error { + return servergrpc.StartGRPCServer(ctx, logger.With("module", "grpc-server"), grpcCfg, grpcSrv) + }) return server.NewCometABCIWrapper(wasmApp), nil } } + +// custom tx codec +func makeCodec() *codec.LegacyAmino { + cdc := codec.NewLegacyAmino() + sdk.RegisterLegacyAminoCodec(cdc) + cryptocodec.RegisterCrypto(cdc) + return cdc +} + +func getCtx(logger log.Logger, block bool) (*errgroup.Group, context.Context) { + ctx, cancelFn := context.WithCancel(context.Background()) + g, ctx := errgroup.WithContext(ctx) + // listen for quit signals so the calling parent process can gracefully exit + listenForQuitSignals(g, block, cancelFn, logger) + return g, ctx +} + +// listenForQuitSignals listens for SIGINT and SIGTERM. When a signal is received, +// the cleanup function is called, indicating the caller can gracefully exit or +// return. +// +// Note, the blocking behavior of this depends on the block argument. +// The caller must ensure the corresponding context derived from the cancelFn is used correctly. +func listenForQuitSignals(g *errgroup.Group, block bool, cancelFn context.CancelFunc, logger log.Logger) { + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + + f := func() { + sig := <-sigCh + cancelFn() + + logger.Info("caught signal", "signal", sig.String()) + } + + if block { + g.Go(func() error { + f() + return nil + }) + } else { + go f() + } +} diff --git a/vm/rpc.go b/vm/rpc.go index 8f7d07b..ed0d91a 100644 --- a/vm/rpc.go +++ b/vm/rpc.go @@ -139,6 +139,7 @@ func (rpc *RPC) ABCIQuery( } func (rpc *RPC) BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { + rpc.vm.logger.Info("BroadcastTxCommit called") subscriber := ctx.RemoteAddr() // Subscribe to tx being committed in block. @@ -208,7 +209,7 @@ func (rpc *RPC) BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.R Hash: tx.Hash(), }, err // TODO: use rpc.config.TimeoutBroadcastTxCommit for timeout - case <-time.After(10 * time.Second): + case <-time.After(30 * time.Second): err = errors.New("timed out waiting for tx to be included in a block") rpc.vm.logger.Error("Error on broadcastTxCommit", "err", err) return &ctypes.ResultBroadcastTxCommit{ @@ -221,22 +222,28 @@ func (rpc *RPC) BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.R } func (rpc *RPC) BroadcastTxAsync(_ *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { + rpc.vm.logger.Info("BroadcastTxAsync called") err := rpc.vm.mempool.CheckTx(tx, nil, mempl.TxInfo{}) if err != nil { + rpc.vm.logger.Error("Error on broadcastTxAsync", "err", err) return nil, err } return &ctypes.ResultBroadcastTx{Hash: tx.Hash()}, nil } func (rpc *RPC) BroadcastTxSync(_ *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { + rpc.vm.logger.Info("BroadcastTxSync called") resCh := make(chan *abci.ResponseCheckTx, 1) err := rpc.vm.mempool.CheckTx(tx, func(res *abci.ResponseCheckTx) { resCh <- res }, mempl.TxInfo{}) if err != nil { + rpc.vm.logger.Error("Error on BroadcastTxSync", "err", err) return nil, err } res := <-resCh + + rpc.vm.logger.Info("BroadcastTxSync response", "Code", res.Code, "Log", res.Log, "Codespace", res.Codespace, "Hash", tx.Hash()) return &ctypes.ResultBroadcastTx{ Code: res.GetCode(), Data: res.GetData(), @@ -397,8 +404,11 @@ func (rpc *RPC) Block(_ *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBloc blockMeta := rpc.vm.blockStore.LoadBlockMeta(height) if blockMeta == nil { + rpc.vm.logger.Info("Block not found", "height", height) return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: block}, nil } + + rpc.vm.logger.Info("Block response", "height", height, "block", block, "blockMeta", blockMeta) return &ctypes.ResultBlock{BlockID: blockMeta.BlockID, Block: block}, nil } @@ -539,12 +549,15 @@ func (rpc *RPC) Validators( } func (rpc *RPC) Tx(_ *rpctypes.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) { + rpc.vm.logger.Info("Tx called", "hash", hash) r, err := rpc.vm.txIndexer.Get(hash) if err != nil { + rpc.vm.logger.Error("Error on Tx", "err", err) return nil, err } if r == nil { + rpc.vm.logger.Error("Error on Tx", "tx not found", hash) return nil, fmt.Errorf("tx (%X) not found", hash) } @@ -736,7 +749,7 @@ func (rpc *RPC) Status(_ *rpctypes.Context) (*ctypes.ResultStatus, error) { ), DefaultNodeID: p2p.ID(rpc.vm.appOpts.NodeId), ListenAddr: "", - Network: fmt.Sprintf("%d", rpc.vm.appOpts.NetworkId), + Network: rpc.vm.networkName, Version: version.TMCoreSemVer, Channels: nil, Moniker: "", diff --git a/vm/types/config.go b/vm/types/config.go new file mode 100644 index 0000000..c126ac5 --- /dev/null +++ b/vm/types/config.go @@ -0,0 +1,48 @@ +package types + +import ( + "fmt" + "time" +) + +const ( + defaultRPCPort = 9752 + defaultGRPCPort = 9090 + defaultMaxOpenConnections = 0 // unlimited + defaultTimeoutBroadcastTxCommit time.Duration = 30 * time.Second +) + +// VmConfig ... +type VmConfig struct { + RPCPort uint16 `json:"rpc_port"` + GRPCPort uint16 `json:"grpc_port"` + GRPCMaxOpenConnections int `json:"grpc_max_open_connections"` + TimeoutBroadcastTxCommit time.Duration `json:"broadcast_commit_timeout"` + NetworkName string `json:"network_name"` +} + +// SetDefaults sets the default values for the config. +func (c *VmConfig) SetDefaults() { + c.RPCPort = defaultRPCPort + c.GRPCPort = defaultGRPCPort + c.GRPCMaxOpenConnections = defaultMaxOpenConnections + c.TimeoutBroadcastTxCommit = defaultTimeoutBroadcastTxCommit + c.NetworkName = "landslide-test" +} + +// Validate returns an error if this is an invalid config. +func (c *VmConfig) Validate() error { + if c.GRPCMaxOpenConnections < 0 { + return fmt.Errorf("grpc_max_open_connections can't be negative") + } + + if c.TimeoutBroadcastTxCommit < 0 { + return fmt.Errorf("broadcast_tx_commit_timeout can't be negative") + } + + if len(c.NetworkName) == 0 { + return fmt.Errorf("network_name can't be empty") + } + + return nil +} diff --git a/vm/vm.go b/vm/vm.go index d56041b..20cfebc 100644 --- a/vm/vm.go +++ b/vm/vm.go @@ -16,6 +16,7 @@ import ( abcitypes "github.com/cometbft/cometbft/abci/types" "github.com/cometbft/cometbft/config" "github.com/cometbft/cometbft/consensus" + "github.com/cometbft/cometbft/crypto" "github.com/cometbft/cometbft/crypto/secp256k1" "github.com/cometbft/cometbft/libs/log" "github.com/cometbft/cometbft/mempool" @@ -95,6 +96,7 @@ type ( AppCreator func(*AppCreatorOpts) (Application, error) LandslideVM struct { + networkName string allowShutdown *vmtypes.Atomic[bool] processMetrics prometheus.Gatherer @@ -256,7 +258,7 @@ func (vm *LandslideVM) Initialize(_ context.Context, req *vmpb.InitializeRequest vm.appOpts = &AppCreatorOpts{ NetworkId: req.NetworkId, SubnetId: req.SubnetId, - ChainId: req.CChainId, + ChainId: req.ChainId, NodeId: req.NodeId, PublicKey: req.PublicKey, XChainId: req.XChainId, @@ -272,6 +274,19 @@ func (vm *LandslideVM) Initialize(_ context.Context, req *vmpb.InitializeRequest return nil, err } + // Set the default configuration + var vmCfg vmtypes.VmConfig + vmCfg.SetDefaults() + if len(vm.appOpts.ConfigBytes) > 0 { + if err := json.Unmarshal(vm.appOpts.ConfigBytes, &vmCfg); err != nil { + return nil, fmt.Errorf("failed to unmarshal config %s: %w", string(vm.appOpts.ConfigBytes), err) + } + } + if err := vmCfg.Validate(); err != nil { + return nil, err + } + vm.networkName = vmCfg.NetworkName + vm.state, vm.genesis, err = node.LoadStateFromDBOrGenesisDocProvider( dbStateStore, func() (*types.GenesisDoc, error) { @@ -390,7 +405,7 @@ func (vm *LandslideVM) Initialize(_ context.Context, req *vmpb.InitializeRequest if err != nil { return nil, err } - vm.logger.Debug("initialize block", "bytes ", blockBytes) + //vm.logger.Debug("initialize block", "bytes ", blockBytes) vm.logger.Info("vm initialization completed") parentHash := block.BlockParentHash(blk) @@ -519,7 +534,7 @@ func (vm *LandslideVM) BuildBlock(context.Context, *vmpb.BuildBlockRequest) (*vm BlockIDFlag: types.BlockIDFlagNil, Timestamp: time.Now(), ValidatorAddress: vm.state.Validators.Validators[i].Address, - Signature: []byte{0x0}, + Signature: crypto.CRandBytes(types.MaxSignatureSize), // todo: sign the block }, } } @@ -567,7 +582,8 @@ func (vm *LandslideVM) BuildBlock(context.Context, *vmpb.BuildBlockRequest) (*vm // ParseBlock attempt to create a block from a stream of bytes. func (vm *LandslideVM) ParseBlock(_ context.Context, req *vmpb.ParseBlockRequest) (*vmpb.ParseBlockResponse, error) { - vm.logger.Debug("ParseBlock", "bytes", req.Bytes) + vm.logger.Info("ParseBlock") + //vm.logger.Debug("ParseBlock", "bytes", req.Bytes) var ( blk *types.Block blkStatus vmpb.Status @@ -823,7 +839,7 @@ func (vm *LandslideVM) GetStateSummary(context.Context, *vmpb.GetStateSummaryReq func (vm *LandslideVM) BlockVerify(_ context.Context, req *vmpb.BlockVerifyRequest) (*vmpb.BlockVerifyResponse, error) { vm.logger.Info("BlockVerify") - vm.logger.Debug("block verify", "bytes", req.Bytes) + //vm.logger.Debug("block verify", "bytes", req.Bytes) blk, blkStatus, err := vmstate.DecodeBlockWithStatus(req.Bytes) if err != nil {