diff --git a/vm/rpc.go b/vm/rpc.go index d68ce72..4ea18cd 100644 --- a/vm/rpc.go +++ b/vm/rpc.go @@ -14,6 +14,7 @@ import ( mempl "github.com/cometbft/cometbft/mempool" "github.com/cometbft/cometbft/p2p" "github.com/cometbft/cometbft/proxy" + "github.com/cometbft/cometbft/rpc/core" ctypes "github.com/cometbft/cometbft/rpc/core/types" rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" "github.com/cometbft/cometbft/store" @@ -63,9 +64,9 @@ func (rpc *RPC) Routes() map[string]*jsonrpc.RPCFunc { "consensus_params": jsonrpc.NewRPCFunc(rpc.ConsensusParams, "height", jsonrpc.Cacheable("height")), // tx broadcast API - // "broadcast_tx_commit": jsonrpc.NewRPCFunc(rpc.BroadcastTxCommit, "tx"), - "broadcast_tx_sync": jsonrpc.NewRPCFunc(rpc.BroadcastTxSync, "tx"), - "broadcast_tx_async": jsonrpc.NewRPCFunc(rpc.BroadcastTxAsync, "tx"), + "broadcast_tx_commit": jsonrpc.NewRPCFunc(rpc.BroadcastTxCommit, "tx"), + "broadcast_tx_sync": jsonrpc.NewRPCFunc(rpc.BroadcastTxSync, "tx"), + "broadcast_tx_async": jsonrpc.NewRPCFunc(rpc.BroadcastTxAsync, "tx"), // abci API "abci_query": jsonrpc.NewRPCFunc(rpc.ABCIQuery, "path,data,height,prove"), @@ -137,6 +138,88 @@ func (rpc *RPC) ABCIQuery( return &ctypes.ResultABCIQuery{Response: *resQuery}, nil } +func (rpc *RPC) BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { + subscriber := ctx.RemoteAddr() + + // Subscribe to tx being committed in block. + subCtx, cancel := context.WithTimeout(context.Background(), core.SubscribeTimeout) + defer cancel() + + q := types.EventQueryTxFor(tx) + deliverTxSub, err := rpc.vm.eventBus.Subscribe(subCtx, subscriber, q) + if err != nil { + err = fmt.Errorf("failed to subscribe to tx: %w", err) + rpc.vm.logger.Error("Error on broadcast_tx_commit", "err", err) + return nil, err + } + defer func() { + if err := rpc.vm.eventBus.Unsubscribe(context.Background(), subscriber, q); err != nil { + rpc.vm.logger.Error("Error unsubscribing from eventBus", "err", err) + } + }() + + // Broadcast tx and wait for CheckTx result + checkTxResCh := make(chan *abci.ResponseCheckTx, 1) + err = rpc.vm.mempool.CheckTx(tx, func(res *abci.ResponseCheckTx) { + select { + case <-ctx.Context().Done(): + case checkTxResCh <- res: + } + }, mempl.TxInfo{}) + if err != nil { + rpc.vm.logger.Error("Error on broadcastTxCommit", "err", err) + return nil, fmt.Errorf("error on broadcastTxCommit: %v", err) + } + + select { + case <-ctx.Context().Done(): + return nil, fmt.Errorf("broadcast confirmation not received: %w", ctx.Context().Err()) + case checkTxRes := <-checkTxResCh: + if checkTxRes.Code != abci.CodeTypeOK { + return &ctypes.ResultBroadcastTxCommit{ + CheckTx: *checkTxRes, + TxResult: abci.ExecTxResult{}, + Hash: tx.Hash(), + }, nil + } + + // Wait for the tx to be included in a block or timeout. + select { + case msg := <-deliverTxSub.Out(): // The tx was included in a block. + eventDataTx := msg.Data().(types.EventDataTx) + return &ctypes.ResultBroadcastTxCommit{ + CheckTx: *checkTxRes, + TxResult: eventDataTx.Result, + Hash: tx.Hash(), + Height: eventDataTx.Height, + }, nil + case <-deliverTxSub.Canceled(): + var reason string + if deliverTxSub.Err() == nil { + reason = "CometBFT exited" + } else { + reason = deliverTxSub.Err().Error() + } + err = fmt.Errorf("deliverTxSub was cancelled (reason: %s)", reason) + rpc.vm.logger.Error("Error on broadcastTxCommit", "err", err) + return &ctypes.ResultBroadcastTxCommit{ + CheckTx: *checkTxRes, + TxResult: abci.ExecTxResult{}, + Hash: tx.Hash(), + }, err + // TODO: use rpc.config.TimeoutBroadcastTxCommit for timeout + case <-time.After(10 * time.Second): + err = errors.New("timed out waiting for tx to be included in a block") + rpc.vm.logger.Error("Error on broadcastTxCommit", "err", err) + return &ctypes.ResultBroadcastTxCommit{ + CheckTx: *checkTxRes, + TxResult: abci.ExecTxResult{}, + Hash: tx.Hash(), + }, err + } + } +} + func (rpc *RPC) BroadcastTxAsync(_ *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { err := rpc.vm.mempool.CheckTx(tx, nil, mempl.TxInfo{}) if err != nil {