Skip to content

Commit

Permalink
feat: concurrent checkTx #213; fix tm-db call
Browse files Browse the repository at this point in the history
  • Loading branch information
egonspace committed Jul 8, 2021
1 parent 04c5f31 commit 877e670
Show file tree
Hide file tree
Showing 37 changed files with 1,771 additions and 329 deletions.
5 changes: 5 additions & 0 deletions abci/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
tmsync "github.com/line/ostracon/libs/sync"
)

//go:generate mockery --case underscore --name Client
const (
dialRetryIntervalSeconds = 3
echoRetryIntervalSeconds = 1
Expand Down Expand Up @@ -36,6 +37,8 @@ type Client interface {
InitChainAsync(types.RequestInitChain) *ReqRes
BeginBlockAsync(types.RequestBeginBlock) *ReqRes
EndBlockAsync(types.RequestEndBlock) *ReqRes
BeginRecheckTxAsync(types.RequestBeginRecheckTx) *ReqRes
EndRecheckTxAsync(types.RequestEndRecheckTx) *ReqRes
ListSnapshotsAsync(types.RequestListSnapshots) *ReqRes
OfferSnapshotAsync(types.RequestOfferSnapshot) *ReqRes
LoadSnapshotChunkAsync(types.RequestLoadSnapshotChunk) *ReqRes
Expand All @@ -52,6 +55,8 @@ type Client interface {
InitChainSync(types.RequestInitChain) (*types.ResponseInitChain, error)
BeginBlockSync(types.RequestBeginBlock) (*types.ResponseBeginBlock, error)
EndBlockSync(types.RequestEndBlock) (*types.ResponseEndBlock, error)
BeginRecheckTxSync(types.RequestBeginRecheckTx) (*types.ResponseBeginRecheckTx, error)
EndRecheckTxSync(types.RequestEndRecheckTx) (*types.ResponseEndRecheckTx, error)
ListSnapshotsSync(types.RequestListSnapshots) (*types.ResponseListSnapshots, error)
OfferSnapshotSync(types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error)
LoadSnapshotChunkSync(types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error)
Expand Down
28 changes: 28 additions & 0 deletions abci/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,24 @@ func (cli *grpcClient) EndBlockAsync(params types.RequestEndBlock) *ReqRes {
return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_EndBlock{EndBlock: res}})
}

func (cli *grpcClient) BeginRecheckTxAsync(params types.RequestBeginRecheckTx) *ReqRes {
req := types.ToRequestBeginRecheckTx(params)
res, err := cli.client.BeginRecheckTx(context.Background(), req.GetBeginRecheckTx(), grpc.WaitForReady(true))
if err != nil {
cli.StopForError(err)
}
return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_BeginRecheckTx{BeginRecheckTx: res}})
}

func (cli *grpcClient) EndRecheckTxAsync(params types.RequestEndRecheckTx) *ReqRes {
req := types.ToRequestEndRecheckTx(params)
res, err := cli.client.EndRecheckTx(context.Background(), req.GetEndRecheckTx(), grpc.WaitForReady(true))
if err != nil {
cli.StopForError(err)
}
return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_EndRecheckTx{EndRecheckTx: res}})
}

func (cli *grpcClient) ListSnapshotsAsync(params types.RequestListSnapshots) *ReqRes {
req := types.ToRequestListSnapshots(params)
res, err := cli.client.ListSnapshots(context.Background(), req.GetListSnapshots(), grpc.WaitForReady(true))
Expand Down Expand Up @@ -396,6 +414,16 @@ func (cli *grpcClient) EndBlockSync(params types.RequestEndBlock) (*types.Respon
return cli.finishSyncCall(reqres).GetEndBlock(), cli.Error()
}

func (cli *grpcClient) BeginRecheckTxSync(params types.RequestBeginRecheckTx) (*types.ResponseBeginRecheckTx, error) {
reqres := cli.BeginRecheckTxAsync(params)
return reqres.Response.GetBeginRecheckTx(), cli.Error()
}

func (cli *grpcClient) EndRecheckTxSync(params types.RequestEndRecheckTx) (*types.ResponseEndRecheckTx, error) {
reqres := cli.EndRecheckTxAsync(params)
return reqres.Response.GetEndRecheckTx(), cli.Error()
}

func (cli *grpcClient) ListSnapshotsSync(params types.RequestListSnapshots) (*types.ResponseListSnapshots, error) {
reqres := cli.ListSnapshotsAsync(params)
return cli.finishSyncCall(reqres).GetListSnapshots(), cli.Error()
Expand Down
46 changes: 42 additions & 4 deletions abci/client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ func (app *localClient) DeliverTxAsync(params types.RequestDeliverTx) *ReqRes {
}

func (app *localClient) CheckTxAsync(req types.RequestCheckTx) *ReqRes {
app.mtx.Lock()
defer app.mtx.Unlock()
// CONTRACT: Application should handle concurrent `CheckTx`
// In this abci client layer, we don't protect `CheckTx` with a mutex for concurrency

res := app.Application.CheckTx(req)
return app.callback(
Expand All @@ -102,6 +102,28 @@ func (app *localClient) CheckTxAsync(req types.RequestCheckTx) *ReqRes {
)
}

func (app *localClient) BeginRecheckTxAsync(req types.RequestBeginRecheckTx) *ReqRes {
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.BeginRecheckTx(req)
return app.callback(
types.ToRequestBeginRecheckTx(req),
types.ToResponseBeginRecheckTx(res),
)
}

func (app *localClient) EndRecheckTxAsync(req types.RequestEndRecheckTx) *ReqRes {
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.EndRecheckTx(req)
return app.callback(
types.ToRequestEndRecheckTx(req),
types.ToResponseEndRecheckTx(res),
)
}

func (app *localClient) QueryAsync(req types.RequestQuery) *ReqRes {
app.mtx.Lock()
defer app.mtx.Unlock()
Expand Down Expand Up @@ -236,8 +258,8 @@ func (app *localClient) DeliverTxSync(req types.RequestDeliverTx) (*types.Respon
}

func (app *localClient) CheckTxSync(req types.RequestCheckTx) (*types.ResponseCheckTx, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
// CONTRACT: Application should handle concurrent `CheckTx`
// In this abci client layer, we don't protect `CheckTx` with a mutex for concurrency

res := app.Application.CheckTx(req)
return &res, nil
Expand Down Expand Up @@ -283,6 +305,22 @@ func (app *localClient) EndBlockSync(req types.RequestEndBlock) (*types.Response
return &res, nil
}

func (app *localClient) BeginRecheckTxSync(req types.RequestBeginRecheckTx) (*types.ResponseBeginRecheckTx, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.BeginRecheckTx(req)
return &res, nil
}

func (app *localClient) EndRecheckTxSync(req types.RequestEndRecheckTx) (*types.ResponseEndRecheckTx, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.EndRecheckTx(req)
return &res, nil
}

func (app *localClient) ListSnapshotsSync(req types.RequestListSnapshots) (*types.ResponseListSnapshots, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
Expand Down
80 changes: 79 additions & 1 deletion abci/client/mocks/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions abci/client/socket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,14 @@ func (cli *socketClient) EndBlockAsync(req types.RequestEndBlock) *ReqRes {
return cli.queueRequest(types.ToRequestEndBlock(req))
}

func (cli *socketClient) BeginRecheckTxAsync(req types.RequestBeginRecheckTx) *ReqRes {
return cli.queueRequest(types.ToRequestBeginRecheckTx(req))
}

func (cli *socketClient) EndRecheckTxAsync(req types.RequestEndRecheckTx) *ReqRes {
return cli.queueRequest(types.ToRequestEndRecheckTx(req))
}

func (cli *socketClient) ListSnapshotsAsync(req types.RequestListSnapshots) *ReqRes {
return cli.queueRequest(types.ToRequestListSnapshots(req))
}
Expand Down Expand Up @@ -380,6 +388,24 @@ func (cli *socketClient) EndBlockSync(req types.RequestEndBlock) (*types.Respons
return reqres.Response.GetEndBlock(), cli.Error()
}

func (cli *socketClient) BeginRecheckTxSync(req types.RequestBeginRecheckTx) (*types.ResponseBeginRecheckTx, error) {
reqres := cli.queueRequest(types.ToRequestBeginRecheckTx(req))
if err := cli.FlushSync(); err != nil {
return nil, err
}

return reqres.Response.GetBeginRecheckTx(), cli.Error()
}

func (cli *socketClient) EndRecheckTxSync(req types.RequestEndRecheckTx) (*types.ResponseEndRecheckTx, error) {
reqres := cli.queueRequest(types.ToRequestEndRecheckTx(req))
if err := cli.FlushSync(); err != nil {
return nil, err
}

return reqres.Response.GetEndRecheckTx(), cli.Error()
}

func (cli *socketClient) ListSnapshotsSync(req types.RequestListSnapshots) (*types.ResponseListSnapshots, error) {
reqres := cli.queueRequest(types.ToRequestListSnapshots(req))
if err := cli.FlushSync(); err != nil {
Expand Down
8 changes: 8 additions & 0 deletions abci/example/kvstore/persistent_kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ func (app *PersistentKVStoreApplication) CheckTx(req types.RequestCheckTx) types
return app.app.CheckTx(req)
}

func (app *PersistentKVStoreApplication) BeginRecheckTx(req types.RequestBeginRecheckTx) types.ResponseBeginRecheckTx {
return app.app.BeginRecheckTx(req)
}

func (app *PersistentKVStoreApplication) EndRecheckTx(req types.RequestEndRecheckTx) types.ResponseEndRecheckTx {
return app.app.EndRecheckTx(req)
}

// Commit will panic if InitChain was not called
func (app *PersistentKVStoreApplication) Commit() types.ResponseCommit {
return app.app.Commit()
Expand Down
23 changes: 22 additions & 1 deletion abci/types/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ type Application interface {
Query(RequestQuery) ResponseQuery // Query for state

// Mempool Connection
CheckTx(RequestCheckTx) ResponseCheckTx // Validate a tx for the mempool
CheckTx(RequestCheckTx) ResponseCheckTx // Validate a tx for the mempool
BeginRecheckTx(RequestBeginRecheckTx) ResponseBeginRecheckTx // Signals the beginning of rechecking
EndRecheckTx(RequestEndRecheckTx) ResponseEndRecheckTx // Signals the end of rechecking

// Consensus Connection
InitChain(RequestInitChain) ResponseInitChain // Initialize blockchain w validators/other info from TendermintCore
Expand Down Expand Up @@ -79,6 +81,14 @@ func (BaseApplication) EndBlock(req RequestEndBlock) ResponseEndBlock {
return ResponseEndBlock{}
}

func (BaseApplication) BeginRecheckTx(req RequestBeginRecheckTx) ResponseBeginRecheckTx {
return ResponseBeginRecheckTx{Code: CodeTypeOK}
}

func (BaseApplication) EndRecheckTx(req RequestEndRecheckTx) ResponseEndRecheckTx {
return ResponseEndRecheckTx{Code: CodeTypeOK}
}

func (BaseApplication) ListSnapshots(req RequestListSnapshots) ResponseListSnapshots {
return ResponseListSnapshots{}
}
Expand Down Expand Up @@ -159,6 +169,17 @@ func (app *GRPCApplication) EndBlock(ctx context.Context, req *RequestEndBlock)
return &res, nil
}

func (app *GRPCApplication) BeginRecheckTx(ctx context.Context, req *RequestBeginRecheckTx) (
*ResponseBeginRecheckTx, error) {
res := app.app.BeginRecheckTx(*req)
return &res, nil
}

func (app *GRPCApplication) EndRecheckTx(ctx context.Context, req *RequestEndRecheckTx) (*ResponseEndRecheckTx, error) {
res := app.app.EndRecheckTx(*req)
return &res, nil
}

func (app *GRPCApplication) ListSnapshots(
ctx context.Context, req *RequestListSnapshots) (*ResponseListSnapshots, error) {
res := app.app.ListSnapshots(*req)
Expand Down
24 changes: 24 additions & 0 deletions abci/types/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,18 @@ func ToRequestEndBlock(req RequestEndBlock) *Request {
}
}

func ToRequestBeginRecheckTx(req RequestBeginRecheckTx) *Request {
return &Request{
Value: &Request_BeginRecheckTx{&req},
}
}

func ToRequestEndRecheckTx(req RequestEndRecheckTx) *Request {
return &Request{
Value: &Request_EndRecheckTx{&req},
}
}

func ToRequestListSnapshots(req RequestListSnapshots) *Request {
return &Request{
Value: &Request_ListSnapshots{&req},
Expand Down Expand Up @@ -233,6 +245,18 @@ func ToResponseEndBlock(res ResponseEndBlock) *Response {
}
}

func ToResponseBeginRecheckTx(res ResponseBeginRecheckTx) *Response {
return &Response{
Value: &Response_BeginRecheckTx{&res},
}
}

func ToResponseEndRecheckTx(res ResponseEndRecheckTx) *Response {
return &Response{
Value: &Response_EndRecheckTx{&res},
}
}

func ToResponseListSnapshots(res ResponseListSnapshots) *Response {
return &Response{
Value: &Response_ListSnapshots{&res},
Expand Down
Loading

0 comments on commit 877e670

Please sign in to comment.