Skip to content

Commit

Permalink
feat: impl checkTxAsyncReactor() (Finschia#168) (Finschia#225)
Browse files Browse the repository at this point in the history
* feat: impl checkTxAsyncReactor() (Finschia#168)

* fix: tests

* fix: lint errors
  • Loading branch information
egonspace committed Jul 8, 2021
1 parent 7da95d6 commit 4417941
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 44 deletions.
13 changes: 7 additions & 6 deletions consensus/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,23 +161,24 @@ func TestMempoolRmBadTx(t *testing.T) {
resEndRecheckTx := app.EndRecheckTx(abci.RequestEndRecheckTx{})
assert.Equal(t, code.CodeTypeOK, resEndRecheckTx.Code)

emptyMempoolCh := make(chan struct{})
checkTxErrorCh := make(chan error)
checkTxRespCh := make(chan struct{})
emptyMempoolCh := make(chan struct{})
go func() {
// Try to send the tx through the mempool.
// CheckTx should not err, but the app should return a bad abci code
// and the tx should get removed from the pool
err := assertMempool(cs.txNotifier).CheckTxAsync(txBytes, mempl.TxInfo{}, func(r *abci.Response) {
assertMempool(cs.txNotifier).CheckTxAsync(txBytes, mempl.TxInfo{}, func(err error) {
checkTxErrorCh <- err
}, func(r *abci.Response) {
if r.GetCheckTx().Code != code.CodeTypeBadNonce {
t.Errorf("expected checktx to return bad nonce, got %v", r)
return
}
checkTxRespCh <- struct{}{}
})
if err != nil {
t.Errorf("error after CheckTx: %v", err)
return
}

<-checkTxErrorCh

// check for the tx
for {
Expand Down
3 changes: 1 addition & 2 deletions consensus/replay_stubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ func (emptyMempool) Size() int { return 0 }
func (emptyMempool) CheckTxSync(_ types.Tx, _ mempl.TxInfo) (*abci.Response, error) {
return nil, nil
}
func (emptyMempool) CheckTxAsync(_ types.Tx, _ mempl.TxInfo, _ func(*abci.Response)) error {
return nil
func (emptyMempool) CheckTxAsync(_ types.Tx, _ mempl.TxInfo, _ func(error), _ func(*abci.Response)) {
}
func (emptyMempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} }
func (emptyMempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} }
Expand Down
4 changes: 2 additions & 2 deletions mempool/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func BenchmarkReapWithCheckTxAsync(b *testing.B) {
for i := 0; i < size; i++ {
tx := make([]byte, 8)
binary.BigEndian.PutUint64(tx, uint64(i))
mempool.CheckTxAsync(tx, TxInfo{}, nil) // nolint: errcheck
mempool.CheckTxAsync(tx, TxInfo{}, nil, nil)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand Down Expand Up @@ -66,7 +66,7 @@ func BenchmarkCheckTxAsync(b *testing.B) {
for i := 0; i < b.N; i++ {
tx := make([]byte, 8)
binary.BigEndian.PutUint64(tx, uint64(i))
mempool.CheckTxAsync(tx, TxInfo{}, nil) // nolint: errcheck
mempool.CheckTxAsync(tx, TxInfo{}, nil, nil)
}
}

Expand Down
61 changes: 42 additions & 19 deletions mempool/clist_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const TxKeySize = sha256.Size

var newline = []byte("\n")

//--------------------------------------------------------------------------------
// --------------------------------------------------------------------------------

// CListMempool is an ordered in-memory pool for transactions before they are
// proposed in a consensus round. Transaction validity is checked using the
Expand All @@ -53,6 +53,8 @@ type CListMempool struct {
updateMtx tmsync.RWMutex
preCheck PreCheckFunc

chReqCheckTx chan *requestCheckTxAsync

wal *auto.AutoFile // a log of mempool txs
txs *clist.CList // concurrent linked-list of good txs
proxyAppConn proxy.AppConnMempool
Expand All @@ -70,6 +72,13 @@ type CListMempool struct {
metrics *Metrics
}

type requestCheckTxAsync struct {
tx types.Tx
txInfo TxInfo
prepareCb func(error)
checkTxCb func(*abci.Response)
}

var _ Mempool = &CListMempool{}

// CListMempoolOption sets an optional parameter on the mempool.
Expand All @@ -87,6 +96,7 @@ func NewCListMempool(
proxyAppConn: proxyAppConn,
txs: clist.New(),
height: height,
chReqCheckTx: make(chan *requestCheckTxAsync, config.Size),
logger: log.NewNopLogger(),
metrics: NopMetrics(),
}
Expand All @@ -99,6 +109,7 @@ func NewCListMempool(
for _, option := range options {
option(mempool)
}
go mempool.checkTxAsyncReactor()
return mempool
}

Expand Down Expand Up @@ -237,39 +248,51 @@ func (mem *CListMempool) CheckTxSync(tx types.Tx, txInfo TxInfo) (res *abci.Resp
return res, err
}

// It blocks if we're waiting on Update() or Reap().
// cb: A callback from the CheckTx command.
// It gets called from another goroutine.
// CONTRACT: Either cb will get called, or err returned.
//
// Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) CheckTxAsync(tx types.Tx, txInfo TxInfo, cb func(*abci.Response)) (err error) {
func (mem *CListMempool) CheckTxAsync(tx types.Tx, txInfo TxInfo, prepareCb func(error),
checkTxCb func(*abci.Response)) {
mem.chReqCheckTx <- &requestCheckTxAsync{tx: tx, txInfo: txInfo, prepareCb: prepareCb, checkTxCb: checkTxCb}
}

func (mem *CListMempool) checkTxAsyncReactor() {
for req := range mem.chReqCheckTx {
mem.checkTxAsync(req.tx, req.txInfo, req.prepareCb, req.checkTxCb)
}
}

// It blocks if we're waiting on Update() or Reap().
func (mem *CListMempool) checkTxAsync(tx types.Tx, txInfo TxInfo, prepareCb func(error),
checkTxCb func(*abci.Response)) {
mem.updateMtx.RLock()
// use defer to unlock mutex because application (*local client*) might panic
defer func() {
if err != nil {
mem.updateMtx.RUnlock()
return
}

if r := recover(); r != nil {
mem.updateMtx.RUnlock()
panic(r)
}
}()

if err = mem.prepareCheckTx(tx, txInfo); err != nil {
return err
err := mem.prepareCheckTx(tx, txInfo)
if prepareCb != nil {
prepareCb(err)
}
if err != nil {
mem.updateMtx.RUnlock()
return
}

// CONTRACT: `app.CheckTxAsync()` should check whether `GasWanted` is valid (0 <= GasWanted <= block.masGas)
reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx})
reqRes.SetCallback(func(res *abci.Response) {
mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, res, cb)
mem.updateMtx.RUnlock()
mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, res, func(response *abci.Response) {
if checkTxCb != nil {
checkTxCb(response)
}
mem.updateMtx.RUnlock()
})
})

return err
}

// CONTRACT: `caller` should held `mem.updateMtx.RLock()`
Expand Down Expand Up @@ -708,7 +731,7 @@ func (mem *CListMempool) recheckTxs() {
wg.Wait()
}

//--------------------------------------------------------------------------------
// --------------------------------------------------------------------------------

// mempoolTx is a transaction that successfully ran
type mempoolTx struct {
Expand All @@ -726,7 +749,7 @@ func (memTx *mempoolTx) Height() int64 {
return atomic.LoadInt64(&memTx.height)
}

//--------------------------------------------------------------------------------
// --------------------------------------------------------------------------------

type txCache interface {
Reset()
Expand Down Expand Up @@ -809,7 +832,7 @@ func (nopTxCache) Reset() {}
func (nopTxCache) Push(types.Tx) bool { return true }
func (nopTxCache) Remove(types.Tx) {}

//--------------------------------------------------------------------------------
// --------------------------------------------------------------------------------

// TxKey is the fixed length array hash used as the key in maps.
func TxKey(tx types.Tx) [TxKeySize]byte {
Expand Down
2 changes: 1 addition & 1 deletion mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Mempool interface {
// CheckTx executes a new transaction against the application to determine
// its validity and whether it should be added to the mempool.
CheckTxSync(tx types.Tx, txInfo TxInfo) (*abci.Response, error)
CheckTxAsync(tx types.Tx, txInfo TxInfo, callback func(*abci.Response)) error
CheckTxAsync(tx types.Tx, txInfo TxInfo, prepareCb func(error), checkTxCb func(*abci.Response))

// ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes
// bytes total with the condition that the total gasWanted must be less than
Expand Down
3 changes: 1 addition & 2 deletions mempool/mock/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ func (Mempool) Size() int { return 0 }
func (Mempool) CheckTxSync(_ types.Tx, _ mempl.TxInfo) (*abci.Response, error) {
return nil, nil
}
func (Mempool) CheckTxAsync(_ types.Tx, _ mempl.TxInfo, _ func(*abci.Response)) error {
return nil
func (Mempool) CheckTxAsync(_ types.Tx, _ mempl.TxInfo, _ func(error), _ func(*abci.Response)) {
}
func (Mempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} }
func (Mempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} }
Expand Down
10 changes: 6 additions & 4 deletions mempool/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,12 @@ func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
txInfo.SenderP2PID = src.ID()
}
for _, tx := range msg.Txs {
err = memR.mempool.CheckTxAsync(tx, txInfo, nil)
if err != nil {
memR.Logger.Info("Could not check tx", "tx", txID(tx), "err", err)
}
tx := tx // pin! workaround for `scopelint` error
memR.mempool.CheckTxAsync(tx, txInfo, func(err error) {
if err != nil {
memR.Logger.Info("Could not check tx", "tx", txID(tx), "err", err)
}
}, nil)
}
// broadcasting happens from go routines per peer
}
Expand Down
14 changes: 10 additions & 4 deletions rpc/client/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,11 @@ func TestUnconfirmedTxs(t *testing.T) {

ch := make(chan *abci.Response, 1)
mempool := node.Mempool()
err := mempool.CheckTxAsync(tx, mempl.TxInfo{}, func(resp *abci.Response) { ch <- resp })
require.NoError(t, err)
mempool.CheckTxAsync(tx, mempl.TxInfo{}, func(err error) {
require.NoError(t, err)
}, func(resp *abci.Response) {
ch <- resp
})

// wait for tx to arrive in mempoool.
select {
Expand Down Expand Up @@ -381,8 +384,11 @@ func TestNumUnconfirmedTxs(t *testing.T) {

ch := make(chan *abci.Response, 1)
mempool := node.Mempool()
err := mempool.CheckTxAsync(tx, mempl.TxInfo{}, func(resp *abci.Response) { ch <- resp })
require.NoError(t, err)
mempool.CheckTxAsync(tx, mempl.TxInfo{}, func(err error) {
require.NoError(t, err)
}, func(resp *abci.Response) {
ch <- resp
})

// wait for tx to arrive in mempoool.
select {
Expand Down
8 changes: 6 additions & 2 deletions rpc/core/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@ import (
// CheckTx nor DeliverTx results.
// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_async
func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
err := env.Mempool.CheckTxAsync(tx, mempl.TxInfo{}, nil)

chErr := make(chan error)
env.Mempool.CheckTxAsync(tx, mempl.TxInfo{}, func(err error) {
chErr <- err
}, nil)
err := <-chErr
if err != nil {
return nil, err
}

return &ctypes.ResultBroadcastTx{Hash: tx.Hash()}, nil
}

Expand Down
3 changes: 1 addition & 2 deletions test/maverick/consensus/replay_stubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ func (emptyMempool) Size() int { return 0 }
func (emptyMempool) CheckTxSync(_ types.Tx, _ mempl.TxInfo) (*abci.Response, error) {
return nil, nil
}
func (emptyMempool) CheckTxAsync(_ types.Tx, _ mempl.TxInfo, _ func(*abci.Response)) error {
return nil
func (emptyMempool) CheckTxAsync(_ types.Tx, _ mempl.TxInfo, _ func(error), _ func(*abci.Response)) {
}
func (emptyMempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} }
func (emptyMempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} }
Expand Down

0 comments on commit 4417941

Please sign in to comment.