Skip to content

Commit

Permalink
fix: when the routing policy is not set, the setInvokers() method has…
Browse files Browse the repository at this point in the history
… a co process leak (#2104)

* 非路由策略下,不触发节点变更的数据变更协程消息

* 非路由策略下,不触发节点变更的数据变更协程消息

* 非路由策略下,不触发节点变更的数据变更协程消息

* 非路由策略下,不触发节点变更的数据变更协程消息

* 非路由策略下,不触发节点变更的数据变更协程消息

* 非路由策略下,不触发节点变更的数据变更协程消息

* 非路由策略下,不触发节点变更的数据变更协程消息

* 非路由策略下,不触发节点变更的数据变更协程消息

* 非路由策略下,不触发节点变更的数据变更协程消息

Co-authored-by: zhangping17 <zhangping17@xiaomi.com>
  • Loading branch information
caochengxiang and caochengxiang committed Nov 26, 2022
1 parent 81c6d19 commit e0b1487
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 232 deletions.
15 changes: 15 additions & 0 deletions cluster/router/chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,15 @@ type RouterChain struct {
notify chan struct{}
// Address cache
cache atomic.Value

routerStatus atomic.Int32
}

const (
NoRouter = iota
HasRouter
)

func (c *RouterChain) GetNotifyChan() chan struct{} {
return c.notify
}
Expand Down Expand Up @@ -113,6 +120,10 @@ func (c *RouterChain) AddRouters(routers []router.PriorityRouter) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.routers = newRouters
if c.routerStatus.Load() == int32(NoRouter) {
return
}

go func() {
c.notify <- struct{}{}
}()
Expand All @@ -124,6 +135,9 @@ func (c *RouterChain) SetInvokers(invokers []protocol.Invoker) {
c.mutex.Lock()
c.invokers = invokers
c.mutex.Unlock()
if c.routerStatus.Load() == int32(NoRouter) {
return
}

go func() {
c.notify <- struct{}{}
Expand Down Expand Up @@ -296,6 +310,7 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) {
last: time.Now(),
notify: make(chan struct{}),
}
chain.routerStatus.Store(int32(HasRouter))

routers := make([]router.PriorityRouter, 0, len(routerFactories))
for key, routerFactory := range routerFactories {
Expand Down
70 changes: 70 additions & 0 deletions cluster/router/chain/chain_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package chain

import (
"fmt"
"testing"
)

import (
"github.com/stretchr/testify/assert"
)

import (
_ "github.com/apache/dubbo-go/cluster/router"
_ "github.com/apache/dubbo-go/cluster/router/tag"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol"
)

var (
url, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
anyURL, _ = common.NewURL(fmt.Sprintf("condition://%s/com.foo.BarService", constant.ANYHOST_VALUE))
)

const (
test1234IP = "1.2.3.4"
test0000IP = "0.0.0.0"
port20000 = 20000

dubboForamt = "dubbo://%s:%d/com.foo.BarService"
anyUrlFormat = "condition://%s/com.foo.BarService"
applicationKey = "test-condition"
applicationField = "application"
forceField = "force"
forceValue = "true"
)

func TestNewRouterChain(t *testing.T) {
chain, _ := NewRouterChain(getRouteUrl(applicationKey))
assert.Equal(t, chain.routerStatus.Load(), int32(HasRouter))
var invokers []protocol.Invoker
dubboURL, _ := common.NewURL(fmt.Sprintf(dubboForamt, test1234IP, port20000))
invokers = append(invokers, protocol.NewBaseInvoker(dubboURL))
chain.SetInvokers(invokers)
}

func getRouteUrl(applicationKey string) *common.URL {
url, _ := common.NewURL(fmt.Sprintf(anyUrlFormat, test0000IP))
url.AddParam(applicationField, applicationKey)
url.AddParam(forceField, forceValue)
return url
}
12 changes: 6 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ require (
github.com/apache/dubbo-getty v1.4.7
github.com/apache/dubbo-go-hessian2 v1.9.4
github.com/creasty/defaults v1.5.1
github.com/dubbogo/go-zookeeper v1.0.4-0.20211212162352-f9d2183d89d5
github.com/dubbogo/gost v1.13.1
github.com/dubbogo/go-zookeeper v1.0.3
github.com/dubbogo/gost v1.11.21-0.20220503144918-9e5ae44480af
github.com/emicklei/go-restful/v3 v3.4.0
github.com/fsnotify/fsnotify v1.5.1
github.com/go-co-op/gocron v0.1.1
github.com/go-resty/resty/v2 v2.3.0
github.com/golang/mock v1.6.0
github.com/golang/mock v1.4.4
github.com/golang/protobuf v1.5.2
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645
github.com/hashicorp/vault/sdk v0.5.0
Expand All @@ -27,15 +27,15 @@ require (
github.com/nacos-group/nacos-sdk-go v1.0.8
github.com/opentracing/opentracing-go v1.2.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.12.2
github.com/prometheus/client_golang v1.11.0
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b
github.com/stretchr/testify v1.7.0
github.com/zouyx/agollo/v3 v3.4.5
go.etcd.io/etcd/api/v3 v3.5.0
go.etcd.io/etcd/client/v3 v3.5.0
go.uber.org/atomic v1.9.0
go.uber.org/zap v1.21.0
google.golang.org/grpc v1.48.0
go.uber.org/zap v1.17.0
google.golang.org/grpc v1.41.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.16.9
k8s.io/apimachinery v0.16.9
Expand Down
Loading

0 comments on commit e0b1487

Please sign in to comment.