Skip to content

Commit

Permalink
gRPC endpoint for CosmWasm app (#43)
Browse files Browse the repository at this point in the history
* grpc for wasm app

* wip rpc client

* rpc client

* cleanup

* network name

* block signature

* logs

* logs

* fix error `codespace sdk code 2: tx parse error: unable to resolve type URL /cosmwasm.wasm.v1.MsgStoreCode`. interfaceRegistry in ClientContext should be taken from Wasm Application

---------

Co-authored-by: ramil <ramilexe@gmail.com>
  • Loading branch information
vasylNaumenko and ramilexe committed Jul 23, 2024
1 parent 1cf70ef commit a170ee9
Show file tree
Hide file tree
Showing 4 changed files with 250 additions and 8 deletions.
167 changes: 166 additions & 1 deletion example/wasm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,36 @@ package main

import (
"context"
"encoding/json"
"fmt"
"os"
"os/signal"
"syscall"

"cosmossdk.io/log"
"github.com/CosmWasm/wasmd/app"
"github.com/CosmWasm/wasmd/x/wasm/keeper"
wasmtypes "github.com/CosmWasm/wasmd/x/wasm/types"
rpchttp "github.com/cometbft/cometbft/rpc/client/http"
dbm "github.com/cosmos/cosmos-db"
"github.com/cosmos/cosmos-sdk/baseapp"
"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/codec"
cryptocodec "github.com/cosmos/cosmos-sdk/crypto/codec"
"github.com/cosmos/cosmos-sdk/server"
srvconfig "github.com/cosmos/cosmos-sdk/server/config"
servergrpc "github.com/cosmos/cosmos-sdk/server/grpc"
"github.com/cosmos/cosmos-sdk/testutil/sims"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/x/auth/tx"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/consideritdone/landslidevm"
"github.com/consideritdone/landslidevm/utils/ids"
"github.com/consideritdone/landslidevm/vm"
vmtypes "github.com/consideritdone/landslidevm/vm/types"
)

func main() {
Expand All @@ -40,8 +55,158 @@ func WasmCreator() vm.AppCreator {
cfg.SetBech32PrefixForConsensusNode(app.Bech32PrefixConsAddr, app.Bech32PrefixConsPub)
cfg.SetAddressVerifier(wasmtypes.VerifyAddressLen())
cfg.Seal()
wasmApp := app.NewWasmApp(logger, db, nil, true, sims.NewAppOptionsWithFlagHome(os.TempDir()), []keeper.Option{}, baseapp.SetChainID("landslide-test"))

srvCfg := *srvconfig.DefaultConfig()
grpcCfg := srvCfg.GRPC
var vmCfg vmtypes.VmConfig
vmCfg.SetDefaults()
if len(config.ConfigBytes) > 0 {
if err := json.Unmarshal(config.ConfigBytes, &vmCfg); err != nil {
return nil, fmt.Errorf("failed to unmarshal config %s: %w", string(config.ConfigBytes), err)
}
// set the grpc port, if it is set to 0, disable gRPC
if vmCfg.GRPCPort > 0 {
grpcCfg.Address = fmt.Sprintf("127.0.0.1:%d", vmCfg.GRPCPort)
} else {
grpcCfg.Enable = false
}
}

if err := vmCfg.Validate(); err != nil {
return nil, err
}
chainID := vmCfg.NetworkName

var wasmApp = app.NewWasmApp(
logger,
db,
nil,
true,
sims.NewAppOptionsWithFlagHome(os.TempDir()),
[]keeper.Option{},
baseapp.SetChainID(chainID),
)

// early return if gRPC is disabled
if !grpcCfg.Enable {
return server.NewCometABCIWrapper(wasmApp), nil
}

interfaceRegistry := wasmApp.InterfaceRegistry()
marshaller := codec.NewProtoCodec(interfaceRegistry)
clientCtx := client.Context{}.
WithCodec(marshaller).
WithLegacyAmino(makeCodec()).
WithTxConfig(tx.NewTxConfig(marshaller, tx.DefaultSignModes)).
WithInterfaceRegistry(interfaceRegistry).
WithChainID(chainID)

avaChainID, err := ids.ToID(config.ChainId)
if err != nil {
return nil, err
}

rpcURI := fmt.Sprintf(
"http://127.0.0.1:%d/ext/bc/%s/rpc",
vmCfg.RPCPort,
avaChainID,
)

clientCtx = clientCtx.WithNodeURI(rpcURI)
rpcclient, err := rpchttp.New(rpcURI, "/websocket")
if err != nil {
return nil, err
}
clientCtx = clientCtx.WithClient(rpcclient)

// use the provided clientCtx to register the services
wasmApp.RegisterTxService(clientCtx)
wasmApp.RegisterTendermintService(clientCtx)
wasmApp.RegisterNodeService(clientCtx, srvconfig.Config{})

maxSendMsgSize := grpcCfg.MaxSendMsgSize
if maxSendMsgSize == 0 {
maxSendMsgSize = srvconfig.DefaultGRPCMaxSendMsgSize
}

maxRecvMsgSize := grpcCfg.MaxRecvMsgSize
if maxRecvMsgSize == 0 {
maxRecvMsgSize = srvconfig.DefaultGRPCMaxRecvMsgSize
}

// if gRPC is enabled, configure gRPC client for gRPC gateway
grpcClient, err := grpc.Dial(
grpcCfg.Address,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.ForceCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec()),
grpc.MaxCallRecvMsgSize(maxRecvMsgSize),
grpc.MaxCallSendMsgSize(maxSendMsgSize),
),
)
if err != nil {
return nil, err
}

clientCtx = clientCtx.WithGRPCClient(grpcClient)
logger.Debug("gRPC client assigned to client context", "target", grpcCfg.Address)

g, ctx := getCtx(logger, false)

grpcSrv, err := servergrpc.NewGRPCServer(clientCtx, wasmApp, grpcCfg)
if err != nil {
return nil, err
}

// Start the gRPC server in a goroutine. Note, the provided ctx will ensure
// that the server is gracefully shut down.
g.Go(func() error {
return servergrpc.StartGRPCServer(ctx, logger.With("module", "grpc-server"), grpcCfg, grpcSrv)
})

return server.NewCometABCIWrapper(wasmApp), nil
}
}

// custom tx codec
func makeCodec() *codec.LegacyAmino {
cdc := codec.NewLegacyAmino()
sdk.RegisterLegacyAminoCodec(cdc)
cryptocodec.RegisterCrypto(cdc)
return cdc
}

func getCtx(logger log.Logger, block bool) (*errgroup.Group, context.Context) {
ctx, cancelFn := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)
// listen for quit signals so the calling parent process can gracefully exit
listenForQuitSignals(g, block, cancelFn, logger)
return g, ctx
}

// listenForQuitSignals listens for SIGINT and SIGTERM. When a signal is received,
// the cleanup function is called, indicating the caller can gracefully exit or
// return.
//
// Note, the blocking behavior of this depends on the block argument.
// The caller must ensure the corresponding context derived from the cancelFn is used correctly.
func listenForQuitSignals(g *errgroup.Group, block bool, cancelFn context.CancelFunc, logger log.Logger) {
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

f := func() {
sig := <-sigCh
cancelFn()

logger.Info("caught signal", "signal", sig.String())
}

if block {
g.Go(func() error {
f()
return nil
})
} else {
go f()
}
}
17 changes: 15 additions & 2 deletions vm/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func (rpc *RPC) ABCIQuery(
}

func (rpc *RPC) BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
rpc.vm.logger.Info("BroadcastTxCommit called")
subscriber := ctx.RemoteAddr()

// Subscribe to tx being committed in block.
Expand Down Expand Up @@ -208,7 +209,7 @@ func (rpc *RPC) BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.R
Hash: tx.Hash(),
}, err
// TODO: use rpc.config.TimeoutBroadcastTxCommit for timeout
case <-time.After(10 * time.Second):
case <-time.After(30 * time.Second):
err = errors.New("timed out waiting for tx to be included in a block")
rpc.vm.logger.Error("Error on broadcastTxCommit", "err", err)
return &ctypes.ResultBroadcastTxCommit{
Expand All @@ -221,22 +222,28 @@ func (rpc *RPC) BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.R
}

func (rpc *RPC) BroadcastTxAsync(_ *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
rpc.vm.logger.Info("BroadcastTxAsync called")
err := rpc.vm.mempool.CheckTx(tx, nil, mempl.TxInfo{})
if err != nil {
rpc.vm.logger.Error("Error on broadcastTxAsync", "err", err)
return nil, err
}
return &ctypes.ResultBroadcastTx{Hash: tx.Hash()}, nil
}

func (rpc *RPC) BroadcastTxSync(_ *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
rpc.vm.logger.Info("BroadcastTxSync called")
resCh := make(chan *abci.ResponseCheckTx, 1)
err := rpc.vm.mempool.CheckTx(tx, func(res *abci.ResponseCheckTx) {
resCh <- res
}, mempl.TxInfo{})
if err != nil {
rpc.vm.logger.Error("Error on BroadcastTxSync", "err", err)
return nil, err
}
res := <-resCh

rpc.vm.logger.Info("BroadcastTxSync response", "Code", res.Code, "Log", res.Log, "Codespace", res.Codespace, "Hash", tx.Hash())
return &ctypes.ResultBroadcastTx{
Code: res.GetCode(),
Data: res.GetData(),
Expand Down Expand Up @@ -397,8 +404,11 @@ func (rpc *RPC) Block(_ *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBloc
blockMeta := rpc.vm.blockStore.LoadBlockMeta(height)

if blockMeta == nil {
rpc.vm.logger.Info("Block not found", "height", height)
return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: block}, nil
}

rpc.vm.logger.Info("Block response", "height", height, "block", block, "blockMeta", blockMeta)
return &ctypes.ResultBlock{BlockID: blockMeta.BlockID, Block: block}, nil
}

Expand Down Expand Up @@ -539,12 +549,15 @@ func (rpc *RPC) Validators(
}

func (rpc *RPC) Tx(_ *rpctypes.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) {
rpc.vm.logger.Info("Tx called", "hash", hash)
r, err := rpc.vm.txIndexer.Get(hash)
if err != nil {
rpc.vm.logger.Error("Error on Tx", "err", err)
return nil, err
}

if r == nil {
rpc.vm.logger.Error("Error on Tx", "tx not found", hash)
return nil, fmt.Errorf("tx (%X) not found", hash)
}

Expand Down Expand Up @@ -736,7 +749,7 @@ func (rpc *RPC) Status(_ *rpctypes.Context) (*ctypes.ResultStatus, error) {
),
DefaultNodeID: p2p.ID(rpc.vm.appOpts.NodeId),
ListenAddr: "",
Network: fmt.Sprintf("%d", rpc.vm.appOpts.NetworkId),
Network: rpc.vm.networkName,
Version: version.TMCoreSemVer,
Channels: nil,
Moniker: "",
Expand Down
48 changes: 48 additions & 0 deletions vm/types/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package types

import (
"fmt"
"time"
)

const (
defaultRPCPort = 9752
defaultGRPCPort = 9090
defaultMaxOpenConnections = 0 // unlimited
defaultTimeoutBroadcastTxCommit time.Duration = 30 * time.Second
)

// VmConfig ...
type VmConfig struct {
RPCPort uint16 `json:"rpc_port"`
GRPCPort uint16 `json:"grpc_port"`
GRPCMaxOpenConnections int `json:"grpc_max_open_connections"`
TimeoutBroadcastTxCommit time.Duration `json:"broadcast_commit_timeout"`
NetworkName string `json:"network_name"`
}

// SetDefaults sets the default values for the config.
func (c *VmConfig) SetDefaults() {
c.RPCPort = defaultRPCPort
c.GRPCPort = defaultGRPCPort
c.GRPCMaxOpenConnections = defaultMaxOpenConnections
c.TimeoutBroadcastTxCommit = defaultTimeoutBroadcastTxCommit
c.NetworkName = "landslide-test"
}

// Validate returns an error if this is an invalid config.
func (c *VmConfig) Validate() error {
if c.GRPCMaxOpenConnections < 0 {
return fmt.Errorf("grpc_max_open_connections can't be negative")
}

if c.TimeoutBroadcastTxCommit < 0 {
return fmt.Errorf("broadcast_tx_commit_timeout can't be negative")
}

if len(c.NetworkName) == 0 {
return fmt.Errorf("network_name can't be empty")
}

return nil
}
Loading

0 comments on commit a170ee9

Please sign in to comment.