From 7f80eaebf4302c3a9aab14e9b5865740807f91f5 Mon Sep 17 00:00:00 2001 From: haohongfan Date: Sat, 12 Sep 2020 23:59:07 +0800 Subject: [PATCH 1/2] fix: resove dev RPCResult struct change --- protocol/dubbo/dubbo_codec.go | 4 +-- protocol/dubbo/impl/codec_test.go | 9 +++++- protocol/dubbo/impl/hessian.go | 42 ++++++++++++++++++-------- protocol/dubbo/impl/request.go | 6 ++-- protocol/dubbo/impl/response.go | 6 ++-- remoting/getty/dubbo_codec_for_test.go | 4 +-- 6 files changed, 48 insertions(+), 23 deletions(-) diff --git a/protocol/dubbo/dubbo_codec.go b/protocol/dubbo/dubbo_codec.go index 673588ef24..b84952a125 100644 --- a/protocol/dubbo/dubbo_codec.go +++ b/protocol/dubbo/dubbo_codec.go @@ -213,7 +213,7 @@ func (c *DubboCodec) decodeRequest(data []byte) (*remoting.Request, int, error) //invocation := request.Data.(*invocation.RPCInvocation) var methodName string var args []interface{} - var attachments map[string]string = make(map[string]string) + attachments := make(map[string]interface{}) if req[impl.DubboVersionKey] != nil { //dubbo version request.Version = req[impl.DubboVersionKey].(string) @@ -225,7 +225,7 @@ func (c *DubboCodec) decodeRequest(data []byte) (*remoting.Request, int, error) //method methodName = pkg.Service.Method args = req[impl.ArgsKey].([]interface{}) - attachments = req[impl.AttachmentsKey].(map[string]string) + attachments = req[impl.AttachmentsKey].(map[string]interface{}) invoc := invocation.NewRPCInvocationWithOptions(invocation.WithAttachments(attachments), invocation.WithArguments(args), invocation.WithMethodName(methodName)) request.Data = invoc diff --git a/protocol/dubbo/impl/codec_test.go b/protocol/dubbo/impl/codec_test.go index 03e768dacd..1c37928238 100644 --- a/protocol/dubbo/impl/codec_test.go +++ b/protocol/dubbo/impl/codec_test.go @@ -78,5 +78,12 @@ func TestDubboPackage_MarshalAndUnmarshal(t *testing.T) { assert.Equal(t, "Method", pkgres.Service.Method) assert.Equal(t, "Ljava/lang/String;", reassembleBody["argsTypes"].(string)) assert.Equal(t, []interface{}{"a"}, reassembleBody["args"]) - assert.Equal(t, map[string]string{"dubbo": "2.0.2", "interface": "Service", "path": "path", "timeout": "1000", "version": "2.6"}, reassembleBody["attachments"].(map[string]string)) + tmpData := map[string]interface{}{ + "dubbo": "2.0.2", + "interface": "Service", + "path": "path", + "timeout": "1000", + "version": "2.6", + } + assert.Equal(t, tmpData, reassembleBody["attachments"]) } diff --git a/protocol/dubbo/impl/hessian.go b/protocol/dubbo/impl/hessian.go index 513421b95f..b686d5728d 100644 --- a/protocol/dubbo/impl/hessian.go +++ b/protocol/dubbo/impl/hessian.go @@ -67,7 +67,11 @@ func marshalResponse(encoder *hessian.Encoder, p DubboPackage) ([]byte, error) { if p.IsHeartBeat() { encoder.Encode(nil) } else { - atta := isSupportResponseAttachment(response.Attachments[DUBBO_VERSION_KEY]) + var version string + if attachmentVersion, ok := response.Attachments[DUBBO_VERSION_KEY]; ok { + version = attachmentVersion.(string) + } + atta := isSupportResponseAttachment(version) var resWithException, resValue, resNullValue int32 if atta { @@ -255,7 +259,7 @@ func unmarshalRequestBody(body []byte, p *DubboPackage) error { if v, ok := attachments.(map[interface{}]interface{}); ok { v[DUBBO_VERSION_KEY] = dubboVersion - req[6] = hessian.ToMapStringString(v) + req[6] = ToMapStringInterface(v) buildServerSidePackageBody(p) return nil } @@ -285,7 +289,7 @@ func unmarshalResponseBody(body []byte, p *DubboPackage) error { return perrors.WithStack(err) } if v, ok := attachments.(map[interface{}]interface{}); ok { - atta := hessian.ToMapStringString(v) + atta := ToMapStringInterface(v) response.Attachments = atta } else { return perrors.Errorf("get wrong attachments: %+v", attachments) @@ -310,7 +314,7 @@ func unmarshalResponseBody(body []byte, p *DubboPackage) error { return perrors.WithStack(err) } if v, ok := attachments.(map[interface{}]interface{}); ok { - atta := hessian.ToMapStringString(v) + atta := ToMapStringInterface(v) response.Attachments = atta } else { return perrors.Errorf("get wrong attachments: %+v", attachments) @@ -326,7 +330,7 @@ func unmarshalResponseBody(body []byte, p *DubboPackage) error { return perrors.WithStack(err) } if v, ok := attachments.(map[interface{}]interface{}); ok { - atta := hessian.ToMapStringString(v) + atta := ToMapStringInterface(v) response.Attachments = atta } else { return perrors.Errorf("get wrong attachments: %+v", attachments) @@ -342,7 +346,7 @@ func buildServerSidePackageBody(pkg *DubboPackage) { if len(req) > 0 { var dubboVersion, argsTypes string var args []interface{} - var attachments map[string]string + var attachments map[string]interface{} svc := Service{} if req[0] != nil { dubboVersion = req[0].(string) @@ -363,18 +367,18 @@ func buildServerSidePackageBody(pkg *DubboPackage) { args = req[5].([]interface{}) } if req[6] != nil { - attachments = req[6].(map[string]string) + attachments = req[6].(map[string]interface{}) } - if svc.Path == "" && len(attachments[constant.PATH_KEY]) > 0 { - svc.Path = attachments[constant.PATH_KEY] + if svc.Path == "" && attachments[constant.PATH_KEY] != nil && len(attachments[constant.PATH_KEY].(string)) > 0 { + svc.Path = attachments[constant.PATH_KEY].(string) } if _, ok := attachments[constant.INTERFACE_KEY]; ok { - svc.Interface = attachments[constant.INTERFACE_KEY] + svc.Interface = attachments[constant.INTERFACE_KEY].(string) } else { svc.Interface = svc.Path } - if len(attachments[constant.GROUP_KEY]) > 0 { - svc.Group = attachments[constant.GROUP_KEY] + if _, ok := attachments[constant.GROUP_KEY]; ok { + svc.Group = attachments[constant.GROUP_KEY].(string) } pkg.SetService(svc) pkg.SetBody(map[string]interface{}{ @@ -503,6 +507,20 @@ func getArgType(v interface{}) string { // return "java.lang.RuntimeException" } +func ToMapStringInterface(origin map[interface{}]interface{}) map[string]interface{} { + dest := make(map[string]interface{}, len(origin)) + for k, v := range origin { + if kv, ok := k.(string); ok { + if v == nil { + dest[kv] = "" + continue + } + dest[kv] = v + } + } + return dest +} + func init() { SetSerializer("hessian2", HessianSerializer{}) } diff --git a/protocol/dubbo/impl/request.go b/protocol/dubbo/impl/request.go index 0e770c3afb..ef520083e6 100644 --- a/protocol/dubbo/impl/request.go +++ b/protocol/dubbo/impl/request.go @@ -19,12 +19,12 @@ package impl type RequestPayload struct { Params interface{} - Attachments map[string]string + Attachments map[string]interface{} } -func NewRequestPayload(args interface{}, atta map[string]string) *RequestPayload { +func NewRequestPayload(args interface{}, atta map[string]interface{}) *RequestPayload { if atta == nil { - atta = make(map[string]string) + atta = make(map[string]interface{}) } return &RequestPayload{ Params: args, diff --git a/protocol/dubbo/impl/response.go b/protocol/dubbo/impl/response.go index ea0a6efb23..9fde1eb249 100644 --- a/protocol/dubbo/impl/response.go +++ b/protocol/dubbo/impl/response.go @@ -20,13 +20,13 @@ package impl type ResponsePayload struct { RspObj interface{} Exception error - Attachments map[string]string + Attachments map[string]interface{} } // NewResponse create a new ResponsePayload -func NewResponsePayload(rspObj interface{}, exception error, attachments map[string]string) *ResponsePayload { +func NewResponsePayload(rspObj interface{}, exception error, attachments map[string]interface{}) *ResponsePayload { if attachments == nil { - attachments = make(map[string]string) + attachments = make(map[string]interface{}) } return &ResponsePayload{ RspObj: rspObj, diff --git a/remoting/getty/dubbo_codec_for_test.go b/remoting/getty/dubbo_codec_for_test.go index bde7d9e696..b91fc9f4cc 100644 --- a/remoting/getty/dubbo_codec_for_test.go +++ b/remoting/getty/dubbo_codec_for_test.go @@ -205,7 +205,7 @@ func (c *DubboTestCodec) decodeRequest(data []byte) (*remoting.Request, int, err //invocation := request.Data.(*invocation.RPCInvocation) var methodName string var args []interface{} - var attachments map[string]string = make(map[string]string) + attachments := make(map[string]interface{}) if req[impl.DubboVersionKey] != nil { //dubbo version request.Version = req[impl.DubboVersionKey].(string) @@ -217,7 +217,7 @@ func (c *DubboTestCodec) decodeRequest(data []byte) (*remoting.Request, int, err //method methodName = pkg.Service.Method args = req[impl.ArgsKey].([]interface{}) - attachments = req[impl.AttachmentsKey].(map[string]string) + attachments = req[impl.AttachmentsKey].(map[string]interface{}) invoc := invocation.NewRPCInvocationWithOptions(invocation.WithAttachments(attachments), invocation.WithArguments(args), invocation.WithMethodName(methodName)) request.Data = invoc From 8e8eff4f859f24ebd12a3ce4eaa30b5d87e02939 Mon Sep 17 00:00:00 2001 From: haohongfan Date: Sun, 13 Sep 2020 15:56:27 +0800 Subject: [PATCH 2/2] fix: add injectTraceCtx to getty --- remoting/getty/listener_test.go | 14 ++------ remoting/getty/opentracing.go | 60 +++++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 11 deletions(-) create mode 100644 remoting/getty/opentracing.go diff --git a/remoting/getty/listener_test.go b/remoting/getty/listener_test.go index 7a54323d19..7e7ac5fed4 100644 --- a/remoting/getty/listener_test.go +++ b/remoting/getty/listener_test.go @@ -48,7 +48,9 @@ func TestRebuildCtx(t *testing.T) { span, ctx := opentracing.StartSpanFromContext(ctx, "Test-Client") - injectTraceCtx(span, inv) + err := injectTraceCtx(span, inv) + assert.NoError(t, err) + // rebuild the context success inv = invocation.NewRPCInvocation("MethodName", []interface{}{"OK", "Hello"}, attach) ctx = rebuildCtx(inv) @@ -71,13 +73,3 @@ func rebuildCtx(inv *invocation.RPCInvocation) context.Context { } return ctx } - -func filterContext(attachments map[string]interface{}) map[string]string { - var traceAttchment = make(map[string]string) - for k, v := range attachments { - if r, ok := v.(string); ok { - traceAttchment[k] = r - } - } - return traceAttchment -} diff --git a/remoting/getty/opentracing.go b/remoting/getty/opentracing.go new file mode 100644 index 0000000000..7db733cbe9 --- /dev/null +++ b/remoting/getty/opentracing.go @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package getty + +import ( + "github.com/opentracing/opentracing-go" +) +import ( + invocation_impl "github.com/apache/dubbo-go/protocol/invocation" +) + +func injectTraceCtx(currentSpan opentracing.Span, inv *invocation_impl.RPCInvocation) error { + // inject opentracing ctx + traceAttachments := filterContext(inv.Attachments()) + carrier := opentracing.TextMapCarrier(traceAttachments) + err := opentracing.GlobalTracer().Inject(currentSpan.Context(), opentracing.TextMap, carrier) + if err == nil { + fillTraceAttachments(inv.Attachments(), traceAttachments) + } + return err +} + +func extractTraceCtx(inv *invocation_impl.RPCInvocation) (opentracing.SpanContext, error) { + traceAttachments := filterContext(inv.Attachments()) + // actually, if user do not use any opentracing framework, the err will not be nil. + spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, + opentracing.TextMapCarrier(traceAttachments)) + return spanCtx, err +} + +func filterContext(attachments map[string]interface{}) map[string]string { + var traceAttchment = make(map[string]string) + for k, v := range attachments { + if r, ok := v.(string); ok { + traceAttchment[k] = r + } + } + return traceAttchment +} + +func fillTraceAttachments(attachments map[string]interface{}, traceAttachment map[string]string) { + for k, v := range traceAttachment { + attachments[k] = v + } +}