diff --git a/abci/client/client.go b/abci/client/client.go index 48ac778cc..8ccfc5cb6 100644 --- a/abci/client/client.go +++ b/abci/client/client.go @@ -23,29 +23,30 @@ const ( type Client interface { service.Service - SetResponseCallback(Callback) + SetGlobalCallback(GlobalCallback) + GetGlobalCallback() GlobalCallback Error() error - FlushAsync() *ReqRes - EchoAsync(msg string) *ReqRes - InfoAsync(types.RequestInfo) *ReqRes - SetOptionAsync(types.RequestSetOption) *ReqRes - DeliverTxAsync(types.RequestDeliverTx) *ReqRes - CheckTxAsync(types.RequestCheckTx) *ReqRes - QueryAsync(types.RequestQuery) *ReqRes - CommitAsync() *ReqRes - InitChainAsync(types.RequestInitChain) *ReqRes - BeginBlockAsync(types.RequestBeginBlock) *ReqRes - EndBlockAsync(types.RequestEndBlock) *ReqRes - BeginRecheckTxAsync(types.RequestBeginRecheckTx) *ReqRes - EndRecheckTxAsync(types.RequestEndRecheckTx) *ReqRes - ListSnapshotsAsync(types.RequestListSnapshots) *ReqRes - OfferSnapshotAsync(types.RequestOfferSnapshot) *ReqRes - LoadSnapshotChunkAsync(types.RequestLoadSnapshotChunk) *ReqRes - ApplySnapshotChunkAsync(types.RequestApplySnapshotChunk) *ReqRes - - FlushSync() error - EchoSync(msg string) (*types.ResponseEcho, error) + FlushAsync(ResponseCallback) *ReqRes + EchoAsync(string, ResponseCallback) *ReqRes + InfoAsync(types.RequestInfo, ResponseCallback) *ReqRes + SetOptionAsync(types.RequestSetOption, ResponseCallback) *ReqRes + DeliverTxAsync(types.RequestDeliverTx, ResponseCallback) *ReqRes + CheckTxAsync(types.RequestCheckTx, ResponseCallback) *ReqRes + QueryAsync(types.RequestQuery, ResponseCallback) *ReqRes + CommitAsync(ResponseCallback) *ReqRes + InitChainAsync(types.RequestInitChain, ResponseCallback) *ReqRes + BeginBlockAsync(types.RequestBeginBlock, ResponseCallback) *ReqRes + EndBlockAsync(types.RequestEndBlock, ResponseCallback) *ReqRes + BeginRecheckTxAsync(types.RequestBeginRecheckTx, ResponseCallback) *ReqRes + EndRecheckTxAsync(types.RequestEndRecheckTx, ResponseCallback) *ReqRes + ListSnapshotsAsync(types.RequestListSnapshots, ResponseCallback) *ReqRes + OfferSnapshotAsync(types.RequestOfferSnapshot, ResponseCallback) *ReqRes + LoadSnapshotChunkAsync(types.RequestLoadSnapshotChunk, ResponseCallback) *ReqRes + ApplySnapshotChunkAsync(types.RequestApplySnapshotChunk, ResponseCallback) *ReqRes + + FlushSync() (*types.ResponseFlush, error) + EchoSync(string) (*types.ResponseEcho, error) InfoSync(types.RequestInfo) (*types.ResponseInfo, error) SetOptionSync(types.RequestSetOption) (*types.ResponseSetOption, error) DeliverTxSync(types.RequestDeliverTx) (*types.ResponseDeliverTx, error) @@ -79,73 +80,62 @@ func NewClient(addr, transport string, mustConnect bool) (client Client, err err return } -type Callback func(*types.Request, *types.Response) +type GlobalCallback func(*types.Request, *types.Response) +type ResponseCallback func(*types.Response) type ReqRes struct { *types.Request - *sync.WaitGroup *types.Response // Not set atomically, so be sure to use WaitGroup. mtx tmsync.Mutex - done bool // Gets set to true once *after* WaitGroup.Done(). - cb func(*types.Response) // A single callback that may be set. + wg *sync.WaitGroup + done bool // Gets set to true once *after* WaitGroup.Done(). + cb ResponseCallback // A single callback that may be set. } -func NewReqRes(req *types.Request) *ReqRes { +func NewReqRes(req *types.Request, cb ResponseCallback) *ReqRes { return &ReqRes{ - Request: req, - WaitGroup: waitGroup1(), - Response: nil, + Request: req, + Response: nil, + wg: waitGroup1(), done: false, - cb: nil, + cb: cb, } } -// Sets sets the callback. If reqRes is already done, it will call the cb -// immediately. Note, reqRes.cb should not change if reqRes.done and only one -// callback is supported. -func (r *ReqRes) SetCallback(cb func(res *types.Response)) { - r.mtx.Lock() +// InvokeCallback invokes a thread-safe execution of the configured callback +// if non-nil. +func (reqRes *ReqRes) InvokeCallback() { + reqRes.mtx.Lock() + defer reqRes.mtx.Unlock() - if r.done { - r.mtx.Unlock() - cb(r.Response) - return + if reqRes.cb != nil { + reqRes.cb(reqRes.Response) } - - r.cb = cb - r.mtx.Unlock() } -// InvokeCallback invokes a thread-safe execution of the configured callback -// if non-nil. -func (r *ReqRes) InvokeCallback() { - r.mtx.Lock() - defer r.mtx.Unlock() +func (reqRes *ReqRes) SetDone(res *types.Response) (set bool) { + reqRes.mtx.Lock() + // TODO should we panic if it's already done? + set = !reqRes.done + if set { + reqRes.Response = res + reqRes.done = true + reqRes.wg.Done() + } + reqRes.mtx.Unlock() - if r.cb != nil { - r.cb(r.Response) + // NOTE `reqRes.cb` is immutable so we're safe to access it at here without `mtx` + if set && reqRes.cb != nil { + reqRes.cb(res) } -} -// GetCallback returns the configured callback of the ReqRes object which may be -// nil. Note, it is not safe to concurrently call this in cases where it is -// marked done and SetCallback is called before calling GetCallback as that -// will invoke the callback twice and create a potential race condition. -// -// ref: https://github.com/tendermint/tendermint/issues/5439 -func (r *ReqRes) GetCallback() func(*types.Response) { - r.mtx.Lock() - defer r.mtx.Unlock() - return r.cb + return set } -// SetDone marks the ReqRes object as done. -func (r *ReqRes) SetDone() { - r.mtx.Lock() - r.done = true - r.mtx.Unlock() +func (reqRes *ReqRes) Wait() { + reqRes.wg.Wait() } func waitGroup1() (wg *sync.WaitGroup) { diff --git a/abci/client/grpc_client.go b/abci/client/grpc_client.go index 605cb72a3..e8be0f5ec 100644 --- a/abci/client/grpc_client.go +++ b/abci/client/grpc_client.go @@ -23,27 +23,21 @@ type grpcClient struct { service.BaseService mustConnect bool - client types.ABCIApplicationClient - conn *grpc.ClientConn - chReqRes chan *ReqRes // dispatches "async" responses to callbacks *in order*, needed by mempool + client types.ABCIApplicationClient + conn *grpc.ClientConn - mtx tmsync.Mutex - addr string - err error - resCb func(*types.Request, *types.Response) // listens to all callbacks + mtx tmsync.Mutex + addr string + err error + + globalCbMtx sync.Mutex + globalCb func(*types.Request, *types.Response) // listens to all callbacks } func NewGRPCClient(addr string, mustConnect bool) Client { cli := &grpcClient{ addr: addr, mustConnect: mustConnect, - // Buffering the channel is needed to make calls appear asynchronous, - // which is required when the caller makes multiple async calls before - // processing callbacks (e.g. due to holding locks). 64 means that a - // caller can make up to 64 async calls before a callback must be - // processed (otherwise it deadlocks). It also means that we can make 64 - // gRPC calls while processing a slow callback at the channel head. - chReqRes: make(chan *ReqRes, 64), } cli.BaseService = *service.NewBaseService(nil, "grpcClient", cli) return cli @@ -58,36 +52,6 @@ func (cli *grpcClient) OnStart() error { return err } - // This processes asynchronous request/response messages and dispatches - // them to callbacks. - go func() { - // Use a separate function to use defer for mutex unlocks (this handles panics) - callCb := func(reqres *ReqRes) { - cli.mtx.Lock() - defer cli.mtx.Unlock() - - reqres.SetDone() - reqres.Done() - - // Notify client listener if set - if cli.resCb != nil { - cli.resCb(reqres.Request, reqres.Response) - } - - // Notify reqRes listener if set - if cb := reqres.GetCallback(); cb != nil { - cb(reqres.Response) - } - } - for reqres := range cli.chReqRes { - if reqres != nil { - callCb(reqres) - } else { - cli.Logger.Error("Received nil reqres") - } - } - }() - RETRY_LOOP: for { conn, err := grpc.Dial(cli.addr, grpc.WithInsecure(), grpc.WithContextDialer(dialerFunc)) @@ -125,7 +89,6 @@ func (cli *grpcClient) OnStop() { if cli.conn != nil { cli.conn.Close() } - close(cli.chReqRes) } func (cli *grpcClient) StopForError(err error) { @@ -151,12 +114,17 @@ func (cli *grpcClient) Error() error { return cli.err } -// Set listener for all responses -// NOTE: callback may get internally generated flush responses. -func (cli *grpcClient) SetResponseCallback(resCb Callback) { - cli.mtx.Lock() - cli.resCb = resCb - cli.mtx.Unlock() +func (cli *grpcClient) SetGlobalCallback(globalCb GlobalCallback) { + cli.globalCbMtx.Lock() + cli.globalCb = globalCb + cli.globalCbMtx.Unlock() +} + +func (cli *grpcClient) GetGlobalCallback() (cb GlobalCallback) { + cli.globalCbMtx.Lock() + cb = cli.globalCb + cli.globalCbMtx.Unlock() + return cb } //---------------------------------------- @@ -167,281 +135,279 @@ func (cli *grpcClient) SetResponseCallback(resCb Callback) { // maybe one day, if people really want it, we use grpc streams, // but hopefully not :D -func (cli *grpcClient) EchoAsync(msg string) *ReqRes { +func (cli *grpcClient) EchoAsync(msg string, cb ResponseCallback) *ReqRes { req := types.ToRequestEcho(msg) res, err := cli.client.Echo(context.Background(), req.GetEcho(), grpc.WaitForReady(true)) if err != nil { cli.StopForError(err) } - return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_Echo{Echo: res}}) + return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_Echo{Echo: res}}, cb) } -func (cli *grpcClient) FlushAsync() *ReqRes { +func (cli *grpcClient) FlushAsync(cb ResponseCallback) *ReqRes { req := types.ToRequestFlush() res, err := cli.client.Flush(context.Background(), req.GetFlush(), grpc.WaitForReady(true)) if err != nil { cli.StopForError(err) } - return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_Flush{Flush: res}}) + return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_Flush{Flush: res}}, cb) } -func (cli *grpcClient) InfoAsync(params types.RequestInfo) *ReqRes { +func (cli *grpcClient) InfoAsync(params types.RequestInfo, cb ResponseCallback) *ReqRes { req := types.ToRequestInfo(params) res, err := cli.client.Info(context.Background(), req.GetInfo(), grpc.WaitForReady(true)) if err != nil { cli.StopForError(err) } - return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_Info{Info: res}}) + return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_Info{Info: res}}, cb) } -func (cli *grpcClient) SetOptionAsync(params types.RequestSetOption) *ReqRes { +func (cli *grpcClient) SetOptionAsync(params types.RequestSetOption, cb ResponseCallback) *ReqRes { req := types.ToRequestSetOption(params) res, err := cli.client.SetOption(context.Background(), req.GetSetOption(), grpc.WaitForReady(true)) if err != nil { cli.StopForError(err) } - return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_SetOption{SetOption: res}}) + return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_SetOption{SetOption: res}}, cb) } -func (cli *grpcClient) DeliverTxAsync(params types.RequestDeliverTx) *ReqRes { +func (cli *grpcClient) DeliverTxAsync(params types.RequestDeliverTx, cb ResponseCallback) *ReqRes { req := types.ToRequestDeliverTx(params) res, err := cli.client.DeliverTx(context.Background(), req.GetDeliverTx(), grpc.WaitForReady(true)) if err != nil { cli.StopForError(err) } - return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_DeliverTx{DeliverTx: res}}) + return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_DeliverTx{DeliverTx: res}}, cb) } -func (cli *grpcClient) CheckTxAsync(params types.RequestCheckTx) *ReqRes { +func (cli *grpcClient) CheckTxAsync(params types.RequestCheckTx, cb ResponseCallback) *ReqRes { req := types.ToRequestCheckTx(params) res, err := cli.client.CheckTx(context.Background(), req.GetCheckTx(), grpc.WaitForReady(true)) if err != nil { cli.StopForError(err) } - return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_CheckTx{CheckTx: res}}) + return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_CheckTx{CheckTx: res}}, cb) } -func (cli *grpcClient) QueryAsync(params types.RequestQuery) *ReqRes { +func (cli *grpcClient) QueryAsync(params types.RequestQuery, cb ResponseCallback) *ReqRes { req := types.ToRequestQuery(params) res, err := cli.client.Query(context.Background(), req.GetQuery(), grpc.WaitForReady(true)) if err != nil { cli.StopForError(err) } - return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_Query{Query: res}}) + return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_Query{Query: res}}, cb) } -func (cli *grpcClient) CommitAsync() *ReqRes { +func (cli *grpcClient) CommitAsync(cb ResponseCallback) *ReqRes { req := types.ToRequestCommit() res, err := cli.client.Commit(context.Background(), req.GetCommit(), grpc.WaitForReady(true)) if err != nil { cli.StopForError(err) } - return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_Commit{Commit: res}}) + return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_Commit{Commit: res}}, cb) } -func (cli *grpcClient) InitChainAsync(params types.RequestInitChain) *ReqRes { +func (cli *grpcClient) InitChainAsync(params types.RequestInitChain, cb ResponseCallback) *ReqRes { req := types.ToRequestInitChain(params) res, err := cli.client.InitChain(context.Background(), req.GetInitChain(), grpc.WaitForReady(true)) if err != nil { cli.StopForError(err) } - return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_InitChain{InitChain: res}}) + return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_InitChain{InitChain: res}}, cb) } -func (cli *grpcClient) BeginBlockAsync(params types.RequestBeginBlock) *ReqRes { +func (cli *grpcClient) BeginBlockAsync(params types.RequestBeginBlock, cb ResponseCallback) *ReqRes { req := types.ToRequestBeginBlock(params) res, err := cli.client.BeginBlock(context.Background(), req.GetBeginBlock(), grpc.WaitForReady(true)) if err != nil { cli.StopForError(err) } - return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_BeginBlock{BeginBlock: res}}) + return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_BeginBlock{BeginBlock: res}}, cb) } -func (cli *grpcClient) EndBlockAsync(params types.RequestEndBlock) *ReqRes { +func (cli *grpcClient) EndBlockAsync(params types.RequestEndBlock, cb ResponseCallback) *ReqRes { req := types.ToRequestEndBlock(params) res, err := cli.client.EndBlock(context.Background(), req.GetEndBlock(), grpc.WaitForReady(true)) if err != nil { cli.StopForError(err) } - return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_EndBlock{EndBlock: res}}) + return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_EndBlock{EndBlock: res}}, cb) } -func (cli *grpcClient) BeginRecheckTxAsync(params types.RequestBeginRecheckTx) *ReqRes { +func (cli *grpcClient) BeginRecheckTxAsync(params types.RequestBeginRecheckTx, cb ResponseCallback) *ReqRes { req := types.ToRequestBeginRecheckTx(params) res, err := cli.client.BeginRecheckTx(context.Background(), req.GetBeginRecheckTx(), grpc.WaitForReady(true)) if err != nil { cli.StopForError(err) } - return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_BeginRecheckTx{BeginRecheckTx: res}}) + return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_BeginRecheckTx{BeginRecheckTx: res}}, cb) } -func (cli *grpcClient) EndRecheckTxAsync(params types.RequestEndRecheckTx) *ReqRes { +func (cli *grpcClient) EndRecheckTxAsync(params types.RequestEndRecheckTx, cb ResponseCallback) *ReqRes { req := types.ToRequestEndRecheckTx(params) res, err := cli.client.EndRecheckTx(context.Background(), req.GetEndRecheckTx(), grpc.WaitForReady(true)) if err != nil { cli.StopForError(err) } - return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_EndRecheckTx{EndRecheckTx: res}}) + return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_EndRecheckTx{EndRecheckTx: res}}, cb) } -func (cli *grpcClient) ListSnapshotsAsync(params types.RequestListSnapshots) *ReqRes { +func (cli *grpcClient) ListSnapshotsAsync(params types.RequestListSnapshots, cb ResponseCallback) *ReqRes { req := types.ToRequestListSnapshots(params) res, err := cli.client.ListSnapshots(context.Background(), req.GetListSnapshots(), grpc.WaitForReady(true)) if err != nil { cli.StopForError(err) } - return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_ListSnapshots{ListSnapshots: res}}) + return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_ListSnapshots{ListSnapshots: res}}, cb) } -func (cli *grpcClient) OfferSnapshotAsync(params types.RequestOfferSnapshot) *ReqRes { +func (cli *grpcClient) OfferSnapshotAsync(params types.RequestOfferSnapshot, cb ResponseCallback) *ReqRes { req := types.ToRequestOfferSnapshot(params) res, err := cli.client.OfferSnapshot(context.Background(), req.GetOfferSnapshot(), grpc.WaitForReady(true)) if err != nil { cli.StopForError(err) } - return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_OfferSnapshot{OfferSnapshot: res}}) + return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_OfferSnapshot{OfferSnapshot: res}}, cb) } -func (cli *grpcClient) LoadSnapshotChunkAsync(params types.RequestLoadSnapshotChunk) *ReqRes { +func (cli *grpcClient) LoadSnapshotChunkAsync(params types.RequestLoadSnapshotChunk, cb ResponseCallback) *ReqRes { req := types.ToRequestLoadSnapshotChunk(params) res, err := cli.client.LoadSnapshotChunk(context.Background(), req.GetLoadSnapshotChunk(), grpc.WaitForReady(true)) if err != nil { cli.StopForError(err) } - return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_LoadSnapshotChunk{LoadSnapshotChunk: res}}) + return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_LoadSnapshotChunk{LoadSnapshotChunk: res}}, cb) } -func (cli *grpcClient) ApplySnapshotChunkAsync(params types.RequestApplySnapshotChunk) *ReqRes { +func (cli *grpcClient) ApplySnapshotChunkAsync(params types.RequestApplySnapshotChunk, cb ResponseCallback) *ReqRes { req := types.ToRequestApplySnapshotChunk(params) res, err := cli.client.ApplySnapshotChunk(context.Background(), req.GetApplySnapshotChunk(), grpc.WaitForReady(true)) if err != nil { cli.StopForError(err) } - return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_ApplySnapshotChunk{ApplySnapshotChunk: res}}) -} - -// finishAsyncCall creates a ReqRes for an async call, and immediately populates it -// with the response. We don't complete it until it's been ordered via the channel. -func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) *ReqRes { - reqres := NewReqRes(req) - reqres.Response = res - cli.chReqRes <- reqres // use channel for async responses, since they must be ordered - return reqres -} - -// finishSyncCall waits for an async call to complete. It is necessary to call all -// sync calls asynchronously as well, to maintain call and response ordering via -// the channel, and this method will wait until the async call completes. -func (cli *grpcClient) finishSyncCall(reqres *ReqRes) *types.Response { - // It's possible that the callback is called twice, since the callback can - // be called immediately on SetCallback() in addition to after it has been - // set. This is because completing the ReqRes happens in a separate critical - // section from the one where the callback is called: there is a race where - // SetCallback() is called between completing the ReqRes and dispatching the - // callback. - // - // We also buffer the channel with 1 response, since SetCallback() will be - // called synchronously if the reqres is already completed, in which case - // it will block on sending to the channel since it hasn't gotten around to - // receiving from it yet. - // - // ReqRes should really handle callback dispatch internally, to guarantee - // that it's only called once and avoid the above race conditions. - var once sync.Once - ch := make(chan *types.Response, 1) - reqres.SetCallback(func(res *types.Response) { - once.Do(func() { - ch <- res - }) - }) - return <-ch + return cli.finishAsyncCall(req, + &types.Response{Value: &types.Response_ApplySnapshotChunk{ApplySnapshotChunk: res}}, cb) } -//---------------------------------------- +func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response, cb ResponseCallback) *ReqRes { + reqRes := NewReqRes(req, cb) -func (cli *grpcClient) FlushSync() error { - return nil + // goroutine for callbacks + go func() { + set := reqRes.SetDone(res) + if set { + // Notify client listener if set + if globalCb := cli.GetGlobalCallback(); globalCb != nil { + globalCb(req, res) + } + } + }() + + return reqRes +} + +//---------------------------------------- +func (cli *grpcClient) FlushSync() (*types.ResponseFlush, error) { + reqres := cli.FlushAsync(nil) + reqres.Wait() + return reqres.Response.GetFlush(), cli.Error() } func (cli *grpcClient) EchoSync(msg string) (*types.ResponseEcho, error) { - reqres := cli.EchoAsync(msg) + reqres := cli.EchoAsync(msg, nil) + reqres.Wait() // StopForError should already have been called if error is set - return cli.finishSyncCall(reqres).GetEcho(), cli.Error() + return reqres.Response.GetEcho(), cli.Error() } func (cli *grpcClient) InfoSync(req types.RequestInfo) (*types.ResponseInfo, error) { - reqres := cli.InfoAsync(req) - return cli.finishSyncCall(reqres).GetInfo(), cli.Error() + reqres := cli.InfoAsync(req, nil) + reqres.Wait() + return reqres.Response.GetInfo(), cli.Error() } func (cli *grpcClient) SetOptionSync(req types.RequestSetOption) (*types.ResponseSetOption, error) { - reqres := cli.SetOptionAsync(req) + reqres := cli.SetOptionAsync(req, nil) + reqres.Wait() return reqres.Response.GetSetOption(), cli.Error() } func (cli *grpcClient) DeliverTxSync(params types.RequestDeliverTx) (*types.ResponseDeliverTx, error) { - reqres := cli.DeliverTxAsync(params) - return cli.finishSyncCall(reqres).GetDeliverTx(), cli.Error() + reqres := cli.DeliverTxAsync(params, nil) + reqres.Wait() + return reqres.Response.GetDeliverTx(), cli.Error() } func (cli *grpcClient) CheckTxSync(params types.RequestCheckTx) (*types.ResponseCheckTx, error) { - reqres := cli.CheckTxAsync(params) - return cli.finishSyncCall(reqres).GetCheckTx(), cli.Error() + reqres := cli.CheckTxAsync(params, nil) + reqres.Wait() + return reqres.Response.GetCheckTx(), cli.Error() } func (cli *grpcClient) QuerySync(req types.RequestQuery) (*types.ResponseQuery, error) { - reqres := cli.QueryAsync(req) - return cli.finishSyncCall(reqres).GetQuery(), cli.Error() + reqres := cli.QueryAsync(req, nil) + reqres.Wait() + return reqres.Response.GetQuery(), cli.Error() } func (cli *grpcClient) CommitSync() (*types.ResponseCommit, error) { - reqres := cli.CommitAsync() - return cli.finishSyncCall(reqres).GetCommit(), cli.Error() + reqres := cli.CommitAsync(nil) + reqres.Wait() + return reqres.Response.GetCommit(), cli.Error() } func (cli *grpcClient) InitChainSync(params types.RequestInitChain) (*types.ResponseInitChain, error) { - reqres := cli.InitChainAsync(params) - return cli.finishSyncCall(reqres).GetInitChain(), cli.Error() + reqres := cli.InitChainAsync(params, nil) + reqres.Wait() + return reqres.Response.GetInitChain(), cli.Error() } func (cli *grpcClient) BeginBlockSync(params types.RequestBeginBlock) (*types.ResponseBeginBlock, error) { - reqres := cli.BeginBlockAsync(params) - return cli.finishSyncCall(reqres).GetBeginBlock(), cli.Error() + reqres := cli.BeginBlockAsync(params, nil) + reqres.Wait() + return reqres.Response.GetBeginBlock(), cli.Error() } func (cli *grpcClient) EndBlockSync(params types.RequestEndBlock) (*types.ResponseEndBlock, error) { - reqres := cli.EndBlockAsync(params) - return cli.finishSyncCall(reqres).GetEndBlock(), cli.Error() + reqres := cli.EndBlockAsync(params, nil) + reqres.Wait() + return reqres.Response.GetEndBlock(), cli.Error() } func (cli *grpcClient) BeginRecheckTxSync(params types.RequestBeginRecheckTx) (*types.ResponseBeginRecheckTx, error) { - reqres := cli.BeginRecheckTxAsync(params) + reqres := cli.BeginRecheckTxAsync(params, nil) + reqres.Wait() return reqres.Response.GetBeginRecheckTx(), cli.Error() } func (cli *grpcClient) EndRecheckTxSync(params types.RequestEndRecheckTx) (*types.ResponseEndRecheckTx, error) { - reqres := cli.EndRecheckTxAsync(params) + reqres := cli.EndRecheckTxAsync(params, nil) + reqres.Wait() return reqres.Response.GetEndRecheckTx(), cli.Error() } func (cli *grpcClient) ListSnapshotsSync(params types.RequestListSnapshots) (*types.ResponseListSnapshots, error) { - reqres := cli.ListSnapshotsAsync(params) - return cli.finishSyncCall(reqres).GetListSnapshots(), cli.Error() + reqres := cli.ListSnapshotsAsync(params, nil) + reqres.Wait() + return reqres.Response.GetListSnapshots(), cli.Error() } func (cli *grpcClient) OfferSnapshotSync(params types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) { - reqres := cli.OfferSnapshotAsync(params) - return cli.finishSyncCall(reqres).GetOfferSnapshot(), cli.Error() + reqres := cli.OfferSnapshotAsync(params, nil) + reqres.Wait() + return reqres.Response.GetOfferSnapshot(), cli.Error() } func (cli *grpcClient) LoadSnapshotChunkSync( params types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) { - reqres := cli.LoadSnapshotChunkAsync(params) - return cli.finishSyncCall(reqres).GetLoadSnapshotChunk(), cli.Error() + reqres := cli.LoadSnapshotChunkAsync(params, nil) + reqres.Wait() + return reqres.Response.GetLoadSnapshotChunk(), cli.Error() } func (cli *grpcClient) ApplySnapshotChunkSync( params types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) { - reqres := cli.ApplySnapshotChunkAsync(params) - return cli.finishSyncCall(reqres).GetApplySnapshotChunk(), cli.Error() + reqres := cli.ApplySnapshotChunkAsync(params, nil) + reqres.Wait() + return reqres.Response.GetApplySnapshotChunk(), cli.Error() } diff --git a/abci/client/local_client.go b/abci/client/local_client.go index 65e3c4e5d..0186384a6 100644 --- a/abci/client/local_client.go +++ b/abci/client/local_client.go @@ -15,11 +15,13 @@ var _ Client = (*localClient)(nil) type localClient struct { service.BaseService - // TODO: remove `mtx` to increase concurrency. - // CONTRACT: The application should protect itself from concurrency as an abci server. + // TODO: remove `mtx` to increase concurrency. We could remove it because the app should protect itself. mtx *tmsync.Mutex + // CONTRACT: The application should protect itself from concurrency as an abci server. types.Application - Callback + + globalCbMtx tmsync.Mutex + globalCb GlobalCallback } func NewLocalClient(mtx *tmsync.Mutex, app types.Application) Client { @@ -34,10 +36,17 @@ func NewLocalClient(mtx *tmsync.Mutex, app types.Application) Client { return cli } -func (app *localClient) SetResponseCallback(cb Callback) { - app.mtx.Lock() - app.Callback = cb - app.mtx.Unlock() +func (app *localClient) SetGlobalCallback(globalCb GlobalCallback) { + app.globalCbMtx.Lock() + app.globalCb = globalCb + app.globalCbMtx.Unlock() +} + +func (app *localClient) GetGlobalCallback() (cb GlobalCallback) { + app.globalCbMtx.Lock() + cb = app.globalCb + app.globalCbMtx.Unlock() + return cb } // TODO: change types.Application to include Error()? @@ -45,196 +54,174 @@ func (app *localClient) Error() error { return nil } -func (app *localClient) FlushAsync() *ReqRes { +func (app *localClient) FlushAsync(cb ResponseCallback) *ReqRes { // Do nothing - return newLocalReqRes(types.ToRequestFlush(), nil) + reqRes := NewReqRes(types.ToRequestFlush(), cb) + return app.done(reqRes, types.ToResponseFlush()) } -func (app *localClient) EchoAsync(msg string) *ReqRes { - app.mtx.Lock() - defer app.mtx.Unlock() +func (app *localClient) EchoAsync(msg string, cb ResponseCallback) *ReqRes { + // NOTE: commented out for performance. delete all after commenting out all `app.mtx` + // app.mtx.Lock() + // defer app.mtx.Unlock() - return app.callback( - types.ToRequestEcho(msg), - types.ToResponseEcho(msg), - ) + reqRes := NewReqRes(types.ToRequestEcho(msg), cb) + return app.done(reqRes, types.ToResponseEcho(msg)) } -func (app *localClient) InfoAsync(req types.RequestInfo) *ReqRes { +func (app *localClient) InfoAsync(req types.RequestInfo, cb ResponseCallback) *ReqRes { app.mtx.Lock() defer app.mtx.Unlock() + reqRes := NewReqRes(types.ToRequestInfo(req), cb) res := app.Application.Info(req) - return app.callback( - types.ToRequestInfo(req), - types.ToResponseInfo(res), - ) + return app.done(reqRes, types.ToResponseInfo(res)) } -func (app *localClient) SetOptionAsync(req types.RequestSetOption) *ReqRes { +func (app *localClient) SetOptionAsync(req types.RequestSetOption, cb ResponseCallback) *ReqRes { app.mtx.Lock() defer app.mtx.Unlock() + reqRes := NewReqRes(types.ToRequestSetOption(req), cb) res := app.Application.SetOption(req) - return app.callback( - types.ToRequestSetOption(req), - types.ToResponseSetOption(res), - ) + return app.done(reqRes, types.ToResponseSetOption(res)) } -func (app *localClient) DeliverTxAsync(params types.RequestDeliverTx) *ReqRes { +func (app *localClient) DeliverTxAsync(req types.RequestDeliverTx, cb ResponseCallback) *ReqRes { app.mtx.Lock() defer app.mtx.Unlock() - res := app.Application.DeliverTx(params) - return app.callback( - types.ToRequestDeliverTx(params), - types.ToResponseDeliverTx(res), - ) + reqRes := NewReqRes(types.ToRequestDeliverTx(req), cb) + res := app.Application.DeliverTx(req) + return app.done(reqRes, types.ToResponseDeliverTx(res)) } -func (app *localClient) CheckTxAsync(params types.RequestCheckTx) *ReqRes { - req := types.ToRequestCheckTx(params) - reqRes := NewReqRes(req) +func (app *localClient) CheckTxAsync(req types.RequestCheckTx, cb ResponseCallback) *ReqRes { + // NOTE: commented out for performance. delete all after commenting out all `app.mtx` + // app.mtx.Lock() + // defer app.mtx.Unlock() + + reqRes := NewReqRes(types.ToRequestCheckTx(req), cb) - app.Application.CheckTxAsync(params, func(r types.ResponseCheckTx) { + app.Application.CheckTxAsync(req, func(r types.ResponseCheckTx) { res := types.ToResponseCheckTx(r) - app.Callback(req, res) - reqRes.Response = res - reqRes.Done() - reqRes.SetDone() - - // Notify reqRes listener if set - if cb := reqRes.GetCallback(); cb != nil { - cb(res) - } + app.done(reqRes, res) }) return reqRes } -func (app *localClient) BeginRecheckTxAsync(req types.RequestBeginRecheckTx) *ReqRes { - res := app.Application.BeginRecheckTx(req) - return app.callback( - types.ToRequestBeginRecheckTx(req), - types.ToResponseBeginRecheckTx(res), - ) -} - -func (app *localClient) EndRecheckTxAsync(req types.RequestEndRecheckTx) *ReqRes { - res := app.Application.EndRecheckTx(req) - return app.callback( - types.ToRequestEndRecheckTx(req), - types.ToResponseEndRecheckTx(res), - ) -} - -func (app *localClient) QueryAsync(req types.RequestQuery) *ReqRes { +func (app *localClient) QueryAsync(req types.RequestQuery, cb ResponseCallback) *ReqRes { app.mtx.Lock() defer app.mtx.Unlock() + reqRes := NewReqRes(types.ToRequestQuery(req), cb) res := app.Application.Query(req) - return app.callback( - types.ToRequestQuery(req), - types.ToResponseQuery(res), - ) + return app.done(reqRes, types.ToResponseQuery(res)) } -func (app *localClient) CommitAsync() *ReqRes { +func (app *localClient) CommitAsync(cb ResponseCallback) *ReqRes { app.mtx.Lock() defer app.mtx.Unlock() + reqRes := NewReqRes(types.ToRequestCommit(), cb) res := app.Application.Commit() - return app.callback( - types.ToRequestCommit(), - types.ToResponseCommit(res), - ) + return app.done(reqRes, types.ToResponseCommit(res)) } -func (app *localClient) InitChainAsync(req types.RequestInitChain) *ReqRes { +func (app *localClient) InitChainAsync(req types.RequestInitChain, cb ResponseCallback) *ReqRes { app.mtx.Lock() defer app.mtx.Unlock() + reqRes := NewReqRes(types.ToRequestInitChain(req), cb) res := app.Application.InitChain(req) - return app.callback( - types.ToRequestInitChain(req), - types.ToResponseInitChain(res), - ) + return app.done(reqRes, types.ToResponseInitChain(res)) } -func (app *localClient) BeginBlockAsync(req types.RequestBeginBlock) *ReqRes { +func (app *localClient) BeginBlockAsync(req types.RequestBeginBlock, cb ResponseCallback) *ReqRes { app.mtx.Lock() defer app.mtx.Unlock() + reqRes := NewReqRes(types.ToRequestBeginBlock(req), cb) res := app.Application.BeginBlock(req) - return app.callback( - types.ToRequestBeginBlock(req), - types.ToResponseBeginBlock(res), - ) + return app.done(reqRes, types.ToResponseBeginBlock(res)) } -func (app *localClient) EndBlockAsync(req types.RequestEndBlock) *ReqRes { +func (app *localClient) EndBlockAsync(req types.RequestEndBlock, cb ResponseCallback) *ReqRes { app.mtx.Lock() defer app.mtx.Unlock() + reqRes := NewReqRes(types.ToRequestEndBlock(req), cb) res := app.Application.EndBlock(req) - return app.callback( - types.ToRequestEndBlock(req), - types.ToResponseEndBlock(res), - ) + return app.done(reqRes, types.ToResponseEndBlock(res)) } -func (app *localClient) ListSnapshotsAsync(req types.RequestListSnapshots) *ReqRes { +func (app *localClient) BeginRecheckTxAsync(req types.RequestBeginRecheckTx, cb ResponseCallback) *ReqRes { + // NOTE: commented out for performance. delete all after commenting out all `app.mtx` + // app.mtx.Lock() + // defer app.mtx.Unlock() + + reqRes := NewReqRes(types.ToRequestBeginRecheckTx(req), cb) + res := app.Application.BeginRecheckTx(req) + return app.done(reqRes, types.ToResponseBeginRecheckTx(res)) +} + +func (app *localClient) EndRecheckTxAsync(req types.RequestEndRecheckTx, cb ResponseCallback) *ReqRes { + // NOTE: commented out for performance. delete all after commenting out all `app.mtx` + // app.mtx.Lock() + // defer app.mtx.Unlock() + + reqRes := NewReqRes(types.ToRequestEndRecheckTx(req), cb) + res := app.Application.EndRecheckTx(req) + return app.done(reqRes, types.ToResponseEndRecheckTx(res)) +} + +func (app *localClient) ListSnapshotsAsync(req types.RequestListSnapshots, cb ResponseCallback) *ReqRes { app.mtx.Lock() defer app.mtx.Unlock() + reqRes := NewReqRes(types.ToRequestListSnapshots(req), cb) res := app.Application.ListSnapshots(req) - return app.callback( - types.ToRequestListSnapshots(req), - types.ToResponseListSnapshots(res), - ) + return app.done(reqRes, types.ToResponseListSnapshots(res)) } -func (app *localClient) OfferSnapshotAsync(req types.RequestOfferSnapshot) *ReqRes { +func (app *localClient) OfferSnapshotAsync(req types.RequestOfferSnapshot, cb ResponseCallback) *ReqRes { app.mtx.Lock() defer app.mtx.Unlock() + reqRes := NewReqRes(types.ToRequestOfferSnapshot(req), cb) res := app.Application.OfferSnapshot(req) - return app.callback( - types.ToRequestOfferSnapshot(req), - types.ToResponseOfferSnapshot(res), - ) + return app.done(reqRes, types.ToResponseOfferSnapshot(res)) } -func (app *localClient) LoadSnapshotChunkAsync(req types.RequestLoadSnapshotChunk) *ReqRes { +func (app *localClient) LoadSnapshotChunkAsync(req types.RequestLoadSnapshotChunk, cb ResponseCallback) *ReqRes { app.mtx.Lock() defer app.mtx.Unlock() + reqRes := NewReqRes(types.ToRequestLoadSnapshotChunk(req), cb) res := app.Application.LoadSnapshotChunk(req) - return app.callback( - types.ToRequestLoadSnapshotChunk(req), - types.ToResponseLoadSnapshotChunk(res), - ) + return app.done(reqRes, types.ToResponseLoadSnapshotChunk(res)) } -func (app *localClient) ApplySnapshotChunkAsync(req types.RequestApplySnapshotChunk) *ReqRes { +func (app *localClient) ApplySnapshotChunkAsync(req types.RequestApplySnapshotChunk, cb ResponseCallback) *ReqRes { app.mtx.Lock() defer app.mtx.Unlock() + reqRes := NewReqRes(types.ToRequestApplySnapshotChunk(req), cb) res := app.Application.ApplySnapshotChunk(req) - return app.callback( - types.ToRequestApplySnapshotChunk(req), - types.ToResponseApplySnapshotChunk(res), - ) + return app.done(reqRes, types.ToResponseApplySnapshotChunk(res)) } //------------------------------------------------------- - -func (app *localClient) FlushSync() error { - return nil +func (app *localClient) FlushSync() (*types.ResponseFlush, error) { + return &types.ResponseFlush{}, nil } func (app *localClient) EchoSync(msg string) (*types.ResponseEcho, error) { + // NOTE: commented out for performance. delete all after commenting out all `app.mtx` + // app.mtx.Lock() + // defer app.mtx.Unlock() + return &types.ResponseEcho{Message: msg}, nil } @@ -263,6 +250,10 @@ func (app *localClient) DeliverTxSync(req types.RequestDeliverTx) (*types.Respon } func (app *localClient) CheckTxSync(req types.RequestCheckTx) (*types.ResponseCheckTx, error) { + // NOTE: commented out for performance. delete all after commenting out all `app.mtx` + // app.mtx.Lock() + // defer app.mtx.Unlock() + res := app.Application.CheckTxSync(req) return &res, nil } @@ -308,16 +299,18 @@ func (app *localClient) EndBlockSync(req types.RequestEndBlock) (*types.Response } func (app *localClient) BeginRecheckTxSync(req types.RequestBeginRecheckTx) (*types.ResponseBeginRecheckTx, error) { - app.mtx.Lock() - defer app.mtx.Unlock() + // NOTE: commented out for performance. delete all after commenting out all `app.mtx` + // app.mtx.Lock() + // defer app.mtx.Unlock() res := app.Application.BeginRecheckTx(req) return &res, nil } func (app *localClient) EndRecheckTxSync(req types.RequestEndRecheckTx) (*types.ResponseEndRecheckTx, error) { - app.mtx.Lock() - defer app.mtx.Unlock() + // NOTE: commented out for performance. delete all after commenting out all `app.mtx` + // app.mtx.Lock() + // defer app.mtx.Unlock() res := app.Application.EndRecheckTx(req) return &res, nil @@ -359,15 +352,12 @@ func (app *localClient) ApplySnapshotChunkSync( //------------------------------------------------------- -func (app *localClient) callback(req *types.Request, res *types.Response) *ReqRes { - app.Callback(req, res) - return newLocalReqRes(req, res) -} - -func newLocalReqRes(req *types.Request, res *types.Response) *ReqRes { - reqRes := NewReqRes(req) - reqRes.Response = res - reqRes.Done() - reqRes.SetDone() +func (app *localClient) done(reqRes *ReqRes, res *types.Response) *ReqRes { + set := reqRes.SetDone(res) + if set { + if globalCb := app.GetGlobalCallback(); globalCb != nil { + globalCb(reqRes.Request, res) + } + } return reqRes } diff --git a/abci/client/mocks/client.go b/abci/client/mocks/client.go index 031e983b4..4ef898247 100644 --- a/abci/client/mocks/client.go +++ b/abci/client/mocks/client.go @@ -16,13 +16,13 @@ type Client struct { mock.Mock } -// ApplySnapshotChunkAsync provides a mock function with given fields: _a0 -func (_m *Client) ApplySnapshotChunkAsync(_a0 types.RequestApplySnapshotChunk) *abcicli.ReqRes { - ret := _m.Called(_a0) +// ApplySnapshotChunkAsync provides a mock function with given fields: _a0, _a1 +func (_m *Client) ApplySnapshotChunkAsync(_a0 types.RequestApplySnapshotChunk, _a1 abcicli.ResponseCallback) *abcicli.ReqRes { + ret := _m.Called(_a0, _a1) var r0 *abcicli.ReqRes - if rf, ok := ret.Get(0).(func(types.RequestApplySnapshotChunk) *abcicli.ReqRes); ok { - r0 = rf(_a0) + if rf, ok := ret.Get(0).(func(types.RequestApplySnapshotChunk, abcicli.ResponseCallback) *abcicli.ReqRes); ok { + r0 = rf(_a0, _a1) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*abcicli.ReqRes) @@ -55,13 +55,13 @@ func (_m *Client) ApplySnapshotChunkSync(_a0 types.RequestApplySnapshotChunk) (* return r0, r1 } -// BeginBlockAsync provides a mock function with given fields: _a0 -func (_m *Client) BeginBlockAsync(_a0 types.RequestBeginBlock) *abcicli.ReqRes { - ret := _m.Called(_a0) +// BeginBlockAsync provides a mock function with given fields: _a0, _a1 +func (_m *Client) BeginBlockAsync(_a0 types.RequestBeginBlock, _a1 abcicli.ResponseCallback) *abcicli.ReqRes { + ret := _m.Called(_a0, _a1) var r0 *abcicli.ReqRes - if rf, ok := ret.Get(0).(func(types.RequestBeginBlock) *abcicli.ReqRes); ok { - r0 = rf(_a0) + if rf, ok := ret.Get(0).(func(types.RequestBeginBlock, abcicli.ResponseCallback) *abcicli.ReqRes); ok { + r0 = rf(_a0, _a1) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*abcicli.ReqRes) @@ -94,13 +94,13 @@ func (_m *Client) BeginBlockSync(_a0 types.RequestBeginBlock) (*types.ResponseBe return r0, r1 } -// BeginRecheckTxAsync provides a mock function with given fields: _a0 -func (_m *Client) BeginRecheckTxAsync(_a0 types.RequestBeginRecheckTx) *abcicli.ReqRes { - ret := _m.Called(_a0) +// BeginRecheckTxAsync provides a mock function with given fields: _a0, _a1 +func (_m *Client) BeginRecheckTxAsync(_a0 types.RequestBeginRecheckTx, _a1 abcicli.ResponseCallback) *abcicli.ReqRes { + ret := _m.Called(_a0, _a1) var r0 *abcicli.ReqRes - if rf, ok := ret.Get(0).(func(types.RequestBeginRecheckTx) *abcicli.ReqRes); ok { - r0 = rf(_a0) + if rf, ok := ret.Get(0).(func(types.RequestBeginRecheckTx, abcicli.ResponseCallback) *abcicli.ReqRes); ok { + r0 = rf(_a0, _a1) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*abcicli.ReqRes) @@ -133,13 +133,13 @@ func (_m *Client) BeginRecheckTxSync(_a0 types.RequestBeginRecheckTx) (*types.Re return r0, r1 } -// CheckTxAsync provides a mock function with given fields: _a0 -func (_m *Client) CheckTxAsync(_a0 types.RequestCheckTx) *abcicli.ReqRes { - ret := _m.Called(_a0) +// CheckTxAsync provides a mock function with given fields: _a0, _a1 +func (_m *Client) CheckTxAsync(_a0 types.RequestCheckTx, _a1 abcicli.ResponseCallback) *abcicli.ReqRes { + ret := _m.Called(_a0, _a1) var r0 *abcicli.ReqRes - if rf, ok := ret.Get(0).(func(types.RequestCheckTx) *abcicli.ReqRes); ok { - r0 = rf(_a0) + if rf, ok := ret.Get(0).(func(types.RequestCheckTx, abcicli.ResponseCallback) *abcicli.ReqRes); ok { + r0 = rf(_a0, _a1) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*abcicli.ReqRes) @@ -172,13 +172,13 @@ func (_m *Client) CheckTxSync(_a0 types.RequestCheckTx) (*types.ResponseCheckTx, return r0, r1 } -// CommitAsync provides a mock function with given fields: -func (_m *Client) CommitAsync() *abcicli.ReqRes { - ret := _m.Called() +// CommitAsync provides a mock function with given fields: _a0 +func (_m *Client) CommitAsync(_a0 abcicli.ResponseCallback) *abcicli.ReqRes { + ret := _m.Called(_a0) var r0 *abcicli.ReqRes - if rf, ok := ret.Get(0).(func() *abcicli.ReqRes); ok { - r0 = rf() + if rf, ok := ret.Get(0).(func(abcicli.ResponseCallback) *abcicli.ReqRes); ok { + r0 = rf(_a0) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*abcicli.ReqRes) @@ -211,13 +211,13 @@ func (_m *Client) CommitSync() (*types.ResponseCommit, error) { return r0, r1 } -// DeliverTxAsync provides a mock function with given fields: _a0 -func (_m *Client) DeliverTxAsync(_a0 types.RequestDeliverTx) *abcicli.ReqRes { - ret := _m.Called(_a0) +// DeliverTxAsync provides a mock function with given fields: _a0, _a1 +func (_m *Client) DeliverTxAsync(_a0 types.RequestDeliverTx, _a1 abcicli.ResponseCallback) *abcicli.ReqRes { + ret := _m.Called(_a0, _a1) var r0 *abcicli.ReqRes - if rf, ok := ret.Get(0).(func(types.RequestDeliverTx) *abcicli.ReqRes); ok { - r0 = rf(_a0) + if rf, ok := ret.Get(0).(func(types.RequestDeliverTx, abcicli.ResponseCallback) *abcicli.ReqRes); ok { + r0 = rf(_a0, _a1) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*abcicli.ReqRes) @@ -250,13 +250,13 @@ func (_m *Client) DeliverTxSync(_a0 types.RequestDeliverTx) (*types.ResponseDeli return r0, r1 } -// EchoAsync provides a mock function with given fields: msg -func (_m *Client) EchoAsync(msg string) *abcicli.ReqRes { - ret := _m.Called(msg) +// EchoAsync provides a mock function with given fields: _a0, _a1 +func (_m *Client) EchoAsync(_a0 string, _a1 abcicli.ResponseCallback) *abcicli.ReqRes { + ret := _m.Called(_a0, _a1) var r0 *abcicli.ReqRes - if rf, ok := ret.Get(0).(func(string) *abcicli.ReqRes); ok { - r0 = rf(msg) + if rf, ok := ret.Get(0).(func(string, abcicli.ResponseCallback) *abcicli.ReqRes); ok { + r0 = rf(_a0, _a1) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*abcicli.ReqRes) @@ -266,13 +266,13 @@ func (_m *Client) EchoAsync(msg string) *abcicli.ReqRes { return r0 } -// EchoSync provides a mock function with given fields: msg -func (_m *Client) EchoSync(msg string) (*types.ResponseEcho, error) { - ret := _m.Called(msg) +// EchoSync provides a mock function with given fields: _a0 +func (_m *Client) EchoSync(_a0 string) (*types.ResponseEcho, error) { + ret := _m.Called(_a0) var r0 *types.ResponseEcho if rf, ok := ret.Get(0).(func(string) *types.ResponseEcho); ok { - r0 = rf(msg) + r0 = rf(_a0) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*types.ResponseEcho) @@ -281,7 +281,7 @@ func (_m *Client) EchoSync(msg string) (*types.ResponseEcho, error) { var r1 error if rf, ok := ret.Get(1).(func(string) error); ok { - r1 = rf(msg) + r1 = rf(_a0) } else { r1 = ret.Error(1) } @@ -289,13 +289,13 @@ func (_m *Client) EchoSync(msg string) (*types.ResponseEcho, error) { return r0, r1 } -// EndBlockAsync provides a mock function with given fields: _a0 -func (_m *Client) EndBlockAsync(_a0 types.RequestEndBlock) *abcicli.ReqRes { - ret := _m.Called(_a0) +// EndBlockAsync provides a mock function with given fields: _a0, _a1 +func (_m *Client) EndBlockAsync(_a0 types.RequestEndBlock, _a1 abcicli.ResponseCallback) *abcicli.ReqRes { + ret := _m.Called(_a0, _a1) var r0 *abcicli.ReqRes - if rf, ok := ret.Get(0).(func(types.RequestEndBlock) *abcicli.ReqRes); ok { - r0 = rf(_a0) + if rf, ok := ret.Get(0).(func(types.RequestEndBlock, abcicli.ResponseCallback) *abcicli.ReqRes); ok { + r0 = rf(_a0, _a1) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*abcicli.ReqRes) @@ -328,13 +328,13 @@ func (_m *Client) EndBlockSync(_a0 types.RequestEndBlock) (*types.ResponseEndBlo return r0, r1 } -// EndRecheckTxAsync provides a mock function with given fields: _a0 -func (_m *Client) EndRecheckTxAsync(_a0 types.RequestEndRecheckTx) *abcicli.ReqRes { - ret := _m.Called(_a0) +// EndRecheckTxAsync provides a mock function with given fields: _a0, _a1 +func (_m *Client) EndRecheckTxAsync(_a0 types.RequestEndRecheckTx, _a1 abcicli.ResponseCallback) *abcicli.ReqRes { + ret := _m.Called(_a0, _a1) var r0 *abcicli.ReqRes - if rf, ok := ret.Get(0).(func(types.RequestEndRecheckTx) *abcicli.ReqRes); ok { - r0 = rf(_a0) + if rf, ok := ret.Get(0).(func(types.RequestEndRecheckTx, abcicli.ResponseCallback) *abcicli.ReqRes); ok { + r0 = rf(_a0, _a1) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*abcicli.ReqRes) @@ -381,43 +381,29 @@ func (_m *Client) Error() error { return r0 } -// FlushAsync provides a mock function with given fields: -func (_m *Client) FlushAsync() *abcicli.ReqRes { +// GetGlobalCallback provides a mock function with given fields: +func (_m *Client) GetGlobalCallback() abcicli.GlobalCallback { ret := _m.Called() - var r0 *abcicli.ReqRes - if rf, ok := ret.Get(0).(func() *abcicli.ReqRes); ok { + var r0 abcicli.GlobalCallback + if rf, ok := ret.Get(0).(func() abcicli.GlobalCallback); ok { r0 = rf() } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*abcicli.ReqRes) + r0 = ret.Get(0).(abcicli.GlobalCallback) } } return r0 } -// FlushSync provides a mock function with given fields: -func (_m *Client) FlushSync() error { - ret := _m.Called() - - var r0 error - if rf, ok := ret.Get(0).(func() error); ok { - r0 = rf() - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// InfoAsync provides a mock function with given fields: _a0 -func (_m *Client) InfoAsync(_a0 types.RequestInfo) *abcicli.ReqRes { - ret := _m.Called(_a0) +// InfoAsync provides a mock function with given fields: _a0, _a1 +func (_m *Client) InfoAsync(_a0 types.RequestInfo, _a1 abcicli.ResponseCallback) *abcicli.ReqRes { + ret := _m.Called(_a0, _a1) var r0 *abcicli.ReqRes - if rf, ok := ret.Get(0).(func(types.RequestInfo) *abcicli.ReqRes); ok { - r0 = rf(_a0) + if rf, ok := ret.Get(0).(func(types.RequestInfo, abcicli.ResponseCallback) *abcicli.ReqRes); ok { + r0 = rf(_a0, _a1) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*abcicli.ReqRes) @@ -450,13 +436,13 @@ func (_m *Client) InfoSync(_a0 types.RequestInfo) (*types.ResponseInfo, error) { return r0, r1 } -// InitChainAsync provides a mock function with given fields: _a0 -func (_m *Client) InitChainAsync(_a0 types.RequestInitChain) *abcicli.ReqRes { - ret := _m.Called(_a0) +// InitChainAsync provides a mock function with given fields: _a0, _a1 +func (_m *Client) InitChainAsync(_a0 types.RequestInitChain, _a1 abcicli.ResponseCallback) *abcicli.ReqRes { + ret := _m.Called(_a0, _a1) var r0 *abcicli.ReqRes - if rf, ok := ret.Get(0).(func(types.RequestInitChain) *abcicli.ReqRes); ok { - r0 = rf(_a0) + if rf, ok := ret.Get(0).(func(types.RequestInitChain, abcicli.ResponseCallback) *abcicli.ReqRes); ok { + r0 = rf(_a0, _a1) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*abcicli.ReqRes) @@ -503,13 +489,13 @@ func (_m *Client) IsRunning() bool { return r0 } -// ListSnapshotsAsync provides a mock function with given fields: _a0 -func (_m *Client) ListSnapshotsAsync(_a0 types.RequestListSnapshots) *abcicli.ReqRes { - ret := _m.Called(_a0) +// ListSnapshotsAsync provides a mock function with given fields: _a0, _a1 +func (_m *Client) ListSnapshotsAsync(_a0 types.RequestListSnapshots, _a1 abcicli.ResponseCallback) *abcicli.ReqRes { + ret := _m.Called(_a0, _a1) var r0 *abcicli.ReqRes - if rf, ok := ret.Get(0).(func(types.RequestListSnapshots) *abcicli.ReqRes); ok { - r0 = rf(_a0) + if rf, ok := ret.Get(0).(func(types.RequestListSnapshots, abcicli.ResponseCallback) *abcicli.ReqRes); ok { + r0 = rf(_a0, _a1) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*abcicli.ReqRes) @@ -542,13 +528,13 @@ func (_m *Client) ListSnapshotsSync(_a0 types.RequestListSnapshots) (*types.Resp return r0, r1 } -// LoadSnapshotChunkAsync provides a mock function with given fields: _a0 -func (_m *Client) LoadSnapshotChunkAsync(_a0 types.RequestLoadSnapshotChunk) *abcicli.ReqRes { - ret := _m.Called(_a0) +// LoadSnapshotChunkAsync provides a mock function with given fields: _a0, _a1 +func (_m *Client) LoadSnapshotChunkAsync(_a0 types.RequestLoadSnapshotChunk, _a1 abcicli.ResponseCallback) *abcicli.ReqRes { + ret := _m.Called(_a0, _a1) var r0 *abcicli.ReqRes - if rf, ok := ret.Get(0).(func(types.RequestLoadSnapshotChunk) *abcicli.ReqRes); ok { - r0 = rf(_a0) + if rf, ok := ret.Get(0).(func(types.RequestLoadSnapshotChunk, abcicli.ResponseCallback) *abcicli.ReqRes); ok { + r0 = rf(_a0, _a1) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*abcicli.ReqRes) @@ -581,13 +567,13 @@ func (_m *Client) LoadSnapshotChunkSync(_a0 types.RequestLoadSnapshotChunk) (*ty return r0, r1 } -// OfferSnapshotAsync provides a mock function with given fields: _a0 -func (_m *Client) OfferSnapshotAsync(_a0 types.RequestOfferSnapshot) *abcicli.ReqRes { - ret := _m.Called(_a0) +// OfferSnapshotAsync provides a mock function with given fields: _a0, _a1 +func (_m *Client) OfferSnapshotAsync(_a0 types.RequestOfferSnapshot, _a1 abcicli.ResponseCallback) *abcicli.ReqRes { + ret := _m.Called(_a0, _a1) var r0 *abcicli.ReqRes - if rf, ok := ret.Get(0).(func(types.RequestOfferSnapshot) *abcicli.ReqRes); ok { - r0 = rf(_a0) + if rf, ok := ret.Get(0).(func(types.RequestOfferSnapshot, abcicli.ResponseCallback) *abcicli.ReqRes); ok { + r0 = rf(_a0, _a1) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*abcicli.ReqRes) @@ -653,13 +639,13 @@ func (_m *Client) OnStop() { _m.Called() } -// QueryAsync provides a mock function with given fields: _a0 -func (_m *Client) QueryAsync(_a0 types.RequestQuery) *abcicli.ReqRes { - ret := _m.Called(_a0) +// QueryAsync provides a mock function with given fields: _a0, _a1 +func (_m *Client) QueryAsync(_a0 types.RequestQuery, _a1 abcicli.ResponseCallback) *abcicli.ReqRes { + ret := _m.Called(_a0, _a1) var r0 *abcicli.ReqRes - if rf, ok := ret.Get(0).(func(types.RequestQuery) *abcicli.ReqRes); ok { - r0 = rf(_a0) + if rf, ok := ret.Get(0).(func(types.RequestQuery, abcicli.ResponseCallback) *abcicli.ReqRes); ok { + r0 = rf(_a0, _a1) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*abcicli.ReqRes) @@ -722,18 +708,23 @@ func (_m *Client) Reset() error { return r0 } +// SetGlobalCallback provides a mock function with given fields: _a0 +func (_m *Client) SetGlobalCallback(_a0 abcicli.GlobalCallback) { + _m.Called(_a0) +} + // SetLogger provides a mock function with given fields: _a0 func (_m *Client) SetLogger(_a0 log.Logger) { _m.Called(_a0) } -// SetOptionAsync provides a mock function with given fields: _a0 -func (_m *Client) SetOptionAsync(_a0 types.RequestSetOption) *abcicli.ReqRes { - ret := _m.Called(_a0) +// SetOptionAsync provides a mock function with given fields: _a0, _a1 +func (_m *Client) SetOptionAsync(_a0 types.RequestSetOption, _a1 abcicli.ResponseCallback) *abcicli.ReqRes { + ret := _m.Called(_a0, _a1) var r0 *abcicli.ReqRes - if rf, ok := ret.Get(0).(func(types.RequestSetOption) *abcicli.ReqRes); ok { - r0 = rf(_a0) + if rf, ok := ret.Get(0).(func(types.RequestSetOption, abcicli.ResponseCallback) *abcicli.ReqRes); ok { + r0 = rf(_a0, _a1) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*abcicli.ReqRes) @@ -766,11 +757,6 @@ func (_m *Client) SetOptionSync(_a0 types.RequestSetOption) (*types.ResponseSetO return r0, r1 } -// SetResponseCallback provides a mock function with given fields: _a0 -func (_m *Client) SetResponseCallback(_a0 abcicli.Callback) { - _m.Called(_a0) -} - // Start provides a mock function with given fields: func (_m *Client) Start() error { ret := _m.Called() diff --git a/abci/client/socket_client.go b/abci/client/socket_client.go index d70441993..377ba4c32 100644 --- a/abci/client/socket_client.go +++ b/abci/client/socket_client.go @@ -36,8 +36,10 @@ type socketClient struct { mtx tmsync.Mutex err error - reqSent *list.List // list of requests sent, waiting for response - resCb func(*types.Request, *types.Response) // called on all requests, if set. + reqSent *list.List // list of requests sent, waiting for response + + globalCbMtx tmsync.Mutex + globalCb GlobalCallback } var _ Client = (*socketClient)(nil) @@ -53,7 +55,7 @@ func NewSocketClient(addr string, mustConnect bool) Client { addr: addr, reqSent: list.New(), - resCb: nil, + globalCb: nil, } cli.BaseService = *service.NewBaseService(nil, "socketClient", cli) return cli @@ -104,14 +106,17 @@ func (cli *socketClient) Error() error { return cli.err } -// SetResponseCallback sets a callback, which will be executed for each -// non-error & non-empty response from the server. -// -// NOTE: callback may get internally generated flush responses. -func (cli *socketClient) SetResponseCallback(resCb Callback) { - cli.mtx.Lock() - cli.resCb = resCb - cli.mtx.Unlock() +func (app *socketClient) SetGlobalCallback(globalCb GlobalCallback) { + app.globalCbMtx.Lock() + app.globalCb = globalCb + app.globalCbMtx.Unlock() +} + +func (app *socketClient) GetGlobalCallback() (cb GlobalCallback) { + app.globalCbMtx.Lock() + cb = app.globalCb + app.globalCbMtx.Unlock() + return cb } //---------------------------------------- @@ -140,7 +145,7 @@ func (cli *socketClient) sendRequestsRoutine(conn io.Writer) { } case <-cli.flushTimer.Ch: // flush queue select { - case cli.reqQueue <- NewReqRes(types.ToRequestFlush()): + case cli.reqQueue <- NewReqRes(types.ToRequestFlush(), nil): default: // Probably will fill the buffer, or retry later. } @@ -200,12 +205,12 @@ func (cli *socketClient) didRecvResponse(res *types.Response) error { } reqres.Response = res - reqres.Done() // release waiters + reqres.wg.Done() // release waiters cli.reqSent.Remove(next) // pop first item from linked list // Notify client listener if set (global callback). - if cli.resCb != nil { - cli.resCb(reqres.Request, res) + if cli.globalCb != nil { + cli.globalCb(reqres.Request, res) } // Notify reqRes listener if set (request specific callback). @@ -219,88 +224,88 @@ func (cli *socketClient) didRecvResponse(res *types.Response) error { //---------------------------------------- -func (cli *socketClient) EchoAsync(msg string) *ReqRes { - return cli.queueRequest(types.ToRequestEcho(msg)) +func (cli *socketClient) EchoAsync(msg string, cb ResponseCallback) *ReqRes { + return cli.queueRequest(types.ToRequestEcho(msg), cb) } -func (cli *socketClient) FlushAsync() *ReqRes { - return cli.queueRequest(types.ToRequestFlush()) +func (cli *socketClient) FlushAsync(cb ResponseCallback) *ReqRes { + return cli.queueRequest(types.ToRequestFlush(), cb) } -func (cli *socketClient) InfoAsync(req types.RequestInfo) *ReqRes { - return cli.queueRequest(types.ToRequestInfo(req)) +func (cli *socketClient) InfoAsync(req types.RequestInfo, cb ResponseCallback) *ReqRes { + return cli.queueRequest(types.ToRequestInfo(req), cb) } -func (cli *socketClient) SetOptionAsync(req types.RequestSetOption) *ReqRes { - return cli.queueRequest(types.ToRequestSetOption(req)) +func (cli *socketClient) SetOptionAsync(req types.RequestSetOption, cb ResponseCallback) *ReqRes { + return cli.queueRequest(types.ToRequestSetOption(req), cb) } -func (cli *socketClient) DeliverTxAsync(req types.RequestDeliverTx) *ReqRes { - return cli.queueRequest(types.ToRequestDeliverTx(req)) +func (cli *socketClient) DeliverTxAsync(req types.RequestDeliverTx, cb ResponseCallback) *ReqRes { + return cli.queueRequest(types.ToRequestDeliverTx(req), cb) } -func (cli *socketClient) CheckTxAsync(req types.RequestCheckTx) *ReqRes { - return cli.queueRequest(types.ToRequestCheckTx(req)) +func (cli *socketClient) CheckTxAsync(req types.RequestCheckTx, cb ResponseCallback) *ReqRes { + return cli.queueRequest(types.ToRequestCheckTx(req), cb) } -func (cli *socketClient) QueryAsync(req types.RequestQuery) *ReqRes { - return cli.queueRequest(types.ToRequestQuery(req)) +func (cli *socketClient) QueryAsync(req types.RequestQuery, cb ResponseCallback) *ReqRes { + return cli.queueRequest(types.ToRequestQuery(req), cb) } -func (cli *socketClient) CommitAsync() *ReqRes { - return cli.queueRequest(types.ToRequestCommit()) +func (cli *socketClient) CommitAsync(cb ResponseCallback) *ReqRes { + return cli.queueRequest(types.ToRequestCommit(), cb) } -func (cli *socketClient) InitChainAsync(req types.RequestInitChain) *ReqRes { - return cli.queueRequest(types.ToRequestInitChain(req)) +func (cli *socketClient) InitChainAsync(req types.RequestInitChain, cb ResponseCallback) *ReqRes { + return cli.queueRequest(types.ToRequestInitChain(req), cb) } -func (cli *socketClient) BeginBlockAsync(req types.RequestBeginBlock) *ReqRes { - return cli.queueRequest(types.ToRequestBeginBlock(req)) +func (cli *socketClient) BeginBlockAsync(req types.RequestBeginBlock, cb ResponseCallback) *ReqRes { + return cli.queueRequest(types.ToRequestBeginBlock(req), cb) } -func (cli *socketClient) EndBlockAsync(req types.RequestEndBlock) *ReqRes { - return cli.queueRequest(types.ToRequestEndBlock(req)) +func (cli *socketClient) EndBlockAsync(req types.RequestEndBlock, cb ResponseCallback) *ReqRes { + return cli.queueRequest(types.ToRequestEndBlock(req), cb) } -func (cli *socketClient) BeginRecheckTxAsync(req types.RequestBeginRecheckTx) *ReqRes { - return cli.queueRequest(types.ToRequestBeginRecheckTx(req)) +func (cli *socketClient) BeginRecheckTxAsync(req types.RequestBeginRecheckTx, cb ResponseCallback) *ReqRes { + return cli.queueRequest(types.ToRequestBeginRecheckTx(req), cb) } -func (cli *socketClient) EndRecheckTxAsync(req types.RequestEndRecheckTx) *ReqRes { - return cli.queueRequest(types.ToRequestEndRecheckTx(req)) +func (cli *socketClient) EndRecheckTxAsync(req types.RequestEndRecheckTx, cb ResponseCallback) *ReqRes { + return cli.queueRequest(types.ToRequestEndRecheckTx(req), cb) } -func (cli *socketClient) ListSnapshotsAsync(req types.RequestListSnapshots) *ReqRes { - return cli.queueRequest(types.ToRequestListSnapshots(req)) +func (cli *socketClient) ListSnapshotsAsync(req types.RequestListSnapshots, cb ResponseCallback) *ReqRes { + return cli.queueRequest(types.ToRequestListSnapshots(req), cb) } -func (cli *socketClient) OfferSnapshotAsync(req types.RequestOfferSnapshot) *ReqRes { - return cli.queueRequest(types.ToRequestOfferSnapshot(req)) +func (cli *socketClient) OfferSnapshotAsync(req types.RequestOfferSnapshot, cb ResponseCallback) *ReqRes { + return cli.queueRequest(types.ToRequestOfferSnapshot(req), cb) } -func (cli *socketClient) LoadSnapshotChunkAsync(req types.RequestLoadSnapshotChunk) *ReqRes { - return cli.queueRequest(types.ToRequestLoadSnapshotChunk(req)) +func (cli *socketClient) LoadSnapshotChunkAsync(req types.RequestLoadSnapshotChunk, cb ResponseCallback) *ReqRes { + return cli.queueRequest(types.ToRequestLoadSnapshotChunk(req), cb) } -func (cli *socketClient) ApplySnapshotChunkAsync(req types.RequestApplySnapshotChunk) *ReqRes { - return cli.queueRequest(types.ToRequestApplySnapshotChunk(req)) +func (cli *socketClient) ApplySnapshotChunkAsync(req types.RequestApplySnapshotChunk, cb ResponseCallback) *ReqRes { + return cli.queueRequest(types.ToRequestApplySnapshotChunk(req), cb) } //---------------------------------------- -func (cli *socketClient) FlushSync() error { - reqRes := cli.queueRequest(types.ToRequestFlush()) +func (cli *socketClient) FlushSync() (*types.ResponseFlush, error) { + reqRes := cli.queueRequest(types.ToRequestFlush(), nil) if err := cli.Error(); err != nil { - return err + return nil, err } reqRes.Wait() // NOTE: if we don't flush the queue, its possible to get stuck here - return cli.Error() + return reqRes.Response.GetFlush(), cli.Error() } func (cli *socketClient) EchoSync(msg string) (*types.ResponseEcho, error) { - reqres := cli.queueRequest(types.ToRequestEcho(msg)) - if err := cli.FlushSync(); err != nil { + reqres := cli.queueRequest(types.ToRequestEcho(msg), nil) + if _, err := cli.FlushSync(); err != nil { return nil, err } @@ -308,8 +313,8 @@ func (cli *socketClient) EchoSync(msg string) (*types.ResponseEcho, error) { } func (cli *socketClient) InfoSync(req types.RequestInfo) (*types.ResponseInfo, error) { - reqres := cli.queueRequest(types.ToRequestInfo(req)) - if err := cli.FlushSync(); err != nil { + reqres := cli.queueRequest(types.ToRequestInfo(req), nil) + if _, err := cli.FlushSync(); err != nil { return nil, err } @@ -317,8 +322,8 @@ func (cli *socketClient) InfoSync(req types.RequestInfo) (*types.ResponseInfo, e } func (cli *socketClient) SetOptionSync(req types.RequestSetOption) (*types.ResponseSetOption, error) { - reqres := cli.queueRequest(types.ToRequestSetOption(req)) - if err := cli.FlushSync(); err != nil { + reqres := cli.queueRequest(types.ToRequestSetOption(req), nil) + if _, err := cli.FlushSync(); err != nil { return nil, err } @@ -326,8 +331,8 @@ func (cli *socketClient) SetOptionSync(req types.RequestSetOption) (*types.Respo } func (cli *socketClient) DeliverTxSync(req types.RequestDeliverTx) (*types.ResponseDeliverTx, error) { - reqres := cli.queueRequest(types.ToRequestDeliverTx(req)) - if err := cli.FlushSync(); err != nil { + reqres := cli.queueRequest(types.ToRequestDeliverTx(req), nil) + if _, err := cli.FlushSync(); err != nil { return nil, err } @@ -335,8 +340,8 @@ func (cli *socketClient) DeliverTxSync(req types.RequestDeliverTx) (*types.Respo } func (cli *socketClient) CheckTxSync(req types.RequestCheckTx) (*types.ResponseCheckTx, error) { - reqres := cli.queueRequest(types.ToRequestCheckTx(req)) - if err := cli.FlushSync(); err != nil { + reqres := cli.queueRequest(types.ToRequestCheckTx(req), nil) + if _, err := cli.FlushSync(); err != nil { return nil, err } @@ -344,8 +349,8 @@ func (cli *socketClient) CheckTxSync(req types.RequestCheckTx) (*types.ResponseC } func (cli *socketClient) QuerySync(req types.RequestQuery) (*types.ResponseQuery, error) { - reqres := cli.queueRequest(types.ToRequestQuery(req)) - if err := cli.FlushSync(); err != nil { + reqres := cli.queueRequest(types.ToRequestQuery(req), nil) + if _, err := cli.FlushSync(); err != nil { return nil, err } @@ -353,8 +358,8 @@ func (cli *socketClient) QuerySync(req types.RequestQuery) (*types.ResponseQuery } func (cli *socketClient) CommitSync() (*types.ResponseCommit, error) { - reqres := cli.queueRequest(types.ToRequestCommit()) - if err := cli.FlushSync(); err != nil { + reqres := cli.queueRequest(types.ToRequestCommit(), nil) + if _, err := cli.FlushSync(); err != nil { return nil, err } @@ -362,8 +367,8 @@ func (cli *socketClient) CommitSync() (*types.ResponseCommit, error) { } func (cli *socketClient) InitChainSync(req types.RequestInitChain) (*types.ResponseInitChain, error) { - reqres := cli.queueRequest(types.ToRequestInitChain(req)) - if err := cli.FlushSync(); err != nil { + reqres := cli.queueRequest(types.ToRequestInitChain(req), nil) + if _, err := cli.FlushSync(); err != nil { return nil, err } @@ -371,8 +376,8 @@ func (cli *socketClient) InitChainSync(req types.RequestInitChain) (*types.Respo } func (cli *socketClient) BeginBlockSync(req types.RequestBeginBlock) (*types.ResponseBeginBlock, error) { - reqres := cli.queueRequest(types.ToRequestBeginBlock(req)) - if err := cli.FlushSync(); err != nil { + reqres := cli.queueRequest(types.ToRequestBeginBlock(req), nil) + if _, err := cli.FlushSync(); err != nil { return nil, err } @@ -380,8 +385,8 @@ func (cli *socketClient) BeginBlockSync(req types.RequestBeginBlock) (*types.Res } func (cli *socketClient) EndBlockSync(req types.RequestEndBlock) (*types.ResponseEndBlock, error) { - reqres := cli.queueRequest(types.ToRequestEndBlock(req)) - if err := cli.FlushSync(); err != nil { + reqres := cli.queueRequest(types.ToRequestEndBlock(req), nil) + if _, err := cli.FlushSync(); err != nil { return nil, err } @@ -389,8 +394,8 @@ func (cli *socketClient) EndBlockSync(req types.RequestEndBlock) (*types.Respons } func (cli *socketClient) BeginRecheckTxSync(req types.RequestBeginRecheckTx) (*types.ResponseBeginRecheckTx, error) { - reqres := cli.queueRequest(types.ToRequestBeginRecheckTx(req)) - if err := cli.FlushSync(); err != nil { + reqres := cli.queueRequest(types.ToRequestBeginRecheckTx(req), nil) + if _, err := cli.FlushSync(); err != nil { return nil, err } @@ -398,8 +403,8 @@ func (cli *socketClient) BeginRecheckTxSync(req types.RequestBeginRecheckTx) (*t } func (cli *socketClient) EndRecheckTxSync(req types.RequestEndRecheckTx) (*types.ResponseEndRecheckTx, error) { - reqres := cli.queueRequest(types.ToRequestEndRecheckTx(req)) - if err := cli.FlushSync(); err != nil { + reqres := cli.queueRequest(types.ToRequestEndRecheckTx(req), nil) + if _, err := cli.FlushSync(); err != nil { return nil, err } @@ -407,8 +412,8 @@ func (cli *socketClient) EndRecheckTxSync(req types.RequestEndRecheckTx) (*types } func (cli *socketClient) ListSnapshotsSync(req types.RequestListSnapshots) (*types.ResponseListSnapshots, error) { - reqres := cli.queueRequest(types.ToRequestListSnapshots(req)) - if err := cli.FlushSync(); err != nil { + reqres := cli.queueRequest(types.ToRequestListSnapshots(req), nil) + if _, err := cli.FlushSync(); err != nil { return nil, err } @@ -416,8 +421,8 @@ func (cli *socketClient) ListSnapshotsSync(req types.RequestListSnapshots) (*typ } func (cli *socketClient) OfferSnapshotSync(req types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) { - reqres := cli.queueRequest(types.ToRequestOfferSnapshot(req)) - if err := cli.FlushSync(); err != nil { + reqres := cli.queueRequest(types.ToRequestOfferSnapshot(req), nil) + if _, err := cli.FlushSync(); err != nil { return nil, err } @@ -426,8 +431,8 @@ func (cli *socketClient) OfferSnapshotSync(req types.RequestOfferSnapshot) (*typ func (cli *socketClient) LoadSnapshotChunkSync( req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) { - reqres := cli.queueRequest(types.ToRequestLoadSnapshotChunk(req)) - if err := cli.FlushSync(); err != nil { + reqres := cli.queueRequest(types.ToRequestLoadSnapshotChunk(req), nil) + if _, err := cli.FlushSync(); err != nil { return nil, err } @@ -436,8 +441,8 @@ func (cli *socketClient) LoadSnapshotChunkSync( func (cli *socketClient) ApplySnapshotChunkSync( req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) { - reqres := cli.queueRequest(types.ToRequestApplySnapshotChunk(req)) - if err := cli.FlushSync(); err != nil { + reqres := cli.queueRequest(types.ToRequestApplySnapshotChunk(req), nil) + if _, err := cli.FlushSync(); err != nil { return nil, err } return reqres.Response.GetApplySnapshotChunk(), cli.Error() @@ -445,8 +450,8 @@ func (cli *socketClient) ApplySnapshotChunkSync( //---------------------------------------- -func (cli *socketClient) queueRequest(req *types.Request) *ReqRes { - reqres := NewReqRes(req) +func (cli *socketClient) queueRequest(req *types.Request, cb ResponseCallback) *ReqRes { + reqres := NewReqRes(req, cb) // TODO: set cli.err if reqQueue times out cli.reqQueue <- reqres @@ -469,7 +474,7 @@ func (cli *socketClient) flushQueue() { // mark all in-flight messages as resolved (they will get cli.Error()) for req := cli.reqSent.Front(); req != nil; req = req.Next() { reqres := req.Value.(*ReqRes) - reqres.Done() + reqres.wg.Done() } // mark all queued messages as resolved @@ -477,7 +482,7 @@ LOOP: for { select { case reqres := <-cli.reqQueue: - reqres.Done() + reqres.wg.Done() default: break LOOP } diff --git a/abci/client/socket_client_test.go b/abci/client/socket_client_test.go index 478bbb4ae..150fe5bc9 100644 --- a/abci/client/socket_client_test.go +++ b/abci/client/socket_client_test.go @@ -33,8 +33,8 @@ func TestProperSyncCalls(t *testing.T) { resp := make(chan error, 1) go func() { // This is BeginBlockSync unrolled.... - reqres := c.BeginBlockAsync(types.RequestBeginBlock{}) - err := c.FlushSync() + reqres := c.BeginBlockAsync(types.RequestBeginBlock{}, nil) + _, err := c.FlushSync() require.NoError(t, err) res := reqres.Response.GetBeginBlock() require.NotNil(t, res) @@ -68,8 +68,8 @@ func TestHangingSyncCalls(t *testing.T) { resp := make(chan error, 1) go func() { // Start BeginBlock and flush it - reqres := c.BeginBlockAsync(types.RequestBeginBlock{}) - flush := c.FlushAsync() + reqres := c.BeginBlockAsync(types.RequestBeginBlock{}, nil) + flush := c.FlushAsync(nil) // wait 20 ms for all events to travel socket, but // no response yet from server time.Sleep(20 * time.Millisecond) diff --git a/abci/example/example_test.go b/abci/example/example_test.go index 1d2497ca2..3e67afdc8 100644 --- a/abci/example/example_test.go +++ b/abci/example/example_test.go @@ -76,7 +76,7 @@ func testStream(t *testing.T, app types.Application) { done := make(chan struct{}) counter := 0 - client.SetResponseCallback(func(req *types.Request, res *types.Response) { + client.SetGlobalCallback(func(req *types.Request, res *types.Response) { // Process response switch r := res.Value.(type) { case *types.Response_DeliverTx: @@ -104,19 +104,19 @@ func testStream(t *testing.T, app types.Application) { // Write requests for counter := 0; counter < numDeliverTxs; counter++ { // Send request - reqRes := client.DeliverTxAsync(types.RequestDeliverTx{Tx: []byte("test")}) + reqRes := client.DeliverTxAsync(types.RequestDeliverTx{Tx: []byte("test")}, nil) _ = reqRes // check err ? // Sometimes send flush messages if counter%123 == 0 { - client.FlushAsync() + client.FlushAsync(nil) // check err ? } } // Send final flush message - client.FlushAsync() + client.FlushAsync(nil) <-done } diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 5125facd9..91dca4959 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -647,10 +647,10 @@ func TestMockProxyApp(t *testing.T) { txIndex++ } } - mock.SetResponseCallback(proxyCb) + mock.SetGlobalCallback(proxyCb) someTx := []byte("tx") - mock.DeliverTxAsync(abci.RequestDeliverTx{Tx: someTx}) + mock.DeliverTxAsync(abci.RequestDeliverTx{Tx: someTx}, nil) }) assert.True(t, validTxs == 1) assert.True(t, invalidTxs == 0) diff --git a/crypto/example_test.go b/crypto/example_test.go index d90c66339..fb7bc199a 100644 --- a/crypto/example_test.go +++ b/crypto/example_test.go @@ -24,5 +24,5 @@ func ExampleSha256() { sum := crypto.Sha256([]byte("This is Ostracon")) fmt.Printf("%x\n", sum) // Output: - // f91afb642f3d1c87c17eb01aae5cb65c242dfdbe7cf1066cc260f4ce5d33b94e + // e4f6b8545bd63b92f12936eccde66e48cc26895e8e67297ff97cc31c833a38d4 } diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index d5c9b3e88..2921f347c 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -26,7 +26,7 @@ const TxKeySize = sha256.Size var newline = []byte("\n") -// -------------------------------------------------------------------------------- +//-------------------------------------------------------------------------------- // CListMempool is an ordered in-memory pool for transactions before they are // proposed in a consensus round. Transaction validity is checked using the @@ -105,7 +105,7 @@ func NewCListMempool( } else { mempool.cache = nopTxCache{} } - proxyAppConn.SetResponseCallback(mempool.globalCb) + proxyAppConn.SetGlobalCallback(mempool.globalCb) for _, option := range options { option(mempool) } @@ -184,13 +184,14 @@ func (mem *CListMempool) TxsBytes() int64 { // Lock() must be help by the caller during execution. func (mem *CListMempool) FlushAppConn() error { - return mem.proxyAppConn.FlushSync() + _, err := mem.proxyAppConn.FlushSync() + return err } // XXX: Unsafe! Calling Flush may leave mempool in inconsistent state. func (mem *CListMempool) Flush() { - mem.updateMtx.RLock() - defer mem.updateMtx.RUnlock() + mem.updateMtx.Lock() + defer mem.updateMtx.Unlock() _ = atomic.SwapInt64(&mem.txsBytes, 0) mem.cache.Reset() @@ -284,8 +285,7 @@ func (mem *CListMempool) checkTxAsync(tx types.Tx, txInfo TxInfo, prepareCb func } // CONTRACT: `app.CheckTxAsync()` should check whether `GasWanted` is valid (0 <= GasWanted <= block.masGas) - reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx}) - reqRes.SetCallback(func(res *abci.Response) { + mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx}, func(res *abci.Response) { mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, res, func(response *abci.Response) { if checkTxCb != nil { checkTxCb(response) @@ -718,20 +718,21 @@ func (mem *CListMempool) recheckTxs() { wg.Add(1) memTx := e.Value.(*mempoolTx) - reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{ + req := abci.RequestCheckTx{ Tx: memTx.tx, Type: abci.CheckTxType_Recheck, - }) - reqRes.SetCallback(func(res *abci.Response) { + } + + mem.proxyAppConn.CheckTxAsync(req, func(res *abci.Response) { wg.Done() }) } - mem.proxyAppConn.FlushAsync() + mem.proxyAppConn.FlushAsync(func(res *abci.Response) { }) wg.Wait() } -// -------------------------------------------------------------------------------- +//-------------------------------------------------------------------------------- // mempoolTx is a transaction that successfully ran type mempoolTx struct { @@ -749,7 +750,7 @@ func (memTx *mempoolTx) Height() int64 { return atomic.LoadInt64(&memTx.height) } -// -------------------------------------------------------------------------------- +//-------------------------------------------------------------------------------- type txCache interface { Reset() @@ -832,7 +833,7 @@ func (nopTxCache) Reset() {} func (nopTxCache) Push(types.Tx) bool { return true } func (nopTxCache) Remove(types.Tx) {} -// -------------------------------------------------------------------------------- +//-------------------------------------------------------------------------------- // TxKey is the fixed length array hash used as the key in maps. func TxKey(tx types.Tx) [TxKeySize]byte { diff --git a/proxy/app_conn.go b/proxy/app_conn.go index e012acb36..9be854cf3 100644 --- a/proxy/app_conn.go +++ b/proxy/app_conn.go @@ -12,31 +12,29 @@ import ( // Enforce which abci msgs can be sent on a connection at the type level type AppConnConsensus interface { - SetResponseCallback(abcicli.Callback) + SetGlobalCallback(abcicli.GlobalCallback) Error() error InitChainSync(types.RequestInitChain) (*types.ResponseInitChain, error) BeginBlockSync(types.RequestBeginBlock) (*types.ResponseBeginBlock, error) - DeliverTxAsync(types.RequestDeliverTx) *abcicli.ReqRes + DeliverTxAsync(types.RequestDeliverTx, abcicli.ResponseCallback) *abcicli.ReqRes EndBlockSync(types.RequestEndBlock) (*types.ResponseEndBlock, error) CommitSync() (*types.ResponseCommit, error) } type AppConnMempool interface { - SetResponseCallback(abcicli.Callback) + SetGlobalCallback(abcicli.GlobalCallback) Error() error - CheckTxAsync(types.RequestCheckTx) *abcicli.ReqRes - BeginRecheckTxAsync(types.RequestBeginRecheckTx) *abcicli.ReqRes - EndRecheckTxAsync(types.RequestEndRecheckTx) *abcicli.ReqRes - + CheckTxAsync(types.RequestCheckTx, abcicli.ResponseCallback) *abcicli.ReqRes CheckTxSync(types.RequestCheckTx) (*types.ResponseCheckTx, error) + BeginRecheckTxSync(types.RequestBeginRecheckTx) (*types.ResponseBeginRecheckTx, error) EndRecheckTxSync(types.RequestEndRecheckTx) (*types.ResponseEndRecheckTx, error) - FlushAsync() *abcicli.ReqRes - FlushSync() error + FlushAsync(abcicli.ResponseCallback) *abcicli.ReqRes + FlushSync() (*types.ResponseFlush, error) } type AppConnQuery interface { @@ -71,8 +69,8 @@ func NewAppConnConsensus(appConn abcicli.Client) AppConnConsensus { } } -func (app *appConnConsensus) SetResponseCallback(cb abcicli.Callback) { - app.appConn.SetResponseCallback(cb) +func (app *appConnConsensus) SetGlobalCallback(globalCb abcicli.GlobalCallback) { + app.appConn.SetGlobalCallback(globalCb) } func (app *appConnConsensus) Error() error { @@ -87,8 +85,8 @@ func (app *appConnConsensus) BeginBlockSync(req types.RequestBeginBlock) (*types return app.appConn.BeginBlockSync(req) } -func (app *appConnConsensus) DeliverTxAsync(req types.RequestDeliverTx) *abcicli.ReqRes { - return app.appConn.DeliverTxAsync(req) +func (app *appConnConsensus) DeliverTxAsync(req types.RequestDeliverTx, cb abcicli.ResponseCallback) *abcicli.ReqRes { + return app.appConn.DeliverTxAsync(req, cb) } func (app *appConnConsensus) EndBlockSync(req types.RequestEndBlock) (*types.ResponseEndBlock, error) { @@ -112,32 +110,24 @@ func NewAppConnMempool(appConn abcicli.Client) AppConnMempool { } } -func (app *appConnMempool) SetResponseCallback(cb abcicli.Callback) { - app.appConn.SetResponseCallback(cb) +func (app *appConnMempool) SetGlobalCallback(globalCb abcicli.GlobalCallback) { + app.appConn.SetGlobalCallback(globalCb) } func (app *appConnMempool) Error() error { return app.appConn.Error() } -func (app *appConnMempool) FlushAsync() *abcicli.ReqRes { - return app.appConn.FlushAsync() +func (app *appConnMempool) FlushAsync(cb abcicli.ResponseCallback) *abcicli.ReqRes { + return app.appConn.FlushAsync(cb) } -func (app *appConnMempool) FlushSync() error { +func (app *appConnMempool) FlushSync() (*types.ResponseFlush, error) { return app.appConn.FlushSync() } -func (app *appConnMempool) CheckTxAsync(req types.RequestCheckTx) *abcicli.ReqRes { - return app.appConn.CheckTxAsync(req) -} - -func (app *appConnMempool) BeginRecheckTxAsync(req types.RequestBeginRecheckTx) *abcicli.ReqRes { - return app.appConn.BeginRecheckTxAsync(req) -} - -func (app *appConnMempool) EndRecheckTxAsync(req types.RequestEndRecheckTx) *abcicli.ReqRes { - return app.appConn.EndRecheckTxAsync(req) +func (app *appConnMempool) CheckTxAsync(req types.RequestCheckTx, cb abcicli.ResponseCallback) *abcicli.ReqRes { + return app.appConn.CheckTxAsync(req, cb) } func (app *appConnMempool) CheckTxSync(req types.RequestCheckTx) (*types.ResponseCheckTx, error) { diff --git a/proxy/app_conn_test.go b/proxy/app_conn_test.go index a9d69f635..72d9c99f5 100644 --- a/proxy/app_conn_test.go +++ b/proxy/app_conn_test.go @@ -16,8 +16,8 @@ import ( //---------------------------------------- type AppConnTest interface { - EchoAsync(string) *abcicli.ReqRes FlushSync() error + EchoAsync(string, abcicli.ResponseCallback) *abcicli.ReqRes InfoSync(types.RequestInfo) (*types.ResponseInfo, error) } @@ -29,8 +29,8 @@ func NewAppConnTest(appConn abcicli.Client) AppConnTest { return &appConnTest{appConn} } -func (app *appConnTest) EchoAsync(msg string) *abcicli.ReqRes { - return app.appConn.EchoAsync(msg) +func (app *appConnTest) EchoAsync(msg string, cb abcicli.ResponseCallback) *abcicli.ReqRes { + return app.appConn.EchoAsync(msg, cb) } func (app *appConnTest) FlushSync() error { @@ -75,7 +75,7 @@ func TestEcho(t *testing.T) { t.Log("Connected") for i := 0; i < 1000; i++ { - proxy.EchoAsync(fmt.Sprintf("echo-%v", i)) + proxy.EchoAsync(fmt.Sprintf("echo-%v", i), nil) } if err := proxy.FlushSync(); err != nil { t.Error(err) @@ -115,7 +115,7 @@ func BenchmarkEcho(b *testing.B) { b.StartTimer() // Start benchmarking tests for i := 0; i < b.N; i++ { - proxy.EchoAsync(echoString) + proxy.EchoAsync(echoString, nil) } if err := proxy.FlushSync(); err != nil { b.Error(err) diff --git a/proxy/mocks/app_conn_consensus.go b/proxy/mocks/app_conn_consensus.go index 35b4677a7..1d66b858b 100644 --- a/proxy/mocks/app_conn_consensus.go +++ b/proxy/mocks/app_conn_consensus.go @@ -136,7 +136,7 @@ func (_m *AppConnConsensus) InitChainSync(_a0 types.RequestInitChain) (*types.Re return r0, r1 } -// SetResponseCallback provides a mock function with given fields: _a0 -func (_m *AppConnConsensus) SetResponseCallback(_a0 abcicli.Callback) { +// SetGlobalCallback provides a mock function with given fields: _a0 +func (_m *AppConnConsensus) SetGlobalCallback(_a0 abcicli.GlobalCallback) { _m.Called(_a0) } diff --git a/proxy/mocks/app_conn_mempool.go b/proxy/mocks/app_conn_mempool.go index f64942ccd..4fa331390 100644 --- a/proxy/mocks/app_conn_mempool.go +++ b/proxy/mocks/app_conn_mempool.go @@ -175,7 +175,7 @@ func (_m *AppConnMempool) FlushSync() error { return r0 } -// SetResponseCallback provides a mock function with given fields: _a0 -func (_m *AppConnMempool) SetResponseCallback(_a0 abcicli.Callback) { +// SetGlobalCallback provides a mock function with given fields: _a0 +func (_m *AppConnMempool) SetGlobalCallback(_a0 abcicli.GlobalCallback) { _m.Called(_a0) } diff --git a/state/execution.go b/state/execution.go index fdcd16eb2..dd740f2e9 100644 --- a/state/execution.go +++ b/state/execution.go @@ -311,7 +311,7 @@ func execBlockOnProxyApp( txIndex++ } } - proxyAppConn.SetResponseCallback(proxyCb) + proxyAppConn.SetGlobalCallback(proxyCb) commitInfo := getBeginBlockValidatorInfo(block, store, initialHeight, voterParams) @@ -341,7 +341,7 @@ func execBlockOnProxyApp( startTime := time.Now() // run txs of block for _, tx := range block.Txs { - proxyAppConn.DeliverTxAsync(abci.RequestDeliverTx{Tx: tx}) + proxyAppConn.DeliverTxAsync(abci.RequestDeliverTx{Tx: tx}, nil) if err := proxyAppConn.Error(); err != nil { return nil, err }