Skip to content

Commit

Permalink
Transfer context for dubbo protocol by attachment
Browse files Browse the repository at this point in the history
  • Loading branch information
flycash committed Jan 30, 2020
1 parent df79a5c commit a283aeb
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 15 deletions.
2 changes: 1 addition & 1 deletion common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ const (

const (
DUBBOGO_CTX_KEY = "dubbogo-ctx"
CONTEXT_KEY = "context"
CONTEXT_KEY = "context"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion protocol/dubbo/dubbo_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (di *DubboInvoker) appendCtx(ctx context.Context, inv *invocation_impl.RPCI
// inject opentracing ctx
currentSpan := opentracing.SpanFromContext(ctx)
if currentSpan != nil {
carrier:= opentracing.TextMapCarrier(inv.Attachments())
carrier := opentracing.TextMapCarrier(inv.Attachments())
err := opentracing.GlobalTracer().Inject(currentSpan.Context(), opentracing.TextMap, carrier)
if err != nil {
logger.Errorf("Could not inject the span context into attachments: %v", err)
Expand Down
28 changes: 15 additions & 13 deletions protocol/dubbo/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@ import (
"sync"
"sync/atomic"
"time"

"github.com/opentracing/opentracing-go"
)

import (
"github.com/apache/dubbo-go-hessian2"
"github.com/dubbogo/getty"
"github.com/opentracing/opentracing-go"
perrors "github.com/pkg/errors"
)

Expand Down Expand Up @@ -65,9 +64,9 @@ func (s *rpcSession) GetReqNum() int32 {
return atomic.LoadInt32(&s.reqNum)
}

////////////////////////////////////////////
// //////////////////////////////////////////
// RpcClientHandler
////////////////////////////////////////////
// //////////////////////////////////////////

// RpcClientHandler ...
type RpcClientHandler struct {
Expand Down Expand Up @@ -159,9 +158,9 @@ func (h *RpcClientHandler) OnCron(session getty.Session) {
h.conn.pool.rpcClient.heartbeat(session)
}

////////////////////////////////////////////
// //////////////////////////////////////////
// RpcServerHandler
////////////////////////////////////////////
// //////////////////////////////////////////

// RpcServerHandler ...
type RpcServerHandler struct {
Expand Down Expand Up @@ -287,7 +286,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
args := p.Body.(map[string]interface{})["args"].([]interface{})
inv := invocation.NewRPCInvocation(p.Service.Method, args, attachments)

ctx := h.rebuildCtx(inv)
ctx := rebuildCtx(inv)

result := invoker.Invoke(ctx, inv)
if err := result.Error(); err != nil {
Expand Down Expand Up @@ -332,17 +331,20 @@ func (h *RpcServerHandler) OnCron(session getty.Session) {
}
}

func (h *RpcServerHandler) rebuildCtx(inv *invocation.RPCInvocation) interface{} {
// rebuild the context by attachment.
// Once we decided to transfer more context's key-value, we should change this.
// now we only support rebuild the tracing context
func rebuildCtx(inv *invocation.RPCInvocation) context.Context {
ctx := context.Background()

// actually, if user do not use any opentracing framework, it will always be error.
spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap,
opentracing.TextMapCarrier(inv.Attachments()))
if err != nil {
logger.Errorf("Could not extract the span context: %v", err)
}

if spanCtx != nil {
ctx =
if err == nil {
ctx = context.WithValue(ctx, constant.TRACING_REMOTE_SPAN_CTX, spanCtx)
}
return ctx
}

func reply(session getty.Session, req *DubboPackage, tp hessian.PackageType) {
Expand Down
58 changes: 58 additions & 0 deletions protocol/dubbo/listener_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dubbo

import (
"testing"
)

import (
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/mocktracer"
"github.com/stretchr/testify/assert"
)

import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol/invocation"
)

// test rebuild the ctx
func TestRebuildCtx(t *testing.T) {
opentracing.SetGlobalTracer(mocktracer.New())
attach := make(map[string]string, 10)
attach[constant.VERSION_KEY] = "1.0"
attach[constant.GROUP_KEY] = "MyGroup"
inv := invocation.NewRPCInvocation("MethodName", []interface{}{"OK", "Hello"}, attach)

// attachment doesn't contains any tracing key-value pair,
ctx := rebuildCtx(inv)
assert.NotNil(t, ctx)
assert.Nil(t, ctx.Value(constant.TRACING_REMOTE_SPAN_CTX))

span, ctx := opentracing.StartSpanFromContext(ctx, "Test-Client")

opentracing.GlobalTracer().Inject(span.Context(), opentracing.TextMap,
opentracing.TextMapCarrier(inv.Attachments()))
// rebuild the context success
inv = invocation.NewRPCInvocation("MethodName", []interface{}{"OK", "Hello"}, attach)
ctx = rebuildCtx(inv)
span.Finish()
assert.NotNil(t, ctx)
assert.NotNil(t, ctx.Value(constant.TRACING_REMOTE_SPAN_CTX))
}

0 comments on commit a283aeb

Please sign in to comment.