Skip to content

Commit

Permalink
Merge pull request #1010 from wenxuwan/fix_zk_too_many_tcp_conn
Browse files Browse the repository at this point in the history
Fix:zk too many tcp conn
  • Loading branch information
AlexStocks authored Mar 15, 2021
2 parents bd25f90 + 73b0942 commit 67792fa
Show file tree
Hide file tree
Showing 24 changed files with 547 additions and 937 deletions.
45 changes: 21 additions & 24 deletions cluster/router/chain/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
)

import (
zk "github.com/dubbogo/go-zookeeper/zk"
gxzookeeper "github.com/dubbogo/gost/database/kv/zk"
"github.com/stretchr/testify/assert"
)

Expand All @@ -38,7 +40,6 @@ import (
_ "github.com/apache/dubbo-go/config_center/zookeeper"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
"github.com/apache/dubbo-go/remoting/zookeeper"
)

const (
Expand All @@ -53,16 +54,19 @@ const (
consumerFormat = "consumer://%s/com.foo.BarService"
dubboForamt = "dubbo://%s:%d/com.foo.BarService"
anyUrlFormat = "condition://%s/com.foo.BarService"
zk = "zookeeper"
zkName = "zookeeper"
applicationKey = "test-condition"
applicationField = "application"
forceField = "force"
forceValue = "true"
)

var zkCluster *zk.TestCluster

func TestNewRouterChain(t *testing.T) {
ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second)
ts, z, _, err := gxzookeeper.NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
zkCluster = ts
err = z.Create(path)
assert.NoError(t, err)
testyml := `scope: application
Expand All @@ -77,12 +81,12 @@ conditions:
_, err = z.Conn.Set(path, []byte(testyml), 0)
assert.NoError(t, err)
defer func() {
_ = ts.Stop()
z.Delete(path)
z.Close()
}()

zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, localIP, ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(zk).GetDynamicConfiguration(zkUrl)
configuration, err := extension.GetConfigCenterFactory(zkName).GetDynamicConfiguration(zkUrl)
config.GetEnvInstance().SetDynamicConfiguration(configuration)

assert.Nil(t, err)
Expand Down Expand Up @@ -114,7 +118,7 @@ func TestNewRouterChainURLNil(t *testing.T) {
}

func TestRouterChainAddRouters(t *testing.T) {
ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second)
_, z, _, err := gxzookeeper.NewMockZookeeperClient("test", 15*time.Second, gxzookeeper.WithTestCluster(zkCluster))
assert.NoError(t, err)
err = z.Create(path)
assert.NoError(t, err)
Expand All @@ -131,13 +135,12 @@ conditions:
_, err = z.Conn.Set(path, []byte(testyml), 0)
assert.NoError(t, err)
defer func() {
_ = ts.Stop()
assert.NoError(t, err)
z.Delete(path)
z.Close()
}()

zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, localIP, ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(zk).GetDynamicConfiguration(zkUrl)
zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, localIP, zkCluster.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(zkName).GetDynamicConfiguration(zkUrl)
assert.NoError(t, err)
config.GetEnvInstance().SetDynamicConfiguration(configuration)

Expand All @@ -164,15 +167,10 @@ conditions:
}

func TestRouterChainRoute(t *testing.T) {
ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second)
defer func() {
err = ts.Stop()
assert.NoError(t, err)
z.Close()
}()

ts, _, _, err := gxzookeeper.NewMockZookeeperClient("test", 15*time.Second, gxzookeeper.WithTestCluster(zkCluster))
assert.Nil(t, err)
zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, localIP, ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(zk).GetDynamicConfiguration(zkUrl)
configuration, err := extension.GetConfigCenterFactory(zkName).GetDynamicConfiguration(zkUrl)
assert.NoError(t, err)
config.GetEnvInstance().SetDynamicConfiguration(configuration)

Expand All @@ -197,7 +195,7 @@ func TestRouterChainRoute(t *testing.T) {
}

func TestRouterChainRouteAppRouter(t *testing.T) {
ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second)
ts, z, _, err := gxzookeeper.NewMockZookeeperClient("test", 15*time.Second, gxzookeeper.WithTestCluster(zkCluster))
assert.NoError(t, err)
err = z.Create(path)
assert.NoError(t, err)
Expand All @@ -214,13 +212,12 @@ conditions:
_, err = z.Conn.Set(path, []byte(testyml), 0)
assert.NoError(t, err)
defer func() {
_ = ts.Stop()
assert.NoError(t, err)
z.Delete(path)
z.Close()
}()

zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, localIP, ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(zk).GetDynamicConfiguration(zkUrl)
configuration, err := extension.GetConfigCenterFactory(zkName).GetDynamicConfiguration(zkUrl)
assert.NoError(t, err)
config.GetEnvInstance().SetDynamicConfiguration(configuration)

Expand All @@ -242,7 +239,7 @@ conditions:
}

func TestRouterChainRouteNoRoute(t *testing.T) {
ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second)
ts, z, _, err := gxzookeeper.NewMockZookeeperClient("test", 15*time.Second, gxzookeeper.WithTestCluster(zkCluster))
assert.Nil(t, err)
defer func() {
_ = ts.Stop()
Expand All @@ -251,7 +248,7 @@ func TestRouterChainRouteNoRoute(t *testing.T) {
}()

zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, localIP, ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(zk).GetDynamicConfiguration(zkUrl)
configuration, err := extension.GetConfigCenterFactory(zkName).GetDynamicConfiguration(zkUrl)
config.GetEnvInstance().SetDynamicConfiguration(configuration)
assert.Nil(t, err)

Expand Down
10 changes: 4 additions & 6 deletions cluster/router/condition/app_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
)

import (
gxzookeeper "github.com/dubbogo/gost/database/kv/zk"
"github.com/stretchr/testify/assert"
)

Expand All @@ -35,7 +36,6 @@ import (
"github.com/apache/dubbo-go/config_center"
_ "github.com/apache/dubbo-go/config_center/zookeeper"
"github.com/apache/dubbo-go/remoting"
"github.com/apache/dubbo-go/remoting/zookeeper"
)

const (
Expand All @@ -60,15 +60,14 @@ runtime: false
conditions:
- => host != 172.22.3.91
`
ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second)
ts, z, _, err := gxzookeeper.NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
err = z.Create(routerPath)
assert.NoError(t, err)

_, err = z.Conn.Set(routerPath, []byte(testYML), 0)
assert.NoError(t, err)
defer func() {
err = ts.Stop()
assert.NoError(t, err)
z.Close()
}()
Expand Down Expand Up @@ -116,15 +115,14 @@ conditions:
- => host != 172.22.3.91
- host = 192.168.199.208 => host = 192.168.199.208
`
ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second)
ts, z, _, err := gxzookeeper.NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
err = z.Create(routerPath)
assert.NoError(t, err)

_, err = z.Conn.Set(routerPath, []byte(testYML), 0)
assert.NoError(t, err)
defer func() {
err = ts.Stop()
assert.NoError(t, err)
z.Close()
}()
Expand Down Expand Up @@ -163,7 +161,7 @@ runtime: false
conditions:
- => host != 172.22.3.91
`
ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second)
ts, z, _, err := gxzookeeper.NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
err = z.Create(routerPath)
assert.NoError(t, err)
Expand Down
8 changes: 3 additions & 5 deletions cluster/router/tag/tag_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
import (
"github.com/RoaringBitmap/roaring"
"github.com/dubbogo/go-zookeeper/zk"
gxzookeeper "github.com/dubbogo/gost/database/kv/zk"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)
Expand All @@ -43,7 +44,6 @@ import (
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
"github.com/apache/dubbo-go/remoting"
"github.com/apache/dubbo-go/remoting/zookeeper"
)

const (
Expand Down Expand Up @@ -260,7 +260,7 @@ type DynamicTagRouter struct {
//rule *RouterRule

route *tagRouter
zkClient *zookeeper.ZookeeperClient
zkClient *gxzookeeper.ZookeeperClient
testCluster *zk.TestCluster
invokers []protocol.Invoker
url *common.URL
Expand Down Expand Up @@ -299,7 +299,7 @@ tags:
- name: tag3
addresses: ["127.0.0.1:20003", "127.0.0.1:20004"]
`
ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second)
ts, z, _, err := gxzookeeper.NewMockZookeeperClient("test", 15*time.Second)
suite.NoError(err)
err = z.Create(routerPath)
suite.NoError(err)
Expand Down Expand Up @@ -334,8 +334,6 @@ tags:

func (suite *DynamicTagRouter) TearDownTest() {
suite.zkClient.Close()
err := suite.testCluster.Stop()
suite.Nil(err)
}

func (suite *DynamicTagRouter) TestDynamicTagRouterSetByIPv4() {
Expand Down
12 changes: 6 additions & 6 deletions config_center/zookeeper/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

import (
gxset "github.com/dubbogo/gost/container/set"
gxzookeeper "github.com/dubbogo/gost/database/kv/zk"
perrors "github.com/pkg/errors"
)

Expand All @@ -50,7 +51,7 @@ type zookeeperDynamicConfiguration struct {
wg sync.WaitGroup
cltLock sync.Mutex
done chan struct{}
client *zookeeper.ZookeeperClient
client *gxzookeeper.ZookeeperClient

//listenerLock sync.Mutex
listener *zookeeper.ZkEventListener
Expand All @@ -63,7 +64,7 @@ func newZookeeperDynamicConfiguration(url *common.URL) (*zookeeperDynamicConfigu
url: url,
rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DEFAULT_GROUP) + "/config",
}
err := zookeeper.ValidateZookeeperClient(c, zookeeper.WithZkName(ZkClient))
err := zookeeper.ValidateZookeeperClient(c, ZkClient)
if err != nil {
logger.Errorf("zookeeper client start error ,error message is %v", err)
return nil, err
Expand Down Expand Up @@ -163,11 +164,11 @@ func (c *zookeeperDynamicConfiguration) SetParser(p parser.ConfigurationParser)
c.parser = p
}

func (c *zookeeperDynamicConfiguration) ZkClient() *zookeeper.ZookeeperClient {
func (c *zookeeperDynamicConfiguration) ZkClient() *gxzookeeper.ZookeeperClient {
return c.client
}

func (c *zookeeperDynamicConfiguration) SetZkClient(client *zookeeper.ZookeeperClient) {
func (c *zookeeperDynamicConfiguration) SetZkClient(client *gxzookeeper.ZookeeperClient) {
c.client = client
}

Expand Down Expand Up @@ -206,10 +207,9 @@ func (c *zookeeperDynamicConfiguration) IsAvailable() bool {
}

func (c *zookeeperDynamicConfiguration) closeConfigs() {
logger.Infof("begin to close provider zk client")
c.cltLock.Lock()
defer c.cltLock.Unlock()
logger.Infof("begin to close provider zk client")
// Close the old client first to close the tmp node
c.client.Close()
c.client = nil
}
Expand Down
17 changes: 11 additions & 6 deletions config_center/zookeeper/impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ func initZkData(group string, t *testing.T) (*zk.TestCluster, *zookeeperDynamicC
dubbo.service.com.ikurento.user.UserProvider.cluster=failover
`
if group != "" {
err = zreg.client.Create(path.Join(zreg.rootPath, "dubbo", dubboPropertyFileName))
err = zreg.client.Create(path.Join(zreg.rootPath, group, dubboPropertyFileName))
assert.NoError(t, err)

_, err = zreg.client.Conn.Set(path.Join(zreg.rootPath, "dubbo", dubboPropertyFileName), []byte(data), 0)
_, err = zreg.client.Conn.Set(path.Join(zreg.rootPath, group, dubboPropertyFileName), []byte(data), 0)
assert.NoError(t, err)
} else {
err = zreg.client.Create(path.Join(zreg.rootPath, dubboPropertyFileName))
Expand All @@ -100,6 +100,7 @@ func initZkData(group string, t *testing.T) (*zk.TestCluster, *zookeeperDynamicC
func TestGetConfig(t *testing.T) {
ts, reg := initZkData("dubbo", t)
defer func() {
reg.client.Close()
err := ts.Stop()
assert.NoError(t, err)
}()
Expand All @@ -122,11 +123,13 @@ func TestGetConfig(t *testing.T) {
func TestAddListener(t *testing.T) {
ts, reg := initZkData("", t)
defer func() {
reg.client.Close()
err := ts.Stop()
assert.NoError(t, err)
}()
listener := &mockDataListener{}
reg.AddListener(dubboPropertyFileName, listener)

listener.wg.Add(1)
data := `
dubbo.consumer.request_timeout=3s
Expand Down Expand Up @@ -158,6 +161,7 @@ func TestAddListener(t *testing.T) {
func TestRemoveListener(t *testing.T) {
ts, reg := initZkData("", t)
defer func() {
reg.client.Close()
err := ts.Stop()
assert.NoError(t, err)
}()
Expand Down Expand Up @@ -197,19 +201,20 @@ func TestZookeeperDynamicConfigurationPublishConfig(t *testing.T) {
value := "Test Data"
customGroup := "Custom Group"
key := "myKey"
ts, zk := initZkData(config_center.DEFAULT_GROUP, t)
ts, reg := initZkData(config_center.DEFAULT_GROUP, t)
defer func() {
reg.client.Close()
err := ts.Stop()
assert.NoError(t, err)
}()
err := zk.PublishConfig(key, customGroup, value)
err := reg.PublishConfig(key, customGroup, value)
assert.Nil(t, err)
result, err := zk.GetInternalProperty("myKey", config_center.WithGroup(customGroup))
result, err := reg.GetInternalProperty("myKey", config_center.WithGroup(customGroup))
assert.Nil(t, err)
assert.Equal(t, value, result)

var keys *gxset.HashSet
keys, err = zk.GetConfigKeysByGroup(customGroup)
keys, err = reg.GetConfigKeysByGroup(customGroup)
assert.Nil(t, err)
assert.Equal(t, 1, keys.Size())
assert.True(t, keys.Contains(key))
Expand Down
Loading

0 comments on commit 67792fa

Please sign in to comment.