Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: sort out triple logic and fix comments #2454

Merged
Prev Previous commit
Next Next commit
add dubbo3 client stub code compatibility and default timeout for una…
…ry call
  • Loading branch information
DMwangnima committed Nov 1, 2023
commit 41d39e605197343090bc377593558e6e41f2eec9
1 change: 1 addition & 0 deletions client/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func (opts *ClientOptions) refer(srv common.RPCService, info *ClientInfo) {
common.WithParams(opts.getURLMap()),
common.WithParamsValue(constant.BeanNameKey, opts.id),
common.WithParamsValue(constant.MetadataTypeKey, opts.metaDataType),
common.WithAttribute(constant.ClientInfoKey, info),
)

if ref.ForceTag {
Expand Down
1 change: 1 addition & 0 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ const (
CallHTTP = "http"
CallHTTP2 = "http2"
ServiceInfoKey = "service-info"
ClientInfoKey = "client-info"
)

const (
Expand Down
16 changes: 4 additions & 12 deletions protocol/triple/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,14 @@ func newClientManager(url *common.URL) (*clientManager, error) {
panic(fmt.Sprintf("Unsupported serialization: %s", serialization))
}

// todo:// process timeout
// consumer config client connectTimeout
//connectTimeout := config.GetConsumerConfig().ConnectTimeout
// set timeout
timeout := url.GetParamDuration(constant.TimeoutKey, "")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand it right, this url contains the complete configuration of a specific service? I think we need to call something like below to get the method level timeout configuration:

timeout := url.GetMethodParamDuration(constant.TimeoutKey, "")

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the default timeout value for (constant.TimeoutKey, "")?

Copy link
Contributor

@chickenlj chickenlj Nov 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realized something. Are we in the middle of an RPC invoke here? If not, we might not be able to get the method name needed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The specific method level timeout (optional) passed in with ctx in TripleInvoker.invoke(ctx), then to clientManager.callUnary/callClientStream(). If timeout presents in ctx, then it will be used as the timeout value; If not, the default value calculated here by timeout := url.GetParamDuration(constant.TimeoutKey, "") will be used.

Is the above process right?

Copy link
Contributor Author

@DMwangnima DMwangnima Nov 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The specific method level timeout (optional) passed in with ctx in TripleInvoker.invoke(ctx), then to clientManager.callUnary/callClientStream(). If timeout presents in ctx, then it will be used as the timeout value; If not, the default value calculated here by timeout := url.GetParamDuration(constant.TimeoutKey, "") will be used.

Is the above process right?

Yes, that is. The ctx used by users would have highest priority. It is more ideal for uses to use ctx to specify timeout for method level since they can use cancel function directly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realized something. Are we in the middle of an RPC invoke here? If not, we might not be able to get the method name needed.

We are in the process of Refer, i.e. initialize client and some general configurations.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The specific method level timeout (optional) passed in with ctx in TripleInvoker.invoke(ctx), then to clientManager.callUnary/callClientStream(). If timeout presents in ctx, then it will be used as the timeout value; If not, the default value calculated here by timeout := url.GetParamDuration(constant.TimeoutKey, "") will be used.
Is the above process right?

Yes, that is. The ctx used by users would have highest priority. It is more ideal for uses to use ctx to specify timeout for method level since they can use cancel function directly.

I see, this sounds reasonable to me.

One more thing, we need to check the method level timeout if ctx is not set, then put method timeout into ctx.

triClientOpts = append(triClientOpts, tri.WithTimeout(timeout))

// dialOpts = append(dialOpts,
//
// grpc.WithBlock(),
// // todo config network timeout
// // todo config tracing
// grpc.WithTimeout(time.Second*3),
// grpc.WithUnaryInterceptor(otgrpc.OpenTracingClientInterceptor(tracer, otgrpc.LogPayloads())),
// grpc.WithStreamInterceptor(otgrpc.OpenTracingStreamClientInterceptor(tracer, otgrpc.LogPayloads())),
Expand Down Expand Up @@ -180,14 +180,6 @@ func newClientManager(url *common.URL) (*clientManager, error) {
// tlsFlag = true
//}

// todo(DMwangnima): this code fragment would be used to be compatible with old triple client
//key := url.GetParam(constant.InterfaceKey, "")
//conRefs := config.GetConsumerConfig().References
//ref, ok := conRefs[key]
//if !ok {
// panic("no reference")
//}
// todo: set timeout
var transport http.RoundTripper
callType := url.GetParam(constant.CallHTTPTypeKey, constant.CallHTTP2)
switch callType {
Expand Down
271 changes: 271 additions & 0 deletions protocol/triple/dubbo3_invoker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package triple

import (
"context"
"reflect"
"strconv"
"strings"
"sync"
"time"
)

import (
"github.com/dubbogo/gost/log/logger"

"github.com/dubbogo/grpc-go/metadata"

tripleConstant "github.com/dubbogo/triple/pkg/common/constant"
triConfig "github.com/dubbogo/triple/pkg/config"
"github.com/dubbogo/triple/pkg/triple"

"github.com/dustin/go-humanize"
)

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/protocol"
invocation_impl "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
)

// same as dubbo_invoker.go attachmentKey
var attachmentKey = []string{
constant.InterfaceKey, constant.GroupKey, constant.TokenKey, constant.TimeoutKey,
constant.VersionKey, tripleConstant.TripleServiceGroup, tripleConstant.TripleServiceVersion,
}

// DubboInvoker is implement of protocol.Invoker, a dubboInvoker refer to one service and ip.
type DubboInvoker struct {
protocol.BaseInvoker
// the net layer client, it is focus on network communication.
client *triple.TripleClient
// quitOnce is used to make sure DubboInvoker is only destroyed once
quitOnce sync.Once
// timeout for service(interface) level.
timeout time.Duration
// clientGuard is the client lock of dubbo invoker
clientGuard *sync.RWMutex
}

// NewDubbo3Invoker constructor
func NewDubbo3Invoker(url *common.URL) (*DubboInvoker, error) {
rt := config.GetConsumerConfig().RequestTimeout

timeout := url.GetParamDuration(constant.TimeoutKey, rt)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should also consider method timeout.

// for triple pb serialization. The bean name from provider is the provider reference key,
// which can't locate the target consumer stub, so we use interface key..
interfaceKey := url.GetParam(constant.InterfaceKey, "")
consumerService := config.GetConsumerServiceByInterfaceName(interfaceKey)

dubboSerializerType := url.GetParam(constant.SerializationKey, constant.ProtobufSerialization)
triCodecType := tripleConstant.CodecType(dubboSerializerType)
// new triple client
opts := []triConfig.OptionFunction{
triConfig.WithClientTimeout(timeout),
triConfig.WithCodecType(triCodecType),
triConfig.WithLocation(url.Location),
triConfig.WithHeaderAppVersion(url.GetParam(constant.AppVersionKey, "")),
triConfig.WithHeaderGroup(url.GetParam(constant.GroupKey, "")),
triConfig.WithLogger(logger.GetLogger()),
}
maxCallRecvMsgSize := constant.DefaultMaxCallRecvMsgSize
if maxCall, err := humanize.ParseBytes(url.GetParam(constant.MaxCallRecvMsgSize, "")); err == nil && maxCall != 0 {
maxCallRecvMsgSize = int(maxCall)
}
maxCallSendMsgSize := constant.DefaultMaxCallSendMsgSize
if maxCall, err := humanize.ParseBytes(url.GetParam(constant.MaxCallSendMsgSize, "")); err == nil && maxCall != 0 {
maxCallSendMsgSize = int(maxCall)
}
opts = append(opts, triConfig.WithGRPCMaxCallRecvMessageSize(maxCallRecvMsgSize))
opts = append(opts, triConfig.WithGRPCMaxCallSendMessageSize(maxCallSendMsgSize))

tracingKey := url.GetParam(constant.TracingConfigKey, "")
if tracingKey != "" {
tracingConfig := config.GetTracingConfig(tracingKey)
if tracingConfig != nil {
if tracingConfig.Name == "jaeger" {
if tracingConfig.ServiceName == "" {
tracingConfig.ServiceName = config.GetApplicationConfig().Name
}
opts = append(opts, triConfig.WithJaegerConfig(
tracingConfig.Address,
tracingConfig.ServiceName,
*tracingConfig.UseAgent,
))
} else {
logger.Warnf("unsupported tracing name %s, now triple only support jaeger", tracingConfig.Name)
}
}
}

triOption := triConfig.NewTripleOption(opts...)
tlsConfig := config.GetRootConfig().TLSConfig
if tlsConfig != nil {
triOption.TLSCertFile = tlsConfig.TLSCertFile
triOption.TLSKeyFile = tlsConfig.TLSKeyFile
triOption.CACertFile = tlsConfig.CACertFile
triOption.TLSServerName = tlsConfig.TLSServerName
logger.Infof("Triple Client initialized the TLSConfig configuration")
}
client, err := triple.NewTripleClient(consumerService, triOption)

if err != nil {
return nil, err
}

return &DubboInvoker{
BaseInvoker: *protocol.NewBaseInvoker(url),
client: client,
timeout: timeout,
clientGuard: &sync.RWMutex{},
}, nil
}

func (di *DubboInvoker) setClient(client *triple.TripleClient) {
di.clientGuard.Lock()
defer di.clientGuard.Unlock()

di.client = client
}

func (di *DubboInvoker) getClient() *triple.TripleClient {
di.clientGuard.RLock()
defer di.clientGuard.RUnlock()

return di.client
}

// Invoke call remoting.
func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
var (
result protocol.RPCResult
)

if !di.BaseInvoker.IsAvailable() {
// Generally, the case will not happen, because the invoker has been removed
// from the invoker list before destroy,so no new request will enter the destroyed invoker
logger.Warnf("this dubboInvoker is destroyed")
result.Err = protocol.ErrDestroyedInvoker
return &result
}

di.clientGuard.RLock()
defer di.clientGuard.RUnlock()

if di.client == nil {
result.Err = protocol.ErrClientClosed
return &result
}

if !di.BaseInvoker.IsAvailable() {
// Generally, the case will not happen, because the invoker has been removed
// from the invoker list before destroy,so no new request will enter the destroyed invoker
logger.Warnf("this grpcInvoker is destroying")
result.Err = protocol.ErrDestroyedInvoker
return &result
}

for _, k := range attachmentKey {
var paramKey string
switch k {
case tripleConstant.TripleServiceGroup:
paramKey = constant.GroupKey
case tripleConstant.TripleServiceVersion:
paramKey = constant.VersionKey
default:
paramKey = k
}

if v := di.GetURL().GetParam(paramKey, ""); len(v) > 0 {
invocation.SetAttachment(k, v)
}
}

// append interface id to ctx
gRPCMD := make(metadata.MD, 0)
// triple will convert attachment value to []string
for k, v := range invocation.Attachments() {
if str, ok := v.(string); ok {
gRPCMD.Set(k, str)
continue
}
if str, ok := v.([]string); ok {
gRPCMD.Set(k, str...)
continue
}
logger.Warnf("[Triple Protocol]Triple attachment value with key = %s is invalid, which should be string or []string", k)
}
ctx = metadata.NewOutgoingContext(ctx, gRPCMD)
ctx = context.WithValue(ctx, tripleConstant.InterfaceKey, di.BaseInvoker.GetURL().GetParam(constant.InterfaceKey, ""))
in := make([]reflect.Value, 0, 16)
in = append(in, reflect.ValueOf(ctx))

if len(invocation.ParameterValues()) > 0 {
in = append(in, invocation.ParameterValues()...)
}

methodName := invocation.MethodName()
triAttachmentWithErr := di.client.Invoke(methodName, in, invocation.Reply())
result.Err = triAttachmentWithErr.GetError()
result.Attrs = make(map[string]interface{})
for k, v := range triAttachmentWithErr.GetAttachments() {
result.Attrs[k] = v
}
result.Rest = invocation.Reply()
return &result
}

// get timeout including methodConfig
func (di *DubboInvoker) getTimeout(invocation *invocation_impl.RPCInvocation) time.Duration {
timeout := di.GetURL().GetParam(strings.Join([]string{constant.MethodKeys, invocation.MethodName(), constant.TimeoutKey}, "."), "")
if len(timeout) != 0 {
if t, err := time.ParseDuration(timeout); err == nil {
// config timeout into attachment
invocation.SetAttachment(constant.TimeoutKey, strconv.Itoa(int(t.Milliseconds())))
return t
}
}
// set timeout into invocation at method level
invocation.SetAttachment(constant.TimeoutKey, strconv.Itoa(int(di.timeout.Milliseconds())))
return di.timeout
}

// IsAvailable check if invoker is available, now it is useless
func (di *DubboInvoker) IsAvailable() bool {
client := di.getClient()
if client != nil {
// FIXME here can't check if tcp server is started now!!!
return client.IsAvailable()
}
return false
}

// Destroy destroy dubbo3 client invoker.
func (di *DubboInvoker) Destroy() {
di.quitOnce.Do(func() {
di.BaseInvoker.Destroy()
client := di.getClient()
if client != nil {
di.setClient(nil)
client.Close()
}
})
}
12 changes: 11 additions & 1 deletion protocol/triple/triple.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,17 @@ func (tp *TripleProtocol) openServer(invoker protocol.Invoker, info *server.Serv

// Refer a remote triple service
func (tp *TripleProtocol) Refer(url *common.URL) protocol.Invoker {
invoker, err := NewTripleInvoker(url)
var invoker protocol.Invoker
var err error
// for now, we do not need to use this info
_, ok := url.Attributes[constant.ClientInfoKey]
if ok {
// stub code generated by new protoc-gen-go-triple
invoker, err = NewTripleInvoker(url)
} else {
// stub code generated by old protoc-gen-go-triple
invoker, err = NewDubbo3Invoker(url)
}
if err != nil {
logger.Warnf("can't dial the server: %s", url.Key())
return nil
Expand Down
Loading