diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index 0cb82aa9d..b4b338172 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -161,23 +161,24 @@ func TestMempoolRmBadTx(t *testing.T) { resEndRecheckTx := app.EndRecheckTx(abci.RequestEndRecheckTx{}) assert.Equal(t, code.CodeTypeOK, resEndRecheckTx.Code) - emptyMempoolCh := make(chan struct{}) + checkTxErrorCh := make(chan error) checkTxRespCh := make(chan struct{}) + emptyMempoolCh := make(chan struct{}) go func() { // Try to send the tx through the mempool. // CheckTx should not err, but the app should return a bad abci code // and the tx should get removed from the pool - err := assertMempool(cs.txNotifier).CheckTxAsync(txBytes, mempl.TxInfo{}, func(r *abci.Response) { + assertMempool(cs.txNotifier).CheckTxAsync(txBytes, mempl.TxInfo{}, func(err error) { + checkTxErrorCh <- err + }, func(r *abci.Response) { if r.GetCheckTx().Code != code.CodeTypeBadNonce { t.Errorf("expected checktx to return bad nonce, got %v", r) return } checkTxRespCh <- struct{}{} }) - if err != nil { - t.Errorf("error after CheckTx: %v", err) - return - } + + <-checkTxErrorCh // check for the tx for { diff --git a/consensus/replay_stubs.go b/consensus/replay_stubs.go index cdeaf701f..b21e3cfad 100644 --- a/consensus/replay_stubs.go +++ b/consensus/replay_stubs.go @@ -21,8 +21,7 @@ func (emptyMempool) Size() int { return 0 } func (emptyMempool) CheckTxSync(_ types.Tx, _ mempl.TxInfo) (*abci.Response, error) { return nil, nil } -func (emptyMempool) CheckTxAsync(_ types.Tx, _ mempl.TxInfo, _ func(*abci.Response)) error { - return nil +func (emptyMempool) CheckTxAsync(_ types.Tx, _ mempl.TxInfo, _ func(error), _ func(*abci.Response)) { } func (emptyMempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} } func (emptyMempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} } diff --git a/mempool/bench_test.go b/mempool/bench_test.go index 2cfe94e3e..5242d6523 100644 --- a/mempool/bench_test.go +++ b/mempool/bench_test.go @@ -36,7 +36,7 @@ func BenchmarkReapWithCheckTxAsync(b *testing.B) { for i := 0; i < size; i++ { tx := make([]byte, 8) binary.BigEndian.PutUint64(tx, uint64(i)) - mempool.CheckTxAsync(tx, TxInfo{}, nil) // nolint: errcheck + mempool.CheckTxAsync(tx, TxInfo{}, nil, nil) } b.ResetTimer() for i := 0; i < b.N; i++ { @@ -66,7 +66,7 @@ func BenchmarkCheckTxAsync(b *testing.B) { for i := 0; i < b.N; i++ { tx := make([]byte, 8) binary.BigEndian.PutUint64(tx, uint64(i)) - mempool.CheckTxAsync(tx, TxInfo{}, nil) // nolint: errcheck + mempool.CheckTxAsync(tx, TxInfo{}, nil, nil) } } diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index d1382310d..d5c9b3e88 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -26,7 +26,7 @@ const TxKeySize = sha256.Size var newline = []byte("\n") -//-------------------------------------------------------------------------------- +// -------------------------------------------------------------------------------- // CListMempool is an ordered in-memory pool for transactions before they are // proposed in a consensus round. Transaction validity is checked using the @@ -53,6 +53,8 @@ type CListMempool struct { updateMtx tmsync.RWMutex preCheck PreCheckFunc + chReqCheckTx chan *requestCheckTxAsync + wal *auto.AutoFile // a log of mempool txs txs *clist.CList // concurrent linked-list of good txs proxyAppConn proxy.AppConnMempool @@ -70,6 +72,13 @@ type CListMempool struct { metrics *Metrics } +type requestCheckTxAsync struct { + tx types.Tx + txInfo TxInfo + prepareCb func(error) + checkTxCb func(*abci.Response) +} + var _ Mempool = &CListMempool{} // CListMempoolOption sets an optional parameter on the mempool. @@ -87,6 +96,7 @@ func NewCListMempool( proxyAppConn: proxyAppConn, txs: clist.New(), height: height, + chReqCheckTx: make(chan *requestCheckTxAsync, config.Size), logger: log.NewNopLogger(), metrics: NopMetrics(), } @@ -99,6 +109,7 @@ func NewCListMempool( for _, option := range options { option(mempool) } + go mempool.checkTxAsyncReactor() return mempool } @@ -237,39 +248,51 @@ func (mem *CListMempool) CheckTxSync(tx types.Tx, txInfo TxInfo) (res *abci.Resp return res, err } -// It blocks if we're waiting on Update() or Reap(). // cb: A callback from the CheckTx command. // It gets called from another goroutine. -// CONTRACT: Either cb will get called, or err returned. // // Safe for concurrent use by multiple goroutines. -func (mem *CListMempool) CheckTxAsync(tx types.Tx, txInfo TxInfo, cb func(*abci.Response)) (err error) { +func (mem *CListMempool) CheckTxAsync(tx types.Tx, txInfo TxInfo, prepareCb func(error), + checkTxCb func(*abci.Response)) { + mem.chReqCheckTx <- &requestCheckTxAsync{tx: tx, txInfo: txInfo, prepareCb: prepareCb, checkTxCb: checkTxCb} +} + +func (mem *CListMempool) checkTxAsyncReactor() { + for req := range mem.chReqCheckTx { + mem.checkTxAsync(req.tx, req.txInfo, req.prepareCb, req.checkTxCb) + } +} + +// It blocks if we're waiting on Update() or Reap(). +func (mem *CListMempool) checkTxAsync(tx types.Tx, txInfo TxInfo, prepareCb func(error), + checkTxCb func(*abci.Response)) { mem.updateMtx.RLock() - // use defer to unlock mutex because application (*local client*) might panic defer func() { - if err != nil { - mem.updateMtx.RUnlock() - return - } - if r := recover(); r != nil { mem.updateMtx.RUnlock() panic(r) } }() - if err = mem.prepareCheckTx(tx, txInfo); err != nil { - return err + err := mem.prepareCheckTx(tx, txInfo) + if prepareCb != nil { + prepareCb(err) + } + if err != nil { + mem.updateMtx.RUnlock() + return } // CONTRACT: `app.CheckTxAsync()` should check whether `GasWanted` is valid (0 <= GasWanted <= block.masGas) reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx}) reqRes.SetCallback(func(res *abci.Response) { - mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, res, cb) - mem.updateMtx.RUnlock() + mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, res, func(response *abci.Response) { + if checkTxCb != nil { + checkTxCb(response) + } + mem.updateMtx.RUnlock() + }) }) - - return err } // CONTRACT: `caller` should held `mem.updateMtx.RLock()` @@ -708,7 +731,7 @@ func (mem *CListMempool) recheckTxs() { wg.Wait() } -//-------------------------------------------------------------------------------- +// -------------------------------------------------------------------------------- // mempoolTx is a transaction that successfully ran type mempoolTx struct { @@ -726,7 +749,7 @@ func (memTx *mempoolTx) Height() int64 { return atomic.LoadInt64(&memTx.height) } -//-------------------------------------------------------------------------------- +// -------------------------------------------------------------------------------- type txCache interface { Reset() @@ -809,7 +832,7 @@ func (nopTxCache) Reset() {} func (nopTxCache) Push(types.Tx) bool { return true } func (nopTxCache) Remove(types.Tx) {} -//-------------------------------------------------------------------------------- +// -------------------------------------------------------------------------------- // TxKey is the fixed length array hash used as the key in maps. func TxKey(tx types.Tx) [TxKeySize]byte { diff --git a/mempool/mempool.go b/mempool/mempool.go index 01ed2508e..882e6893d 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -16,7 +16,7 @@ type Mempool interface { // CheckTx executes a new transaction against the application to determine // its validity and whether it should be added to the mempool. CheckTxSync(tx types.Tx, txInfo TxInfo) (*abci.Response, error) - CheckTxAsync(tx types.Tx, txInfo TxInfo, callback func(*abci.Response)) error + CheckTxAsync(tx types.Tx, txInfo TxInfo, prepareCb func(error), checkTxCb func(*abci.Response)) // ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes // bytes total with the condition that the total gasWanted must be less than diff --git a/mempool/mock/mempool.go b/mempool/mock/mempool.go index 1bed7c125..b0a69f85c 100644 --- a/mempool/mock/mempool.go +++ b/mempool/mock/mempool.go @@ -18,8 +18,7 @@ func (Mempool) Size() int { return 0 } func (Mempool) CheckTxSync(_ types.Tx, _ mempl.TxInfo) (*abci.Response, error) { return nil, nil } -func (Mempool) CheckTxAsync(_ types.Tx, _ mempl.TxInfo, _ func(*abci.Response)) error { - return nil +func (Mempool) CheckTxAsync(_ types.Tx, _ mempl.TxInfo, _ func(error), _ func(*abci.Response)) { } func (Mempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} } func (Mempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} } diff --git a/mempool/reactor.go b/mempool/reactor.go index b6c06cb98..71240b395 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -183,10 +183,12 @@ func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { txInfo.SenderP2PID = src.ID() } for _, tx := range msg.Txs { - err = memR.mempool.CheckTxAsync(tx, txInfo, nil) - if err != nil { - memR.Logger.Info("Could not check tx", "tx", txID(tx), "err", err) - } + tx := tx // pin! workaround for `scopelint` error + memR.mempool.CheckTxAsync(tx, txInfo, func(err error) { + if err != nil { + memR.Logger.Info("Could not check tx", "tx", txID(tx), "err", err) + } + }, nil) } // broadcasting happens from go routines per peer } diff --git a/rpc/client/rpc_test.go b/rpc/client/rpc_test.go index 8c021e674..9550571d9 100644 --- a/rpc/client/rpc_test.go +++ b/rpc/client/rpc_test.go @@ -351,8 +351,11 @@ func TestUnconfirmedTxs(t *testing.T) { ch := make(chan *abci.Response, 1) mempool := node.Mempool() - err := mempool.CheckTxAsync(tx, mempl.TxInfo{}, func(resp *abci.Response) { ch <- resp }) - require.NoError(t, err) + mempool.CheckTxAsync(tx, mempl.TxInfo{}, func(err error) { + require.NoError(t, err) + }, func(resp *abci.Response) { + ch <- resp + }) // wait for tx to arrive in mempoool. select { @@ -381,8 +384,11 @@ func TestNumUnconfirmedTxs(t *testing.T) { ch := make(chan *abci.Response, 1) mempool := node.Mempool() - err := mempool.CheckTxAsync(tx, mempl.TxInfo{}, func(resp *abci.Response) { ch <- resp }) - require.NoError(t, err) + mempool.CheckTxAsync(tx, mempl.TxInfo{}, func(err error) { + require.NoError(t, err) + }, func(resp *abci.Response) { + ch <- resp + }) // wait for tx to arrive in mempoool. select { diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index 393fe055d..1c280ccd8 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -20,11 +20,15 @@ import ( // CheckTx nor DeliverTx results. // More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_async func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { - err := env.Mempool.CheckTxAsync(tx, mempl.TxInfo{}, nil) - + chErr := make(chan error) + env.Mempool.CheckTxAsync(tx, mempl.TxInfo{}, func(err error) { + chErr <- err + }, nil) + err := <-chErr if err != nil { return nil, err } + return &ctypes.ResultBroadcastTx{Hash: tx.Hash()}, nil } diff --git a/test/maverick/consensus/replay_stubs.go b/test/maverick/consensus/replay_stubs.go index cdeaf701f..b21e3cfad 100644 --- a/test/maverick/consensus/replay_stubs.go +++ b/test/maverick/consensus/replay_stubs.go @@ -21,8 +21,7 @@ func (emptyMempool) Size() int { return 0 } func (emptyMempool) CheckTxSync(_ types.Tx, _ mempl.TxInfo) (*abci.Response, error) { return nil, nil } -func (emptyMempool) CheckTxAsync(_ types.Tx, _ mempl.TxInfo, _ func(*abci.Response)) error { - return nil +func (emptyMempool) CheckTxAsync(_ types.Tx, _ mempl.TxInfo, _ func(error), _ func(*abci.Response)) { } func (emptyMempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} } func (emptyMempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} }