diff --git a/abci/client/local_client.go b/abci/client/local_client.go index 27883ee1f..c30427e37 100644 --- a/abci/client/local_client.go +++ b/abci/client/local_client.go @@ -15,6 +15,8 @@ var _ Client = (*localClient)(nil) type localClient struct { service.BaseService + // TODO: remove `mtx` to increase concurrency. + // CONTRACT: The application should protect itself from concurrency as an abci server. mtx *tmsync.Mutex types.Application Callback @@ -92,9 +94,6 @@ func (app *localClient) DeliverTxAsync(params types.RequestDeliverTx) *ReqRes { } func (app *localClient) CheckTxAsync(req types.RequestCheckTx) *ReqRes { - // CONTRACT: Application should handle concurrent `CheckTx` - // In this abci client layer, we don't protect `CheckTx` with a mutex for concurrency - res := app.Application.CheckTx(req) return app.callback( types.ToRequestCheckTx(req), @@ -103,9 +102,6 @@ func (app *localClient) CheckTxAsync(req types.RequestCheckTx) *ReqRes { } func (app *localClient) BeginRecheckTxAsync(req types.RequestBeginRecheckTx) *ReqRes { - app.mtx.Lock() - defer app.mtx.Unlock() - res := app.Application.BeginRecheckTx(req) return app.callback( types.ToRequestBeginRecheckTx(req), @@ -114,9 +110,6 @@ func (app *localClient) BeginRecheckTxAsync(req types.RequestBeginRecheckTx) *Re } func (app *localClient) EndRecheckTxAsync(req types.RequestEndRecheckTx) *ReqRes { - app.mtx.Lock() - defer app.mtx.Unlock() - res := app.Application.EndRecheckTx(req) return app.callback( types.ToRequestEndRecheckTx(req), @@ -258,9 +251,6 @@ func (app *localClient) DeliverTxSync(req types.RequestDeliverTx) (*types.Respon } func (app *localClient) CheckTxSync(req types.RequestCheckTx) (*types.ResponseCheckTx, error) { - // CONTRACT: Application should handle concurrent `CheckTx` - // In this abci client layer, we don't protect `CheckTx` with a mutex for concurrency - res := app.Application.CheckTx(req) return &res, nil } diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index bd85ff675..5d6b49c70 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -20,7 +20,6 @@ import ( "github.com/line/ostracon/p2p" "github.com/line/ostracon/proxy" "github.com/line/ostracon/types" - "github.com/pkg/errors" ) // TxKeySize is the size of the transaction key index @@ -591,7 +590,7 @@ func (mem *CListMempool) Update( block *types.Block, deliverTxResponses []*abci.ResponseDeliverTx, preCheck PreCheckFunc, -) error { +) (err error) { // Set height mem.height = block.Height mem.notifiedTxsAvailable = false @@ -624,54 +623,47 @@ func (mem *CListMempool) Update( } } - // Either recheck non-committed txs to see if they became invalid - // or just notify there're some txs left. - recheckStartTime := time.Now().UnixNano() - if mem.Size() > 0 { - if mem.config.Recheck { - mem.logger.Info("recheck txs", "numtxs", mem.Size(), "height", block.Height) - res, err := mem.proxyAppConn.BeginRecheckTxSync(abci.RequestBeginRecheckTx{ - Header: types.OC2PB.Header(&block.Header), - }) - if res.Code == abci.CodeTypeOK && err == nil { - mem.recheckTxs() - res2, err2 := mem.proxyAppConn.EndRecheckTxSync(abci.RequestEndRecheckTx{Height: block.Height}) - if res2.Code != abci.CodeTypeOK { - return errors.New("the function EndRecheckTxSync does not respond CodeTypeOK") - } - if err2 != nil { - return errors.Wrap(err2, "the function EndRecheckTxSync returns an error") - } - } else { - if res.Code != abci.CodeTypeOK { - return errors.New("the function BeginRecheckTxSync does not respond CodeTypeOK") - } - if err != nil { - return errors.Wrap(err, "the function BeginRecheckTxSync returns an error") - } - } + if mem.config.Recheck { + // recheck non-committed txs to see if they became invalid + recheckStartTime := time.Now().UnixNano() - // At this point, mem.txs are being rechecked. - // mem.recheckCursor re-scans mem.txs and possibly removes some txs. - // Before mem.Reap(), we should wait for mem.recheckCursor to be nil. - } else { - mem.notifyTxsAvailable() + _, err = mem.proxyAppConn.BeginRecheckTxSync(abci.RequestBeginRecheckTx{ + Header: types.OC2PB.Header(&block.Header), + }) + if err != nil { + mem.logger.Error("error in proxyAppConn.BeginRecheckTxSync", "err", err) } - } - recheckEndTime := time.Now().UnixNano() - recheckTimeMs := float64(recheckEndTime-recheckStartTime) / 1000000 - mem.metrics.RecheckTime.Set(recheckTimeMs) + mem.logger.Info("recheck txs", "numtxs", mem.Size(), "height", block.Height) + mem.recheckTxs() + + _, err = mem.proxyAppConn.EndRecheckTxSync(abci.RequestEndRecheckTx{Height: block.Height}) + if err != nil { + mem.logger.Error("error in proxyAppConn.EndRecheckTxSync", "err", err) + } + + recheckEndTime := time.Now().UnixNano() + + recheckTimeMs := float64(recheckEndTime-recheckStartTime) / 1000000 + mem.metrics.RecheckTime.Set(recheckTimeMs) + + // At this point, mem.txs are being rechecked. + // mem.recheckCursor re-scans mem.txs and possibly removes some txs. + // Before mem.Reap(), we should wait for mem.recheckCursor to be nil. + } else if mem.Size() > 0 { + // just notify there're some txs left. + mem.notifyTxsAvailable() + } // Update metrics mem.metrics.Size.Set(float64(mem.Size())) - return nil + return err } func (mem *CListMempool) recheckTxs() { if mem.Size() == 0 { - panic("recheckTxs is called, but the mempool is empty") + return } mem.recheckCursor = mem.txs.Front()