diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go index b51e4e3d91..1374747ab8 100644 --- a/pulsar/internal/rpc_client.go +++ b/pulsar/internal/rpc_client.go @@ -21,7 +21,6 @@ import ( "errors" "net" "net/url" - "sync" "sync/atomic" "time" @@ -31,6 +30,16 @@ import ( "github.com/gogo/protobuf/proto" ) +var ( + // ErrRequestTimeOut happens when request not finished in given requestTimeout. + ErrRequestTimeOut = errors.New("request timed out") +) + +type result struct { + *RPCResult + error +} + type RPCResult struct { Response *pb.BaseCommand Cnx Connection @@ -121,14 +130,10 @@ func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, request return nil, err } - type Res struct { - *RPCResult - error - } - ch := make(chan Res, 10) + ch := make(chan result, 1) cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand, err error) { - ch <- Res{&RPCResult{ + ch <- result{&RPCResult{ Cnx: cnx, Response: response, }, err} @@ -139,29 +144,30 @@ func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, request case res := <-ch: return res.RPCResult, res.error case <-time.After(c.requestTimeout): - return nil, errors.New("request timed out") + return nil, ErrRequestTimeOut } } func (c *rpcClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) { c.metrics.RPCRequestCount.Inc() - wg := sync.WaitGroup{} - wg.Add(1) - rpcResult := &RPCResult{ - Cnx: cnx, - } + ch := make(chan result, 1) - var rpcErr error cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand, err error) { - rpcResult.Response = response - rpcErr = err - wg.Done() + ch <- result{&RPCResult{ + Cnx: cnx, + Response: response, + }, err} + close(ch) }) - wg.Wait() - return rpcResult, rpcErr + select { + case res := <-ch: + return res.RPCResult, res.error + case <-time.After(c.requestTimeout): + return nil, ErrRequestTimeOut + } } func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) error {