Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix:zk too many tcp conn #1010

Merged
merged 41 commits into from
Mar 15, 2021
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
2aa489a
Merge pull request #981 from lzp0412/develop
AlexStocks Jan 6, 2021
3120a22
Merge remote-tracking branch 'upstream/develop' into fix_zk_too_many_…
wenxuwan Jan 23, 2021
33dc7d4
try to fix zk too many connections
wenxuwan Jan 23, 2021
984a1ae
remove close function in service_discovery
wenxuwan Jan 23, 2021
06b1cd1
remove unused code
wenxuwan Jan 23, 2021
16a43ac
try to fix lint
wenxuwan Jan 23, 2021
186a8e7
fix imports format and ut
wenxuwan Jan 23, 2021
a53ed2b
fix import fmt
wenxuwan Jan 23, 2021
bd59cfc
fix ut error
wenxuwan Jan 24, 2021
ead5951
fix ut error
wenxuwan Jan 24, 2021
55f40f7
Merge remote-tracking branch 'upstream/develop' into fix_zk_too_many_…
wenxuwan Jan 24, 2021
bb84695
fix ut
wenxuwan Jan 24, 2021
394cbb2
fix lint
wenxuwan Jan 24, 2021
930a15b
try to fix ut
wenxuwan Jan 25, 2021
7df6426
fix ut
wenxuwan Jan 25, 2021
93ae337
fix ut
wenxuwan Jan 25, 2021
2cb436b
so tired to fix the ut
wenxuwan Jan 25, 2021
543721c
fix lint
wenxuwan Jan 25, 2021
2bc815a
remove client close from registry
wenxuwan Jan 26, 2021
0a215ac
fix provider not started
wenxuwan Jan 26, 2021
f1c5ee4
Merge remote-tracking branch 'upstream/1.5' into fix_zk_too_many_tcp_…
wenxuwan Feb 23, 2021
43280d7
change some comments
wenxuwan Feb 25, 2021
6fa3fea
fix facade ut
wenxuwan Feb 25, 2021
132ca25
Merge remote-tracking branch 'upstream/1.5' into fix_zk_too_many_tcp_…
wenxuwan Feb 25, 2021
93374a2
fix glint
wenxuwan Feb 25, 2021
991ef61
Merge remote-tracking branch 'upstream/1.5' into fix_zk_too_many_tcp_…
wenxuwan Feb 25, 2021
cb8b665
fix restart can't find provider error
wenxuwan Feb 26, 2021
3fdbb01
try to fix provider lost
wenxuwan Feb 26, 2021
ef8966c
try to fix provider lost
wenxuwan Feb 27, 2021
ca647ab
try to fix provider can't find error
wenxuwan Feb 27, 2021
3a05f71
fix compile error
wenxuwan Feb 27, 2021
ae6a138
change type error
wenxuwan Feb 27, 2021
a8bc4e1
fix comments
wenxuwan Feb 28, 2021
972a91a
move zk to gost
wenxuwan Mar 2, 2021
0ebe79c
fix lint
wenxuwan Mar 2, 2021
46801c3
replace grpc version
wenxuwan Mar 2, 2021
364be20
update gost version
wenxuwan Mar 3, 2021
786e843
fix compile error
wenxuwan Mar 3, 2021
9d8c910
fix comments
wenxuwan Mar 4, 2021
98ba75e
fix comments
wenxuwan Mar 6, 2021
a231789
upgrade hession version
wenxuwan Mar 15, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 21 additions & 24 deletions cluster/router/chain/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
)

import (
zk "github.com/dubbogo/go-zookeeper/zk"
gxzookeeper "github.com/dubbogo/gost/database/kv/zk"
"github.com/stretchr/testify/assert"
)

Expand All @@ -38,7 +40,6 @@ import (
_ "github.com/apache/dubbo-go/config_center/zookeeper"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
"github.com/apache/dubbo-go/remoting/zookeeper"
)

const (
Expand All @@ -53,16 +54,19 @@ const (
consumerFormat = "consumer://%s/com.foo.BarService"
dubboForamt = "dubbo://%s:%d/com.foo.BarService"
anyUrlFormat = "condition://%s/com.foo.BarService"
zk = "zookeeper"
zkName = "zookeeper"
applicationKey = "test-condition"
applicationField = "application"
forceField = "force"
forceValue = "true"
)

var zkCluster *zk.TestCluster

func TestNewRouterChain(t *testing.T) {
ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second)
ts, z, _, err := gxzookeeper.NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
zkCluster = ts
err = z.Create(path)
assert.NoError(t, err)
testyml := `scope: application
Expand All @@ -77,12 +81,12 @@ conditions:
_, err = z.Conn.Set(path, []byte(testyml), 0)
assert.NoError(t, err)
defer func() {
_ = ts.Stop()
z.Delete(path)
z.Close()
}()

zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, localIP, ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(zk).GetDynamicConfiguration(zkUrl)
configuration, err := extension.GetConfigCenterFactory(zkName).GetDynamicConfiguration(zkUrl)
config.GetEnvInstance().SetDynamicConfiguration(configuration)

assert.Nil(t, err)
Expand Down Expand Up @@ -114,7 +118,7 @@ func TestNewRouterChainURLNil(t *testing.T) {
}

func TestRouterChainAddRouters(t *testing.T) {
ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second)
_, z, _, err := gxzookeeper.NewMockZookeeperClient("test", 15*time.Second, gxzookeeper.WithTestCluster(zkCluster))
assert.NoError(t, err)
err = z.Create(path)
assert.NoError(t, err)
Expand All @@ -131,13 +135,12 @@ conditions:
_, err = z.Conn.Set(path, []byte(testyml), 0)
assert.NoError(t, err)
defer func() {
_ = ts.Stop()
assert.NoError(t, err)
z.Delete(path)
z.Close()
}()

zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, localIP, ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(zk).GetDynamicConfiguration(zkUrl)
zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, localIP, zkCluster.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(zkName).GetDynamicConfiguration(zkUrl)
assert.NoError(t, err)
config.GetEnvInstance().SetDynamicConfiguration(configuration)

Expand All @@ -164,15 +167,10 @@ conditions:
}

func TestRouterChainRoute(t *testing.T) {
ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second)
defer func() {
err = ts.Stop()
assert.NoError(t, err)
z.Close()
}()

ts, _, _, err := gxzookeeper.NewMockZookeeperClient("test", 15*time.Second, gxzookeeper.WithTestCluster(zkCluster))
assert.Nil(t, err)
zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, localIP, ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(zk).GetDynamicConfiguration(zkUrl)
configuration, err := extension.GetConfigCenterFactory(zkName).GetDynamicConfiguration(zkUrl)
assert.NoError(t, err)
config.GetEnvInstance().SetDynamicConfiguration(configuration)

Expand All @@ -197,7 +195,7 @@ func TestRouterChainRoute(t *testing.T) {
}

func TestRouterChainRouteAppRouter(t *testing.T) {
ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second)
ts, z, _, err := gxzookeeper.NewMockZookeeperClient("test", 15*time.Second, gxzookeeper.WithTestCluster(zkCluster))
assert.NoError(t, err)
err = z.Create(path)
assert.NoError(t, err)
Expand All @@ -214,13 +212,12 @@ conditions:
_, err = z.Conn.Set(path, []byte(testyml), 0)
assert.NoError(t, err)
defer func() {
_ = ts.Stop()
assert.NoError(t, err)
z.Delete(path)
z.Close()
}()

zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, localIP, ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(zk).GetDynamicConfiguration(zkUrl)
configuration, err := extension.GetConfigCenterFactory(zkName).GetDynamicConfiguration(zkUrl)
assert.NoError(t, err)
config.GetEnvInstance().SetDynamicConfiguration(configuration)

Expand All @@ -242,7 +239,7 @@ conditions:
}

func TestRouterChainRouteNoRoute(t *testing.T) {
ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second)
ts, z, _, err := gxzookeeper.NewMockZookeeperClient("test", 15*time.Second, gxzookeeper.WithTestCluster(zkCluster))
assert.Nil(t, err)
defer func() {
_ = ts.Stop()
Expand All @@ -251,7 +248,7 @@ func TestRouterChainRouteNoRoute(t *testing.T) {
}()

zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, localIP, ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(zk).GetDynamicConfiguration(zkUrl)
configuration, err := extension.GetConfigCenterFactory(zkName).GetDynamicConfiguration(zkUrl)
config.GetEnvInstance().SetDynamicConfiguration(configuration)
assert.Nil(t, err)

Expand Down
10 changes: 4 additions & 6 deletions cluster/router/condition/app_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
)

import (
gxzookeeper "github.com/dubbogo/gost/database/kv/zk"
"github.com/stretchr/testify/assert"
)

Expand All @@ -35,7 +36,6 @@ import (
"github.com/apache/dubbo-go/config_center"
_ "github.com/apache/dubbo-go/config_center/zookeeper"
"github.com/apache/dubbo-go/remoting"
"github.com/apache/dubbo-go/remoting/zookeeper"
)

const (
Expand All @@ -60,15 +60,14 @@ runtime: false
conditions:
- => host != 172.22.3.91
`
ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second)
ts, z, _, err := gxzookeeper.NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
err = z.Create(routerPath)
assert.NoError(t, err)

_, err = z.Conn.Set(routerPath, []byte(testYML), 0)
assert.NoError(t, err)
defer func() {
err = ts.Stop()
assert.NoError(t, err)
z.Close()
}()
Expand Down Expand Up @@ -116,15 +115,14 @@ conditions:
- => host != 172.22.3.91
- host = 192.168.199.208 => host = 192.168.199.208
`
ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second)
ts, z, _, err := gxzookeeper.NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
err = z.Create(routerPath)
assert.NoError(t, err)

_, err = z.Conn.Set(routerPath, []byte(testYML), 0)
assert.NoError(t, err)
defer func() {
err = ts.Stop()
assert.NoError(t, err)
z.Close()
}()
Expand Down Expand Up @@ -163,7 +161,7 @@ runtime: false
conditions:
- => host != 172.22.3.91
`
ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second)
ts, z, _, err := gxzookeeper.NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
err = z.Create(routerPath)
assert.NoError(t, err)
Expand Down
8 changes: 3 additions & 5 deletions cluster/router/tag/tag_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
import (
"github.com/RoaringBitmap/roaring"
"github.com/dubbogo/go-zookeeper/zk"
gxzookeeper "github.com/dubbogo/gost/database/kv/zk"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)
Expand All @@ -43,7 +44,6 @@ import (
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
"github.com/apache/dubbo-go/remoting"
"github.com/apache/dubbo-go/remoting/zookeeper"
)

const (
Expand Down Expand Up @@ -260,7 +260,7 @@ type DynamicTagRouter struct {
//rule *RouterRule

route *tagRouter
zkClient *zookeeper.ZookeeperClient
zkClient *gxzookeeper.ZookeeperClient
testCluster *zk.TestCluster
invokers []protocol.Invoker
url *common.URL
Expand Down Expand Up @@ -299,7 +299,7 @@ tags:
- name: tag3
addresses: ["127.0.0.1:20003", "127.0.0.1:20004"]
`
ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second)
ts, z, _, err := gxzookeeper.NewMockZookeeperClient("test", 15*time.Second)
suite.NoError(err)
err = z.Create(routerPath)
suite.NoError(err)
Expand Down Expand Up @@ -334,8 +334,6 @@ tags:

func (suite *DynamicTagRouter) TearDownTest() {
suite.zkClient.Close()
err := suite.testCluster.Stop()
suite.Nil(err)
}

func (suite *DynamicTagRouter) TestDynamicTagRouterSetByIPv4() {
Expand Down
12 changes: 6 additions & 6 deletions config_center/zookeeper/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

import (
gxset "github.com/dubbogo/gost/container/set"
gxzookeeper "github.com/dubbogo/gost/database/kv/zk"
perrors "github.com/pkg/errors"
)

Expand All @@ -50,7 +51,7 @@ type zookeeperDynamicConfiguration struct {
wg sync.WaitGroup
cltLock sync.Mutex
done chan struct{}
client *zookeeper.ZookeeperClient
client *gxzookeeper.ZookeeperClient

//listenerLock sync.Mutex
listener *zookeeper.ZkEventListener
Expand All @@ -63,7 +64,7 @@ func newZookeeperDynamicConfiguration(url *common.URL) (*zookeeperDynamicConfigu
url: url,
rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DEFAULT_GROUP) + "/config",
}
err := zookeeper.ValidateZookeeperClient(c, zookeeper.WithZkName(ZkClient))
err := zookeeper.ValidateZookeeperClient(c, ZkClient)
if err != nil {
logger.Errorf("zookeeper client start error ,error message is %v", err)
return nil, err
Expand Down Expand Up @@ -163,11 +164,11 @@ func (c *zookeeperDynamicConfiguration) SetParser(p parser.ConfigurationParser)
c.parser = p
}

func (c *zookeeperDynamicConfiguration) ZkClient() *zookeeper.ZookeeperClient {
func (c *zookeeperDynamicConfiguration) ZkClient() *gxzookeeper.ZookeeperClient {
return c.client
}

func (c *zookeeperDynamicConfiguration) SetZkClient(client *zookeeper.ZookeeperClient) {
func (c *zookeeperDynamicConfiguration) SetZkClient(client *gxzookeeper.ZookeeperClient) {
c.client = client
}

Expand Down Expand Up @@ -206,10 +207,9 @@ func (c *zookeeperDynamicConfiguration) IsAvailable() bool {
}

func (c *zookeeperDynamicConfiguration) closeConfigs() {
logger.Infof("begin to close provider zk client")
c.cltLock.Lock()
defer c.cltLock.Unlock()
logger.Infof("begin to close provider zk client")
// Close the old client first to close the tmp node
c.client.Close()
wenxuwan marked this conversation as resolved.
Show resolved Hide resolved
c.client = nil
}
Expand Down
17 changes: 11 additions & 6 deletions config_center/zookeeper/impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ func initZkData(group string, t *testing.T) (*zk.TestCluster, *zookeeperDynamicC
dubbo.service.com.ikurento.user.UserProvider.cluster=failover
`
if group != "" {
err = zreg.client.Create(path.Join(zreg.rootPath, "dubbo", dubboPropertyFileName))
err = zreg.client.Create(path.Join(zreg.rootPath, group, dubboPropertyFileName))
assert.NoError(t, err)

_, err = zreg.client.Conn.Set(path.Join(zreg.rootPath, "dubbo", dubboPropertyFileName), []byte(data), 0)
_, err = zreg.client.Conn.Set(path.Join(zreg.rootPath, group, dubboPropertyFileName), []byte(data), 0)
assert.NoError(t, err)
} else {
err = zreg.client.Create(path.Join(zreg.rootPath, dubboPropertyFileName))
Expand All @@ -100,6 +100,7 @@ func initZkData(group string, t *testing.T) (*zk.TestCluster, *zookeeperDynamicC
func TestGetConfig(t *testing.T) {
ts, reg := initZkData("dubbo", t)
defer func() {
reg.client.Close()
err := ts.Stop()
assert.NoError(t, err)
}()
Expand All @@ -122,11 +123,13 @@ func TestGetConfig(t *testing.T) {
func TestAddListener(t *testing.T) {
ts, reg := initZkData("", t)
defer func() {
reg.client.Close()
err := ts.Stop()
assert.NoError(t, err)
}()
listener := &mockDataListener{}
reg.AddListener(dubboPropertyFileName, listener)

listener.wg.Add(1)
data := `
dubbo.consumer.request_timeout=3s
Expand Down Expand Up @@ -158,6 +161,7 @@ func TestAddListener(t *testing.T) {
func TestRemoveListener(t *testing.T) {
ts, reg := initZkData("", t)
defer func() {
reg.client.Close()
err := ts.Stop()
assert.NoError(t, err)
}()
Expand Down Expand Up @@ -197,19 +201,20 @@ func TestZookeeperDynamicConfigurationPublishConfig(t *testing.T) {
value := "Test Data"
customGroup := "Custom Group"
key := "myKey"
ts, zk := initZkData(config_center.DEFAULT_GROUP, t)
ts, reg := initZkData(config_center.DEFAULT_GROUP, t)
defer func() {
reg.client.Close()
err := ts.Stop()
assert.NoError(t, err)
}()
err := zk.PublishConfig(key, customGroup, value)
err := reg.PublishConfig(key, customGroup, value)
assert.Nil(t, err)
result, err := zk.GetInternalProperty("myKey", config_center.WithGroup(customGroup))
result, err := reg.GetInternalProperty("myKey", config_center.WithGroup(customGroup))
assert.Nil(t, err)
assert.Equal(t, value, result)

var keys *gxset.HashSet
keys, err = zk.GetConfigKeysByGroup(customGroup)
keys, err = reg.GetConfigKeysByGroup(customGroup)
assert.Nil(t, err)
assert.Equal(t, 1, keys.Size())
assert.True(t, keys.Contains(key))
Expand Down
Loading