Skip to content

Commit

Permalink
fix: revise to call Begin/EndRecheck even though mem.Size() is 0 (#219)
Browse files Browse the repository at this point in the history
* fix: revise to call Begin/EndRecheck even though `mem.Size()` is 0

* chore: revise local_client.go

* fix: lint error

* chore: recheckTxs() just return if mem.Size() == 0
  • Loading branch information
egonspace committed Jul 8, 2021
1 parent 800cab3 commit 05becb8
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 51 deletions.
14 changes: 2 additions & 12 deletions abci/client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ var _ Client = (*localClient)(nil)
type localClient struct {
service.BaseService

// TODO: remove `mtx` to increase concurrency.
// CONTRACT: The application should protect itself from concurrency as an abci server.
mtx *tmsync.Mutex
types.Application
Callback
Expand Down Expand Up @@ -92,9 +94,6 @@ func (app *localClient) DeliverTxAsync(params types.RequestDeliverTx) *ReqRes {
}

func (app *localClient) CheckTxAsync(req types.RequestCheckTx) *ReqRes {
// CONTRACT: Application should handle concurrent `CheckTx`
// In this abci client layer, we don't protect `CheckTx` with a mutex for concurrency

res := app.Application.CheckTx(req)
return app.callback(
types.ToRequestCheckTx(req),
Expand All @@ -103,9 +102,6 @@ func (app *localClient) CheckTxAsync(req types.RequestCheckTx) *ReqRes {
}

func (app *localClient) BeginRecheckTxAsync(req types.RequestBeginRecheckTx) *ReqRes {
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.BeginRecheckTx(req)
return app.callback(
types.ToRequestBeginRecheckTx(req),
Expand All @@ -114,9 +110,6 @@ func (app *localClient) BeginRecheckTxAsync(req types.RequestBeginRecheckTx) *Re
}

func (app *localClient) EndRecheckTxAsync(req types.RequestEndRecheckTx) *ReqRes {
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.EndRecheckTx(req)
return app.callback(
types.ToRequestEndRecheckTx(req),
Expand Down Expand Up @@ -258,9 +251,6 @@ func (app *localClient) DeliverTxSync(req types.RequestDeliverTx) (*types.Respon
}

func (app *localClient) CheckTxSync(req types.RequestCheckTx) (*types.ResponseCheckTx, error) {
// CONTRACT: Application should handle concurrent `CheckTx`
// In this abci client layer, we don't protect `CheckTx` with a mutex for concurrency

res := app.Application.CheckTx(req)
return &res, nil
}
Expand Down
70 changes: 31 additions & 39 deletions mempool/clist_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/line/ostracon/p2p"
"github.com/line/ostracon/proxy"
"github.com/line/ostracon/types"
"github.com/pkg/errors"
)

// TxKeySize is the size of the transaction key index
Expand Down Expand Up @@ -591,7 +590,7 @@ func (mem *CListMempool) Update(
block *types.Block,
deliverTxResponses []*abci.ResponseDeliverTx,
preCheck PreCheckFunc,
) error {
) (err error) {
// Set height
mem.height = block.Height
mem.notifiedTxsAvailable = false
Expand Down Expand Up @@ -624,54 +623,47 @@ func (mem *CListMempool) Update(
}
}

// Either recheck non-committed txs to see if they became invalid
// or just notify there're some txs left.
recheckStartTime := time.Now().UnixNano()
if mem.Size() > 0 {
if mem.config.Recheck {
mem.logger.Info("recheck txs", "numtxs", mem.Size(), "height", block.Height)
res, err := mem.proxyAppConn.BeginRecheckTxSync(abci.RequestBeginRecheckTx{
Header: types.OC2PB.Header(&block.Header),
})
if res.Code == abci.CodeTypeOK && err == nil {
mem.recheckTxs()
res2, err2 := mem.proxyAppConn.EndRecheckTxSync(abci.RequestEndRecheckTx{Height: block.Height})
if res2.Code != abci.CodeTypeOK {
return errors.New("the function EndRecheckTxSync does not respond CodeTypeOK")
}
if err2 != nil {
return errors.Wrap(err2, "the function EndRecheckTxSync returns an error")
}
} else {
if res.Code != abci.CodeTypeOK {
return errors.New("the function BeginRecheckTxSync does not respond CodeTypeOK")
}
if err != nil {
return errors.Wrap(err, "the function BeginRecheckTxSync returns an error")
}
}
if mem.config.Recheck {
// recheck non-committed txs to see if they became invalid
recheckStartTime := time.Now().UnixNano()

// At this point, mem.txs are being rechecked.
// mem.recheckCursor re-scans mem.txs and possibly removes some txs.
// Before mem.Reap(), we should wait for mem.recheckCursor to be nil.
} else {
mem.notifyTxsAvailable()
_, err = mem.proxyAppConn.BeginRecheckTxSync(abci.RequestBeginRecheckTx{
Header: types.OC2PB.Header(&block.Header),
})
if err != nil {
mem.logger.Error("error in proxyAppConn.BeginRecheckTxSync", "err", err)
}
}
recheckEndTime := time.Now().UnixNano()

recheckTimeMs := float64(recheckEndTime-recheckStartTime) / 1000000
mem.metrics.RecheckTime.Set(recheckTimeMs)
mem.logger.Info("recheck txs", "numtxs", mem.Size(), "height", block.Height)
mem.recheckTxs()

_, err = mem.proxyAppConn.EndRecheckTxSync(abci.RequestEndRecheckTx{Height: block.Height})
if err != nil {
mem.logger.Error("error in proxyAppConn.EndRecheckTxSync", "err", err)
}

recheckEndTime := time.Now().UnixNano()

recheckTimeMs := float64(recheckEndTime-recheckStartTime) / 1000000
mem.metrics.RecheckTime.Set(recheckTimeMs)

// At this point, mem.txs are being rechecked.
// mem.recheckCursor re-scans mem.txs and possibly removes some txs.
// Before mem.Reap(), we should wait for mem.recheckCursor to be nil.
} else if mem.Size() > 0 {
// just notify there're some txs left.
mem.notifyTxsAvailable()
}

// Update metrics
mem.metrics.Size.Set(float64(mem.Size()))

return nil
return err
}

func (mem *CListMempool) recheckTxs() {
if mem.Size() == 0 {
panic("recheckTxs is called, but the mempool is empty")
return
}

mem.recheckCursor = mem.txs.Front()
Expand Down

0 comments on commit 05becb8

Please sign in to comment.