Skip to content

Commit

Permalink
Fix:Resolve service disconnection and configuration invalidations (#2717
Browse files Browse the repository at this point in the history
)

* Fix:Resolve service disconnection and configuration invalidations

* Fix:Resolve config_center part of the configuration does not work

* fix golang-lint

* fix golang-lint

* remove script router
  • Loading branch information
FinalT authored Jul 26, 2024
1 parent fe5afaa commit fcf3987
Show file tree
Hide file tree
Showing 10 changed files with 177 additions and 11 deletions.
15 changes: 15 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
)

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/protocol"
invocation_impl "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
Expand Down Expand Up @@ -102,6 +103,20 @@ func (cli *Client) DialWithInfo(interfaceName string, info *ClientInfo, opts ...
return cli.dial(interfaceName, info, opts...)
}

func (cli *Client) DialWithDefinition(interfaceName string, definition *ClientDefinition, opts ...ReferenceOption) (*Connection, error) {
// TODO(finalt) Temporarily solve the config_center configuration does not work
refName := common.GetReference(definition.Svc)
if refConfig, ok := cli.cliOpts.Consumer.References[refName]; ok {
ref := cli.cliOpts.overallReference.Clone()
for _, opt := range refConfig.GetOptions() {
opt(ref)
}
opts = append(opts, setReference(ref))
}

return cli.dial(interfaceName, definition.Info, opts...)
}

func (cli *Client) dial(interfaceName string, info *ClientInfo, opts ...ReferenceOption) (*Connection, error) {
newRefOpts := defaultReferenceOptions()
finalOpts := []ReferenceOption{
Expand Down
13 changes: 10 additions & 3 deletions cluster/cluster/failover/cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ package failover

import (
"context"
"dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol"
"fmt"
"strconv"
)

import (
Expand All @@ -35,6 +35,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol"
)

type failoverClusterInvoker struct {
Expand All @@ -61,7 +62,7 @@ func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation pr
}

methodName := invocation.ActualMethodName()
retries := getRetries(invokers, methodName)
retries := getRetries(invokers, methodName, invocation)
loadBalance := base.GetLoadBalance(invokers[0], methodName)

for i := 0; i <= retries; i++ {
Expand Down Expand Up @@ -113,7 +114,13 @@ func isBizError(err error) bool {
return triple_protocol.IsWireError(err) && triple_protocol.CodeOf(err) == triple_protocol.CodeBizError
}

func getRetries(invokers []protocol.Invoker, methodName string) int {
func getRetries(invokers []protocol.Invoker, methodName string, invocation protocol.Invocation) int {
// Todo(finalt) Temporarily solve the problem that the retries is not valid
if retries, ok := invocation.GetAttachment(constant.RetriesKey); ok {
if rInt, err := strconv.Atoi(retries); err == nil {
return rInt
}
}
if len(invokers) <= 0 {
return constant.DefaultRetriesInt
}
Expand Down
5 changes: 2 additions & 3 deletions cluster/router/script/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package script

import (
"dubbo.apache.org/dubbo-go/v3/cluster/router"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
)

func init() {
Expand All @@ -29,7 +27,8 @@ func init() {
and cause warning if config center is empty.
User can import this package and config config center to use Script router.
*/
extension.SetRouterFactory(constant.ScriptRouterFactoryKey, NewScriptRouterFactory)
// TODO(finalt) Temporarily removed until fixed (https://github.com/apache/dubbo-go/pull/2716)
//extension.SetRouterFactory(constant.ScriptRouterFactoryKey, NewScriptRouterFactory)
}

// ScriptRouteFactory router factory
Expand Down
54 changes: 54 additions & 0 deletions compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,13 +644,67 @@ func compatGlobalConsumerConfig(c *config.ConsumerConfig) *global.ConsumerConfig
ProxyFactory: c.ProxyFactory,
Check: c.Check,
AdaptiveService: c.AdaptiveService,
References: compatGlobalReferences(c.References),
TracingKey: c.TracingKey,
FilterConf: c.FilterConf,
MaxWaitTimeForServiceDiscovery: c.MaxWaitTimeForServiceDiscovery,
MeshEnabled: c.MeshEnabled,
}
}

func compatGlobalReferences(c map[string]*config.ReferenceConfig) map[string]*global.ReferenceConfig {
refs := make(map[string]*global.ReferenceConfig, len(c))
for name, ref := range c {
refs[name] = &global.ReferenceConfig{
InterfaceName: ref.InterfaceName,
Check: ref.Check,
URL: ref.URL,
Filter: ref.Filter,
Protocol: ref.Protocol,
RegistryIDs: ref.RegistryIDs,
Cluster: ref.Cluster,
Loadbalance: ref.Loadbalance,
Retries: ref.Retries,
Group: ref.Group,
Version: ref.Version,
Serialization: ref.Serialization,
ProvidedBy: ref.ProvidedBy,
Methods: compatGlobalMethod(ref.Methods),
Async: ref.Async,
Params: ref.Params,
Generic: ref.Generic,
Sticky: ref.Sticky,
RequestTimeout: ref.RequestTimeout,
ForceTag: ref.ForceTag,
TracingKey: ref.TracingKey,
MeshProviderPort: ref.MeshProviderPort,
}
}
return refs
}

func compatGlobalMethod(m []*config.MethodConfig) []*global.MethodConfig {
methods := make([]*global.MethodConfig, 0, len(m))
for _, method := range m {
methods = append(methods, &global.MethodConfig{
InterfaceId: method.InterfaceId,
InterfaceName: method.InterfaceName,
Name: method.Name,
Retries: method.Retries,
LoadBalance: method.LoadBalance,
Weight: method.Weight,
TpsLimitInterval: method.TpsLimitInterval,
TpsLimitRate: method.TpsLimitRate,
TpsLimitStrategy: method.TpsLimitStrategy,
ExecuteLimit: method.ExecuteLimit,
ExecuteLimitRejectedHandler: method.ExecuteLimitRejectedHandler,
Sticky: method.Sticky,
RequestTimeout: method.RequestTimeout,
})
}
return methods
}

func compatGlobalMetricConfig(c *config.MetricsConfig) *global.MetricsConfig {
if c == nil {
return nil
Expand Down
2 changes: 1 addition & 1 deletion dubbo.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (ins *Instance) loadConsumer() error {
conLock.RLock()
defer conLock.RUnlock()
for intfName, definition := range consumerServices {
conn, dialErr := cli.DialWithInfo(intfName, definition.Info)
conn, dialErr := cli.DialWithDefinition(intfName, definition)
if dialErr != nil {
return dialErr
}
Expand Down
68 changes: 68 additions & 0 deletions global/reference_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,74 @@ func DefaultReferenceConfig() *ReferenceConfig {
}
}

func (c *ReferenceConfig) GetOptions() []ReferenceOption {
var refOpts []ReferenceOption
if c.InterfaceName != "" {
refOpts = append(refOpts, WithReference_InterfaceName(c.InterfaceName))
}
if c.Check != nil {
refOpts = append(refOpts, WithReference_Check(*c.Check))
}
if c.URL != "" {
refOpts = append(refOpts, WithReference_URL(c.URL))
}
if c.Filter != "" {
refOpts = append(refOpts, WithReference_Filter(c.Filter))
}
if c.Protocol != "" {
refOpts = append(refOpts, WithReference_Protocol(c.Protocol))
}
if c.RegistryIDs != nil && len(c.RegistryIDs) > 0 {
refOpts = append(refOpts, WithReference_RegistryIDs(c.RegistryIDs))
}
if c.Cluster != "" {
refOpts = append(refOpts, WithReference_Cluster(c.Cluster))
}
if c.Loadbalance != "" {
refOpts = append(refOpts, WithReference_LoadBalance(c.Loadbalance))
}
if c.Retries != "" {
if rInt, err := strconv.Atoi(c.Retries); err == nil {
refOpts = append(refOpts, WithReference_Retries(rInt))
}
}
if c.Group != "" {
refOpts = append(refOpts, WithReference_Group(c.Group))
}
if c.Version != "" {
refOpts = append(refOpts, WithReference_Version(c.Version))
}
if c.Serialization != "" {
refOpts = append(refOpts, WithReference_Serialization(c.Serialization))
}
if c.ProvidedBy != "" {
refOpts = append(refOpts, WithReference_ProviderBy(c.ProvidedBy))
}
if c.Params != nil && len(c.Params) > 0 {
newParams := make(map[string]string, len(c.Params))
for k, v := range c.Params {
newParams[k] = v
}
refOpts = append(refOpts, WithReference_Params(newParams))
}
if c.Generic != "" {
refOpts = append(refOpts, WithReference_Generic(c.Generic))
}
if c.Sticky {
refOpts = append(refOpts, WithReference_Sticky(c.Sticky))
}
if c.RequestTimeout != "" {
refOpts = append(refOpts, WithReference_RequestTimeout(c.RequestTimeout))
}
if c.TracingKey != "" {
refOpts = append(refOpts, WithReference_TracingKey(c.TracingKey))
}
if c.MeshProviderPort != 0 {
refOpts = append(refOpts, WithReference_MeshProviderPort(c.MeshProviderPort))
}
return refOpts
}

// Clone a new ReferenceConfig
func (c *ReferenceConfig) Clone() *ReferenceConfig {
if c == nil {
Expand Down
1 change: 0 additions & 1 deletion protocol/triple/internal/server/cmd_server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ func main() {
protocol.WithTriple(),
protocol.WithPort(20000),
),
server.WithServerVersion("1.0.0"),
)

if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions protocol/triple/triple_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ func (ti *TripleInvoker) Invoke(ctx context.Context, invocation protocol.Invocat
}

func mergeAttachmentToOutgoing(ctx context.Context, inv protocol.Invocation) (context.Context, error) {
// Todo(finalt) Temporarily solve the problem that the timeout time is not valid
if timeout, ok := inv.GetAttachment(constant.TimeoutKey); ok {
ctx = context.WithValue(ctx, "dubbo.timeout.key", timeout)

Check failure on line 148 in protocol/triple/triple_invoker.go

View workflow job for this annotation

GitHub Actions / lint (1.20)

SA1029: should not use built-in type string as key for value; define your own type to avoid collisions (staticcheck)
}
for key, valRaw := range inv.Attachments() {
if str, ok := valRaw.(string); ok {
ctx = tri.AppendToOutgoingContext(ctx, key, str)
Expand Down
19 changes: 17 additions & 2 deletions protocol/triple/triple_protocol/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,12 +277,27 @@ func parseRequestURL(rawURL string) (*url.URL, *Error) {
func applyDefaultTimeout(ctx context.Context, timeout time.Duration) (context.Context, bool, context.CancelFunc) {
var cancel context.CancelFunc
var applyFlag bool

_, ok := ctx.Deadline()

// Todo(finalt) Temporarily solve the problem that the timeout time is not valid
if !ok {
timeoutVal := ctx.Value("dubbo.timeout.key")
if timeoutVal != nil {
if s, exist := timeoutVal.(string); exist && s != "" {
if newTimeout, err := time.ParseDuration(s); err == nil {
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(newTimeout))
applyFlag = true
return ctx, applyFlag, cancel
}
}
}
}

if !ok && timeout != 0 {
ctx, cancel = context.WithTimeout(ctx, timeout)
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(timeout))
applyFlag = true
}

return ctx, applyFlag, cancel
}

Expand Down
7 changes: 6 additions & 1 deletion protocol/triple/triple_protocol/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package triple_protocol

import (
"context"
"errors"
"fmt"
"net/http"
)
Expand Down Expand Up @@ -363,7 +364,11 @@ func (h *Handler) ServeHTTP(responseWriter http.ResponseWriter, request *http.Re
svcGroup := request.Header.Get(tripleServiceGroup)
svcVersion := request.Header.Get(tripleServiceVersion)
// todo(DMwangnima): inspect ok
implementation := h.implementations[getIdentifier(svcGroup, svcVersion)]
implementation, ok := h.implementations[getIdentifier(svcGroup, svcVersion)]
if !ok {
_ = connCloser.Close(errors.New("no implementation for " + svcVersion))
return
}
_ = connCloser.Close(implementation(ctx, connCloser))
}

Expand Down

0 comments on commit fcf3987

Please sign in to comment.