Skip to content

Commit

Permalink
rpc.BroadcastTxCommit
Browse files Browse the repository at this point in the history
  • Loading branch information
vasylNaumenko committed May 12, 2024
1 parent 0f265e6 commit 178fa13
Showing 1 changed file with 86 additions and 3 deletions.
89 changes: 86 additions & 3 deletions vm/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
mempl "github.com/cometbft/cometbft/mempool"
"github.com/cometbft/cometbft/p2p"
"github.com/cometbft/cometbft/proxy"
"github.com/cometbft/cometbft/rpc/core"
ctypes "github.com/cometbft/cometbft/rpc/core/types"
rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types"
"github.com/cometbft/cometbft/store"
Expand Down Expand Up @@ -63,9 +64,9 @@ func (rpc *RPC) Routes() map[string]*jsonrpc.RPCFunc {
"consensus_params": jsonrpc.NewRPCFunc(rpc.ConsensusParams, "height", jsonrpc.Cacheable("height")),

// tx broadcast API
// "broadcast_tx_commit": jsonrpc.NewRPCFunc(rpc.BroadcastTxCommit, "tx"),
"broadcast_tx_sync": jsonrpc.NewRPCFunc(rpc.BroadcastTxSync, "tx"),
"broadcast_tx_async": jsonrpc.NewRPCFunc(rpc.BroadcastTxAsync, "tx"),
"broadcast_tx_commit": jsonrpc.NewRPCFunc(rpc.BroadcastTxCommit, "tx"),
"broadcast_tx_sync": jsonrpc.NewRPCFunc(rpc.BroadcastTxSync, "tx"),
"broadcast_tx_async": jsonrpc.NewRPCFunc(rpc.BroadcastTxAsync, "tx"),

// abci API
"abci_query": jsonrpc.NewRPCFunc(rpc.ABCIQuery, "path,data,height,prove"),
Expand Down Expand Up @@ -137,6 +138,88 @@ func (rpc *RPC) ABCIQuery(
return &ctypes.ResultABCIQuery{Response: *resQuery}, nil
}

func (rpc *RPC) BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
subscriber := ctx.RemoteAddr()

// Subscribe to tx being committed in block.
subCtx, cancel := context.WithTimeout(context.Background(), core.SubscribeTimeout)
defer cancel()

q := types.EventQueryTxFor(tx)
deliverTxSub, err := rpc.vm.eventBus.Subscribe(subCtx, subscriber, q)
if err != nil {
err = fmt.Errorf("failed to subscribe to tx: %w", err)
rpc.vm.logger.Error("Error on broadcast_tx_commit", "err", err)
return nil, err
}
defer func() {
if err := rpc.vm.eventBus.Unsubscribe(context.Background(), subscriber, q); err != nil {
rpc.vm.logger.Error("Error unsubscribing from eventBus", "err", err)
}
}()

// Broadcast tx and wait for CheckTx result
checkTxResCh := make(chan *abci.ResponseCheckTx, 1)
err = rpc.vm.mempool.CheckTx(tx, func(res *abci.ResponseCheckTx) {
select {
case <-ctx.Context().Done():
case checkTxResCh <- res:
}
}, mempl.TxInfo{})
if err != nil {
rpc.vm.logger.Error("Error on broadcastTxCommit", "err", err)
return nil, fmt.Errorf("error on broadcastTxCommit: %v", err)
}

select {
case <-ctx.Context().Done():
return nil, fmt.Errorf("broadcast confirmation not received: %w", ctx.Context().Err())
case checkTxRes := <-checkTxResCh:
if checkTxRes.Code != abci.CodeTypeOK {
return &ctypes.ResultBroadcastTxCommit{
CheckTx: *checkTxRes,
TxResult: abci.ExecTxResult{},
Hash: tx.Hash(),
}, nil
}

// Wait for the tx to be included in a block or timeout.
select {
case msg := <-deliverTxSub.Out(): // The tx was included in a block.
eventDataTx := msg.Data().(types.EventDataTx)
return &ctypes.ResultBroadcastTxCommit{
CheckTx: *checkTxRes,
TxResult: eventDataTx.Result,
Hash: tx.Hash(),
Height: eventDataTx.Height,
}, nil
case <-deliverTxSub.Canceled():
var reason string
if deliverTxSub.Err() == nil {
reason = "CometBFT exited"
} else {
reason = deliverTxSub.Err().Error()
}
err = fmt.Errorf("deliverTxSub was cancelled (reason: %s)", reason)
rpc.vm.logger.Error("Error on broadcastTxCommit", "err", err)
return &ctypes.ResultBroadcastTxCommit{
CheckTx: *checkTxRes,
TxResult: abci.ExecTxResult{},
Hash: tx.Hash(),
}, err
// TODO: use rpc.config.TimeoutBroadcastTxCommit for timeout
case <-time.After(10 * time.Second):
err = errors.New("timed out waiting for tx to be included in a block")
rpc.vm.logger.Error("Error on broadcastTxCommit", "err", err)
return &ctypes.ResultBroadcastTxCommit{
CheckTx: *checkTxRes,
TxResult: abci.ExecTxResult{},
Hash: tx.Hash(),
}, err
}
}
}

func (rpc *RPC) BroadcastTxAsync(_ *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
err := rpc.vm.mempool.CheckTx(tx, nil, mempl.TxInfo{})
if err != nil {
Expand Down

0 comments on commit 178fa13

Please sign in to comment.