Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rft: network & codec #673

Merged
merged 68 commits into from
Sep 19, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
1eb4a83
getty refactor
cvictory Apr 24, 2020
fa61e9d
refactor getty
cvictory Apr 26, 2020
0ba15c8
refactor getty
cvictory May 8, 2020
9bc37f3
Merge branch 'master' of github.com:apache/dubbo-go into getty_reactor
cvictory May 8, 2020
c67d34a
fix review issue
cvictory May 8, 2020
acc7c79
fix review issue
cvictory May 8, 2020
d48fd1e
remove unused code, add license
cvictory May 9, 2020
58e7e86
fix review issue: add comment and some optimization
cvictory May 14, 2020
adb55ce
split import sentense, fix some review
cvictory May 18, 2020
1bbe0b4
modify log
cvictory May 18, 2020
dc2b68f
fix run issue
cvictory May 19, 2020
fe3c5b0
fix issue: create gettyClientConn every time
cvictory May 19, 2020
1e07952
fix : url.getPath contains slash, but it don't in dubbo-java
cvictory May 19, 2020
c6abc0d
unit test
cvictory May 20, 2020
15608af
fix review issue
cvictory May 27, 2020
cbac0c9
remove unused code
cvictory May 27, 2020
7f9c7a4
reactor import
cvictory May 28, 2020
7c30fdc
fix review issue
cvictory May 29, 2020
dd36c9c
fix review issue
cvictory May 31, 2020
713fcf9
fix review issue
cvictory Jun 2, 2020
a08d2c7
fix review issue
cvictory Jun 2, 2020
5ba2307
refactor ServiceKey func
cvictory Jun 2, 2020
a10ff4d
Merge pull request #495 from cvictory/getty_reactor
hxmhlt Jun 4, 2020
3d308e4
fix: path is not in param
cvictory Jun 10, 2020
38c9c09
fix: path is not in param
cvictory Jun 10, 2020
fff3011
fix: path is not in param
cvictory Jun 16, 2020
1a42f33
Merge pull request #592 from cvictory/getty_reactor
zouyx Jun 22, 2020
6b93bbc
Fix: resolve conflicts
fangyincheng Jun 27, 2020
0e2de75
Merge remote-tracking branch 'apache/develop' into refact-seri
fangyincheng Jun 27, 2020
9b260cf
Fix: merge stash
fangyincheng Jul 25, 2020
14564b9
Mrg: merge develop
fangyincheng Jul 26, 2020
ff4f2f7
Mod: comment ut for protobuf of dubbo
fangyincheng Aug 5, 2020
9a26d03
resolve conflicts
fangyincheng Aug 5, 2020
ee5e94f
Merge remote-tracking branch 'apache/develop' into refact-seri
fangyincheng Aug 8, 2020
6338a82
Fix: fix
fangyincheng Aug 9, 2020
8f032b7
Mrg: from develop
fangyincheng Aug 15, 2020
5050d26
fix: fix TestDubboProtocol_Export test
Aug 16, 2020
f86865d
fix: testDubboProtocol_Export exporterMap after 'Unexport' url key
Aug 16, 2020
eba6a13
fix: fix TestDubboProtocol_Export test
Aug 16, 2020
2d44241
fix: testDubboProtocol_Export exporterMap after 'Unexport' url key
Aug 16, 2020
d2bdc6f
Mrg:merge develop
fangyincheng Aug 30, 2020
ada3eb6
Merge branch 'refact-seri' of github.com:apache/dubbo-go into refact-…
Aug 31, 2020
566a7e3
Mrg: merge from develop
fangyincheng Sep 5, 2020
3ec9e9f
feat: fix protocol, remoting/getty uint test failure
Sep 5, 2020
d58c37f
Merge branch 'refact-seri' of github.com:apache/dubbo-go into refact-…
Sep 5, 2020
f9f1d7a
feat: format import
Sep 5, 2020
2d8d2f0
feat: rest GetBigPkg
Sep 5, 2020
e21eb40
Merge pull request #736 from georgehao/refact-seri
fangyincheng Sep 5, 2020
e719100
Fix:bug
fangyincheng Sep 5, 2020
05f53c5
fix comments
fangyincheng Sep 5, 2020
1477f22
Merge remote-tracking branch 'apache/develop' into refact-seri
fangyincheng Sep 6, 2020
b33f44d
fix comments
fangyincheng Sep 6, 2020
7d114b0
fixed conflicts with develop
fangyincheng Sep 9, 2020
5a3e744
Merge remote-tracking branch 'apache/develop' into refact-seri
fangyincheng Sep 12, 2020
6e8281f
Fix: fixed some comments
fangyincheng Sep 12, 2020
89db192
fix: resove dev RPCResult struct change
Sep 12, 2020
a35e2b2
fix: add injectTraceCtx to getty
Sep 13, 2020
4ccc705
Merge pull request #754 from georgehao/refact-seri
AlexStocks Sep 13, 2020
48d746f
feat: resolve go.mod conflict
Sep 13, 2020
b5b8c0e
Merge pull request #755 from georgehao/refact-seri
fangyincheng Sep 13, 2020
d366705
feat: format DubboCodec
Sep 14, 2020
8279058
feat: add some comment
Sep 14, 2020
5b06802
Merge pull request #756 from georgehao/refact-seri
zouyx Sep 15, 2020
2f48d10
Merge remote-tracking branch 'apache/develop' into refact-seri
fangyincheng Sep 15, 2020
517f54d
feat: delete dubbo_protocol assert remoting.ExchangeClient
Sep 16, 2020
a24034c
feat: merge one line
Sep 17, 2020
0f12ccb
Merge pull request #760 from georgehao/refact-seri
AlexStocks Sep 17, 2020
4f3d4ff
Mrg: fixed conflicts
fangyincheng Sep 19, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor getty
  • Loading branch information
cvictory committed Apr 26, 2020
commit fa61e9de457577e8e1c5300b1caf39d03ae69789
159 changes: 52 additions & 107 deletions protocol/dubbo/dubbo_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,77 +15,6 @@ import (
"time"
)

////SerialID serial ID
//type SerialID byte
//type SequenceType int64
//
//const (
// // S_Dubbo dubbo serial id
// S_Dubbo SerialID = 2
//)
//
//// DubboPackage ...
//type DubboPackage struct {
// Header hessian.DubboHeader
// Service hessian.Service
// Body interface{}
// Err error
//}
//
//func (p DubboPackage) String() string {
// return fmt.Sprintf("DubboPackage: Header-%v, Path-%v, Body-%v", p.Header, p.Service, p.Body)
//}

//
//// Marshal ...
//func (p *DubboPackage) Marshal() (*bytes.Buffer, error) {
// codec := hessian.NewHessianCodec(nil)
//
// pkg, err := codec.Write(p.Service, p.Header, p.Body)
// if err != nil {
// return nil, perrors.WithStack(err)
// }
//
// return bytes.NewBuffer(pkg), nil
//}
//
// Unmarshal ...
//func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, resp *remoting.Response) error {
// // fix issue https://github.com/apache/dubbo-go/issues/380
// bufLen := buf.Len()
// if bufLen < hessian.HEADER_LENGTH {
// return perrors.WithStack(hessian.ErrHeaderNotEnough)
// }
//
// codec := hessian.NewHessianCodec(bufio.NewReaderSize(buf, bufLen))
//
// // read header
// err := codec.ReadHeader(&p.Header)
// if err != nil {
// return perrors.WithStack(err)
// }
//
// if resp != nil { // for client
// if p.Header.Type&hessian.PackageRequest != 0x00 {
// // size of this array must be '7'
// // https://github.com/apache/dubbo-go-hessian2/blob/master/request.go#L272
// p.Body = make([]interface{}, 7)
// } else {
// //pendingRsp, ok := client.pendingResponses.Load(SequenceType(p.Header.ID))
// //if !ok {
// // return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID)
// //}
// p.Body = &hessian.Response{RspObj: resp.Reply}
// }
// }
//
// // read body
// err = codec.ReadBody(p.Body)
// return perrors.WithStack(err)
//}

/////////////////////////////////////////
/////////////////////////////////////////
//SerialID serial ID
type SerialID byte

Expand All @@ -94,24 +23,10 @@ const (
S_Dubbo SerialID = 2
)

//CallType call type
type CallType int32

const (
// CT_UNKNOWN unknown call type
CT_UNKNOWN CallType = 0
// CT_OneWay call one way
CT_OneWay CallType = 1
// CT_TwoWay call in request/response
CT_TwoWay CallType = 2
)

////////////////////////////////////////////
// dubbo package
////////////////////////////////////////////

// SequenceType ...
type SequenceType int64
func init() {
codec := &DubboCodec{}
remoting.NewCodec("dubbo", codec)
}

// DubboPackage ...
type DubboPackage struct {
Expand All @@ -138,7 +53,7 @@ func (p *DubboPackage) Marshal() (*bytes.Buffer, error) {
}

// Unmarshal ...
func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error {
func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, resp *remoting.Response) error {
// fix issue https://github.com/apache/dubbo-go/issues/380
bufLen := buf.Len()
if bufLen < hessian.HEADER_LENGTH {
Expand All @@ -153,34 +68,39 @@ func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error {
return perrors.WithStack(err)
}

if len(opts) != 0 { // for client
client, ok := opts[0].(*Client)
if !ok {
return perrors.Errorf("opts[0] is not of type *Client")
}

if resp != nil { // for client
if p.Header.Type&hessian.PackageRequest != 0x00 {
// size of this array must be '7'
// https://github.com/apache/dubbo-go-hessian2/blob/master/request.go#L272
p.Body = make([]interface{}, 7)
} else {
pendingRsp, ok := client.pendingResponses.Load(SequenceType(p.Header.ID))
if !ok {
pendingRsp := remoting.GetPendingResponse(remoting.SequenceType(p.Header.ID))
if pendingRsp == nil {
return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID)
}
p.Body = &hessian.Response{RspObj: pendingRsp.(*PendingResponse).response.reply}
p.Body = &hessian.Response{RspObj: pendingRsp.Reply}
}
}

// read body
err = codec.ReadBody(p.Body)
return perrors.WithStack(err)
}

type DubboCodec struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

singleton or not?

}

func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, error) {
invocation := request.Data.(invocation.RPCInvocation)
if request.Event {
fangyincheng marked this conversation as resolved.
Show resolved Hide resolved
return c.encodeHeartbeartReqeust(request)
}

invoc, ok := request.Data.(*protocol.Invocation)
if !ok {
logger.Errorf("encode request failed for parameter type :%+v", request)
fangyincheng marked this conversation as resolved.
Show resolved Hide resolved
return nil, perrors.Errorf("encode request failed for parameter type :%+v", request)
}
invocation := *invoc

p := &DubboPackage{}
p.Service.Path = invocation.AttachmentsByKey(constant.PATH_KEY, "")
Expand Down Expand Up @@ -220,8 +140,24 @@ func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, er

return bytes.NewBuffer(pkg), nil
}
func (c *DubboCodec) encodeHeartbeartReqeust(request *remoting.Request) (*bytes.Buffer, error) {
pkg := &DubboPackage{}
pkg.Body = []interface{}{}
pkg.Header.ID = request.Id
pkg.Header.Type = hessian.PackageHeartbeat
pkg.Header.SerialID = byte(S_Dubbo)

codec := hessian.NewHessianCodec(nil)

byt, err := codec.Write(pkg.Service, pkg.Header, pkg.Body)
if err != nil {
return nil, perrors.WithStack(err)
}

return bytes.NewBuffer(byt), nil
}
func (c *DubboCodec) EncodeResponse(response *remoting.Response) (*bytes.Buffer, error) {
var ptype hessian.PackageType = hessian.PackageResponse
var ptype = hessian.PackageResponse
if response.IsHeartbeat() {
ptype = hessian.PackageHeartbeat
}
Expand All @@ -233,7 +169,13 @@ func (c *DubboCodec) EncodeResponse(response *remoting.Response) (*bytes.Buffer,
ResponseStatus: response.Status,
},
}
resp.Body = response.Result
if !response.IsHeartbeat() {
resp.Body = &hessian.Response{
RspObj: response.Result.(protocol.RPCResult).Rest,
Exception: response.Result.(protocol.RPCResult).Err,
}
}

//if response.Header.Type&hessian.PackageRequest != 0x00 {
// resp.Body = req.Body
//} else {
Expand All @@ -259,7 +201,7 @@ func (c *DubboCodec) DecodeRequest(data []byte) (*remoting.Request, int, error)
originErr := perrors.Cause(err)
if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough {
//FIXME
return request, 0, originErr
return request, 0, originErr
}
logger.Errorf("pkg.Unmarshal(len(@data):%d) = error:%+v", buf.Len(), err)

Expand All @@ -268,7 +210,8 @@ func (c *DubboCodec) DecodeRequest(data []byte) (*remoting.Request, int, error)
request = &remoting.Request{
Id: pkg.Header.ID,
SerialID: pkg.Header.SerialID,
TwoWay: false,
TwoWay: pkg.Header.Type&hessian.PackageRequest_TwoWay != 0x00,
Event: pkg.Header.Type&hessian.PackageHeartbeat != 0x00,
}
if pkg.Header.Type&hessian.PackageHeartbeat == 0x00 {
// convert params of request
Expand Down Expand Up @@ -330,15 +273,16 @@ func (c *DubboCodec) DecodeRequest(data []byte) (*remoting.Request, int, error)
}
return request, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil
}

func (c *DubboCodec) DecodeResponse(data []byte) (*remoting.Response, int, error) {
pkg := &DubboPackage{}
buf := bytes.NewBuffer(data)
var response *remoting.Response
response := &remoting.Response{}
err := pkg.Unmarshal(buf, response)
if err != nil {
originErr := perrors.Cause(err)
if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough {
return response, 0, nil
return response, 0, nil
}
logger.Errorf("pkg.Unmarshal(len(@data):%d) = error:%+v", buf.Len(), err)

Expand All @@ -349,7 +293,7 @@ func (c *DubboCodec) DecodeResponse(data []byte) (*remoting.Response, int, error
//Version: pkg.Header.,
SerialID: pkg.Header.SerialID,
Status: pkg.Header.ResponseStatus,
Event: (pkg.Header.Type | hessian.PackageHeartbeat) != 0,
Event: (pkg.Header.Type & hessian.PackageHeartbeat) != 0,
}
var error error
if pkg.Header.Type&hessian.PackageHeartbeat != 0x00 {
Expand All @@ -368,6 +312,7 @@ func (c *DubboCodec) DecodeResponse(data []byte) (*remoting.Response, int, error
}
logger.Debugf("get rpc response{header: %#v, body: %#v}", pkg.Header, pkg.Body)
rpcResult := &protocol.RPCResult{}
response.Result = rpcResult
if pkg.Header.Type&hessian.PackageRequest == 0x00 {
if pkg.Err != nil {
rpcResult.Err = pkg.Err
Expand Down
36 changes: 30 additions & 6 deletions protocol/dubbo/dubbo_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ const (
DUBBO = "dubbo"
)

var (
exchangeClientMap *sync.Map = new(sync.Map)
)

func init() {
extension.SetProtocol(DUBBO, GetProtocol)
}
Expand Down Expand Up @@ -89,9 +93,7 @@ func (dp *DubboProtocol) Refer(url common.URL) protocol.Invoker {
// ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
// RequestTimeout: requestTimeout,
//}))
invoker := NewDubboInvoker(url, remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
}), config.GetConsumerConfig().ConnectTimeout))
invoker := NewDubboInvoker(url, getExchangeClient(url))
dp.SetInvokers(invoker)
logger.Infof("Refer service: %s", url.String())
return invoker
Expand Down Expand Up @@ -124,7 +126,7 @@ func (dp *DubboProtocol) openServer(url common.URL) {
handler := func(invocation *invocation.RPCInvocation) protocol.RPCResult {
return doHandleRequest(invocation)
}
srv := remoting.NewExchangeServer(url, handler)
srv := remoting.NewExchangeServer(url, getty.NewServer(url, handler))
dp.serverMap[url.Location] = srv
srv.Start()
}
Expand Down Expand Up @@ -158,8 +160,8 @@ func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult
//
//args := p.Body.(map[string]interface{})["args"].([]interface{})
//inv := invocation.NewRPCInvocation(p.Service.Method, args, attachments)

ctx := rebuildCtx(rpcInvocation)
// FIXME
ctx := getty.RebuildCtx(rpcInvocation)

invokeResult := invoker.Invoke(ctx, rpcInvocation)
if err := invokeResult.Error(); err != nil {
Expand All @@ -176,3 +178,25 @@ func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult
}
return result
}

func getExchangeClient(url common.URL) *remoting.ExchangeClient {
clientTmp, ok := exchangeClientMap.Load(url.Location)
if !ok {
exchangeClientTmp := remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
}), config.GetConsumerConfig().ConnectTimeout)
exchangeClientMap.Store(url.Location, exchangeClientTmp)

return exchangeClientTmp
}
exchangeClient, ok := clientTmp.(*remoting.ExchangeClient)
if !ok {
exchangeClientTmp := remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
}), config.GetConsumerConfig().ConnectTimeout)
exchangeClientMap.Store(url.Location, exchangeClientTmp)

return exchangeClientTmp
}
return exchangeClient
}
Loading