Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ftr: add async call for dubbo protocol #272

Merged
merged 14 commits into from
Dec 19, 2019
2 changes: 1 addition & 1 deletion common/proxy/proxy_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
)

type ProxyFactory interface {
GetProxy(invoker protocol.Invoker, url *common.URL) *Proxy
GetProxy(invoker protocol.Invoker, callBack interface{}, url *common.URL) *Proxy
zouyx marked this conversation as resolved.
Show resolved Hide resolved
GetInvoker(url common.URL) protocol.Invoker
}

Expand Down
4 changes: 2 additions & 2 deletions common/proxy/proxy_factory/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ type DefaultProxyFactory struct {
func NewDefaultProxyFactory(options ...proxy.Option) proxy.ProxyFactory {
return &DefaultProxyFactory{}
}
func (factory *DefaultProxyFactory) GetProxy(invoker protocol.Invoker, url *common.URL) *proxy.Proxy {
func (factory *DefaultProxyFactory) GetProxy(invoker protocol.Invoker, callBack interface{}, url *common.URL) *proxy.Proxy {
//create proxy
attachments := map[string]string{}
attachments[constant.ASYNC_KEY] = url.GetParam(constant.ASYNC_KEY, "false")
return proxy.NewProxy(invoker, nil, attachments)
return proxy.NewProxy(invoker, callBack, attachments)
}
func (factory *DefaultProxyFactory) GetInvoker(url common.URL) protocol.Invoker {
return &ProxyInvoker{
Expand Down
2 changes: 1 addition & 1 deletion common/proxy/proxy_factory/default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
func Test_GetProxy(t *testing.T) {
proxyFactory := NewDefaultProxyFactory()
url := common.NewURLWithOptions()
proxy := proxyFactory.GetProxy(protocol.NewBaseInvoker(*url), url)
proxy := proxyFactory.GetProxy(protocol.NewBaseInvoker(*url), nil, url)
assert.NotNil(t, proxy)
}

Expand Down
12 changes: 12 additions & 0 deletions common/rpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,18 @@ type RPCService interface {
Reference() string // rpc service id or reference id
}

//AsyncCallbackService callback interface for async
type AsyncCallbackService interface {
CallBack(response CallbackResponse) // callback
}

//CallbackResponse for different protocol
type CallbackResponse interface {
}

//AsyncCallback async callback method
type AsyncCallback func(response CallbackResponse)

// for lowercase func
// func MethodMapper() map[string][string] {
// return map[string][string]{}
Expand Down
8 changes: 5 additions & 3 deletions config/reference_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type ReferenceConfig struct {
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version"`
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
async bool `yaml:"async" json:"async,omitempty" property:"async"`
Async bool `yaml:"async" json:"async,omitempty" property:"async"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
invoker protocol.Invoker
urls []*common.URL
Expand Down Expand Up @@ -140,8 +140,10 @@ func (refconfig *ReferenceConfig) Refer() {
}
}

callback := GetCallback(refconfig.id)

//create proxy
refconfig.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(refconfig.invoker, url)
refconfig.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(refconfig.invoker, callback, url)
}

// @v is service provider implemented RPCService
Expand Down Expand Up @@ -169,7 +171,7 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values {
urlMap.Set(constant.GENERIC_KEY, strconv.FormatBool(refconfig.Generic))
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER))
//getty invoke async or sync
urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(refconfig.async))
urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(refconfig.Async))

//application info
urlMap.Set(constant.APPLICATION_KEY, consumerConfig.ApplicationConfig.Name)
Expand Down
8 changes: 8 additions & 0 deletions config/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,11 @@ func GetConsumerService(name string) common.RPCService {
func GetProviderService(name string) common.RPCService {
return proServices[name]
}

func GetCallback(name string) func(response common.CallbackResponse) {
service := GetConsumerService(name)
if sv, ok := service.(common.AsyncCallbackService); ok {
return sv.CallBack
}
return nil
}
10 changes: 5 additions & 5 deletions protocol/dubbo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,16 +113,16 @@ type Options struct {
RequestTimeout time.Duration
}

type CallResponse struct {
//AsyncCallbackResponse async response for dubbo
type AsyncCallbackResponse struct {
common.CallbackResponse
Opts Options
Cause error
Start time.Time // invoke(call) start time == write start time
ReadStart time.Time // read start time, write duration = ReadStart - Start
Reply interface{}
}

type AsyncCallback func(response CallResponse)

type Client struct {
opts Options
conf ClientConfig
Expand Down Expand Up @@ -199,12 +199,12 @@ func (c *Client) Call(request *Request, response *Response) error {
return perrors.WithStack(c.call(ct, request, response, nil))
}

func (c *Client) AsyncCall(request *Request, callback AsyncCallback, response *Response) error {
func (c *Client) AsyncCall(request *Request, callback common.AsyncCallback, response *Response) error {

return perrors.WithStack(c.call(CT_TwoWay, request, response, callback))
}

func (c *Client) call(ct CallType, request *Request, response *Response, callback AsyncCallback) error {
func (c *Client) call(ct CallType, request *Request, response *Response, callback common.AsyncCallback) error {

p := &DubboPackage{}
p.Service.Path = strings.TrimPrefix(request.svcUrl.Path, "/")
Expand Down
5 changes: 3 additions & 2 deletions protocol/dubbo/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,9 @@ func TestClient_AsyncCall(t *testing.T) {
user := &User{}
lock := sync.Mutex{}
lock.Lock()
err := c.AsyncCall(NewRequest("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, nil), func(response CallResponse) {
assert.Equal(t, User{Id: "1", Name: "username"}, *response.Reply.(*Response).reply.(*User))
err := c.AsyncCall(NewRequest("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, nil), func(response common.CallbackResponse) {
r := response.(AsyncCallbackResponse)
assert.Equal(t, User{Id: "1", Name: "username"}, *r.Reply.(*Response).reply.(*User))
lock.Unlock()
}, NewResponse(user, nil))
assert.NoError(t, err)
Expand Down
7 changes: 4 additions & 3 deletions protocol/dubbo/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

import (
"github.com/apache/dubbo-go-hessian2"
"github.com/apache/dubbo-go/common"
perrors "github.com/pkg/errors"
)

Expand Down Expand Up @@ -109,7 +110,7 @@ type PendingResponse struct {
err error
start time.Time
readStart time.Time
callback AsyncCallback
callback common.AsyncCallback
response *Response
done chan struct{}
}
Expand All @@ -122,8 +123,8 @@ func NewPendingResponse() *PendingResponse {
}
}

func (r PendingResponse) GetCallResponse() CallResponse {
return CallResponse{
func (r PendingResponse) GetCallResponse() common.CallbackResponse {
return AsyncCallbackResponse{
Cause: r.err,
Start: r.start,
ReadStart: r.readStart,
Expand Down
2 changes: 1 addition & 1 deletion protocol/dubbo/dubbo_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
}
response := NewResponse(inv.Reply(), nil)
if async {
if callBack, ok := inv.CallBack().(func(response CallResponse)); ok {
if callBack, ok := inv.CallBack().(func(response common.CallbackResponse)); ok {
result.Err = di.client.AsyncCall(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), callBack, response)
} else {
result.Err = di.client.CallOneway(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()))
Expand Down
6 changes: 4 additions & 2 deletions protocol/dubbo/dubbo_invoker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol/invocation"
)
Expand Down Expand Up @@ -65,8 +66,9 @@ func TestDubboInvoker_Invoke(t *testing.T) {
// AsyncCall
lock := sync.Mutex{}
lock.Lock()
inv.SetCallBack(func(response CallResponse) {
assert.Equal(t, User{Id: "1", Name: "username"}, *response.Reply.(*Response).reply.(*User))
inv.SetCallBack(func(response common.CallbackResponse) {
r := response.(AsyncCallbackResponse)
assert.Equal(t, User{Id: "1", Name: "username"}, *r.Reply.(*Response).reply.(*User))
lock.Unlock()
})
res = invoker.Invoke(inv)
Expand Down