Skip to content

Commit

Permalink
Merge pull request apache#754 from georgehao/refact-seri
Browse files Browse the repository at this point in the history
fix: resove dev RPCResult struct change
  • Loading branch information
AlexStocks authored Sep 13, 2020
2 parents 6a60534 + 8e8eff4 commit aaba8a4
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 34 deletions.
4 changes: 2 additions & 2 deletions protocol/dubbo/dubbo_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (c *DubboCodec) decodeRequest(data []byte) (*remoting.Request, int, error)
//invocation := request.Data.(*invocation.RPCInvocation)
var methodName string
var args []interface{}
var attachments map[string]string = make(map[string]string)
attachments := make(map[string]interface{})
if req[impl.DubboVersionKey] != nil {
//dubbo version
request.Version = req[impl.DubboVersionKey].(string)
Expand All @@ -225,7 +225,7 @@ func (c *DubboCodec) decodeRequest(data []byte) (*remoting.Request, int, error)
//method
methodName = pkg.Service.Method
args = req[impl.ArgsKey].([]interface{})
attachments = req[impl.AttachmentsKey].(map[string]string)
attachments = req[impl.AttachmentsKey].(map[string]interface{})
invoc := invocation.NewRPCInvocationWithOptions(invocation.WithAttachments(attachments),
invocation.WithArguments(args), invocation.WithMethodName(methodName))
request.Data = invoc
Expand Down
9 changes: 8 additions & 1 deletion protocol/dubbo/impl/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,5 +78,12 @@ func TestDubboPackage_MarshalAndUnmarshal(t *testing.T) {
assert.Equal(t, "Method", pkgres.Service.Method)
assert.Equal(t, "Ljava/lang/String;", reassembleBody["argsTypes"].(string))
assert.Equal(t, []interface{}{"a"}, reassembleBody["args"])
assert.Equal(t, map[string]string{"dubbo": "2.0.2", "interface": "Service", "path": "path", "timeout": "1000", "version": "2.6"}, reassembleBody["attachments"].(map[string]string))
tmpData := map[string]interface{}{
"dubbo": "2.0.2",
"interface": "Service",
"path": "path",
"timeout": "1000",
"version": "2.6",
}
assert.Equal(t, tmpData, reassembleBody["attachments"])
}
42 changes: 30 additions & 12 deletions protocol/dubbo/impl/hessian.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,11 @@ func marshalResponse(encoder *hessian.Encoder, p DubboPackage) ([]byte, error) {
if p.IsHeartBeat() {
encoder.Encode(nil)
} else {
atta := isSupportResponseAttachment(response.Attachments[DUBBO_VERSION_KEY])
var version string
if attachmentVersion, ok := response.Attachments[DUBBO_VERSION_KEY]; ok {
version = attachmentVersion.(string)
}
atta := isSupportResponseAttachment(version)

var resWithException, resValue, resNullValue int32
if atta {
Expand Down Expand Up @@ -255,7 +259,7 @@ func unmarshalRequestBody(body []byte, p *DubboPackage) error {

if v, ok := attachments.(map[interface{}]interface{}); ok {
v[DUBBO_VERSION_KEY] = dubboVersion
req[6] = hessian.ToMapStringString(v)
req[6] = ToMapStringInterface(v)
buildServerSidePackageBody(p)
return nil
}
Expand Down Expand Up @@ -285,7 +289,7 @@ func unmarshalResponseBody(body []byte, p *DubboPackage) error {
return perrors.WithStack(err)
}
if v, ok := attachments.(map[interface{}]interface{}); ok {
atta := hessian.ToMapStringString(v)
atta := ToMapStringInterface(v)
response.Attachments = atta
} else {
return perrors.Errorf("get wrong attachments: %+v", attachments)
Expand All @@ -310,7 +314,7 @@ func unmarshalResponseBody(body []byte, p *DubboPackage) error {
return perrors.WithStack(err)
}
if v, ok := attachments.(map[interface{}]interface{}); ok {
atta := hessian.ToMapStringString(v)
atta := ToMapStringInterface(v)
response.Attachments = atta
} else {
return perrors.Errorf("get wrong attachments: %+v", attachments)
Expand All @@ -326,7 +330,7 @@ func unmarshalResponseBody(body []byte, p *DubboPackage) error {
return perrors.WithStack(err)
}
if v, ok := attachments.(map[interface{}]interface{}); ok {
atta := hessian.ToMapStringString(v)
atta := ToMapStringInterface(v)
response.Attachments = atta
} else {
return perrors.Errorf("get wrong attachments: %+v", attachments)
Expand All @@ -342,7 +346,7 @@ func buildServerSidePackageBody(pkg *DubboPackage) {
if len(req) > 0 {
var dubboVersion, argsTypes string
var args []interface{}
var attachments map[string]string
var attachments map[string]interface{}
svc := Service{}
if req[0] != nil {
dubboVersion = req[0].(string)
Expand All @@ -363,18 +367,18 @@ func buildServerSidePackageBody(pkg *DubboPackage) {
args = req[5].([]interface{})
}
if req[6] != nil {
attachments = req[6].(map[string]string)
attachments = req[6].(map[string]interface{})
}
if svc.Path == "" && len(attachments[constant.PATH_KEY]) > 0 {
svc.Path = attachments[constant.PATH_KEY]
if svc.Path == "" && attachments[constant.PATH_KEY] != nil && len(attachments[constant.PATH_KEY].(string)) > 0 {
svc.Path = attachments[constant.PATH_KEY].(string)
}
if _, ok := attachments[constant.INTERFACE_KEY]; ok {
svc.Interface = attachments[constant.INTERFACE_KEY]
svc.Interface = attachments[constant.INTERFACE_KEY].(string)
} else {
svc.Interface = svc.Path
}
if len(attachments[constant.GROUP_KEY]) > 0 {
svc.Group = attachments[constant.GROUP_KEY]
if _, ok := attachments[constant.GROUP_KEY]; ok {
svc.Group = attachments[constant.GROUP_KEY].(string)
}
pkg.SetService(svc)
pkg.SetBody(map[string]interface{}{
Expand Down Expand Up @@ -503,6 +507,20 @@ func getArgType(v interface{}) string {
// return "java.lang.RuntimeException"
}

func ToMapStringInterface(origin map[interface{}]interface{}) map[string]interface{} {
dest := make(map[string]interface{}, len(origin))
for k, v := range origin {
if kv, ok := k.(string); ok {
if v == nil {
dest[kv] = ""
continue
}
dest[kv] = v
}
}
return dest
}

func init() {
SetSerializer("hessian2", HessianSerializer{})
}
6 changes: 3 additions & 3 deletions protocol/dubbo/impl/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ package impl

type RequestPayload struct {
Params interface{}
Attachments map[string]string
Attachments map[string]interface{}
}

func NewRequestPayload(args interface{}, atta map[string]string) *RequestPayload {
func NewRequestPayload(args interface{}, atta map[string]interface{}) *RequestPayload {
if atta == nil {
atta = make(map[string]string)
atta = make(map[string]interface{})
}
return &RequestPayload{
Params: args,
Expand Down
6 changes: 3 additions & 3 deletions protocol/dubbo/impl/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ package impl
type ResponsePayload struct {
RspObj interface{}
Exception error
Attachments map[string]string
Attachments map[string]interface{}
}

// NewResponse create a new ResponsePayload
func NewResponsePayload(rspObj interface{}, exception error, attachments map[string]string) *ResponsePayload {
func NewResponsePayload(rspObj interface{}, exception error, attachments map[string]interface{}) *ResponsePayload {
if attachments == nil {
attachments = make(map[string]string)
attachments = make(map[string]interface{})
}
return &ResponsePayload{
RspObj: rspObj,
Expand Down
4 changes: 2 additions & 2 deletions remoting/getty/dubbo_codec_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (c *DubboTestCodec) decodeRequest(data []byte) (*remoting.Request, int, err
//invocation := request.Data.(*invocation.RPCInvocation)
var methodName string
var args []interface{}
var attachments map[string]string = make(map[string]string)
attachments := make(map[string]interface{})
if req[impl.DubboVersionKey] != nil {
//dubbo version
request.Version = req[impl.DubboVersionKey].(string)
Expand All @@ -217,7 +217,7 @@ func (c *DubboTestCodec) decodeRequest(data []byte) (*remoting.Request, int, err
//method
methodName = pkg.Service.Method
args = req[impl.ArgsKey].([]interface{})
attachments = req[impl.AttachmentsKey].(map[string]string)
attachments = req[impl.AttachmentsKey].(map[string]interface{})
invoc := invocation.NewRPCInvocationWithOptions(invocation.WithAttachments(attachments),
invocation.WithArguments(args), invocation.WithMethodName(methodName))
request.Data = invoc
Expand Down
14 changes: 3 additions & 11 deletions remoting/getty/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ func TestRebuildCtx(t *testing.T) {

span, ctx := opentracing.StartSpanFromContext(ctx, "Test-Client")

injectTraceCtx(span, inv)
err := injectTraceCtx(span, inv)
assert.NoError(t, err)

// rebuild the context success
inv = invocation.NewRPCInvocation("MethodName", []interface{}{"OK", "Hello"}, attach)
ctx = rebuildCtx(inv)
Expand All @@ -71,13 +73,3 @@ func rebuildCtx(inv *invocation.RPCInvocation) context.Context {
}
return ctx
}

func filterContext(attachments map[string]interface{}) map[string]string {
var traceAttchment = make(map[string]string)
for k, v := range attachments {
if r, ok := v.(string); ok {
traceAttchment[k] = r
}
}
return traceAttchment
}
60 changes: 60 additions & 0 deletions remoting/getty/opentracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package getty

import (
"github.com/opentracing/opentracing-go"
)
import (
invocation_impl "github.com/apache/dubbo-go/protocol/invocation"
)

func injectTraceCtx(currentSpan opentracing.Span, inv *invocation_impl.RPCInvocation) error {
// inject opentracing ctx
traceAttachments := filterContext(inv.Attachments())
carrier := opentracing.TextMapCarrier(traceAttachments)
err := opentracing.GlobalTracer().Inject(currentSpan.Context(), opentracing.TextMap, carrier)
if err == nil {
fillTraceAttachments(inv.Attachments(), traceAttachments)
}
return err
}

func extractTraceCtx(inv *invocation_impl.RPCInvocation) (opentracing.SpanContext, error) {
traceAttachments := filterContext(inv.Attachments())
// actually, if user do not use any opentracing framework, the err will not be nil.
spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap,
opentracing.TextMapCarrier(traceAttachments))
return spanCtx, err
}

func filterContext(attachments map[string]interface{}) map[string]string {
var traceAttchment = make(map[string]string)
for k, v := range attachments {
if r, ok := v.(string); ok {
traceAttchment[k] = r
}
}
return traceAttchment
}

func fillTraceAttachments(attachments map[string]interface{}, traceAttachment map[string]string) {
for k, v := range traceAttachment {
attachments[k] = v
}
}

0 comments on commit aaba8a4

Please sign in to comment.