Skip to content

Commit

Permalink
Fix: as a path for pr 1225 (#1233)
Browse files Browse the repository at this point in the history
* defeat too much node listen goroutine

* fix dead lock (#1247)

* Fix: assertion bug for asynchronous test for getty (#1248)

* fix dead lock

* Temporarily fix error occurred in getty unit test

* fix testClient_AsyncCall assertion bug

Co-authored-by: XavierNiu <a@nxw.name>
  • Loading branch information
AlexStocks and justxuewei authored Jun 8, 2021
1 parent 5878aaa commit 37229e3
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 20 deletions.
26 changes: 16 additions & 10 deletions registry/zookeeper/service_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,15 @@ import (
"github.com/apache/dubbo-go/registry"
)

var testName = "test"

var tc *zk.TestCluster
const testName = "test"

func prepareData(t *testing.T) *zk.TestCluster {
var err error
tc, err = zk.StartTestCluster(1, nil, nil)
tc, err := zk.StartTestCluster(1, nil, nil)
assert.NoError(t, err)
assert.NotNil(t, tc.Servers[0])
address := "127.0.0.1:" + strconv.Itoa(tc.Servers[0].Port)
//address := "127.0.0.1:2181"

config.GetBaseConfig().ServiceDiscoveries[testName] = &config.ServiceDiscoveryConfig{
Protocol: "zookeeper",
Expand All @@ -63,6 +62,7 @@ func TestNewZookeeperServiceDiscovery(t *testing.T) {
_, err := newZookeeperServiceDiscovery(name)

// the ServiceDiscoveryConfig not found
// err: could not init the instance because the config is invalid
assert.NotNil(t, err)

sdc := &config.ServiceDiscoveryConfig{
Expand All @@ -73,11 +73,20 @@ func TestNewZookeeperServiceDiscovery(t *testing.T) {
_, err = newZookeeperServiceDiscovery(name)

// RemoteConfig not found
// err: could not find the remote config for name: mock
assert.NotNil(t, err)
}

func TestCURDZookeeperServiceDiscovery(t *testing.T) {
prepareData(t)
func TestZookeeperServiceDiscovery_CURDAndListener(t *testing.T) {
tc := prepareData(t)
defer func() {
_ = tc.Stop()
}()
t.Run("testCURDZookeeperServiceDiscovery", testCURDZookeeperServiceDiscovery)
t.Run("testAddListenerZookeeperServiceDiscovery", testAddListenerZookeeperServiceDiscovery)
}

func testCURDZookeeperServiceDiscovery(t *testing.T) {
sd, err := newZookeeperServiceDiscovery(testName)
assert.Nil(t, err)
defer func() {
Expand Down Expand Up @@ -142,10 +151,7 @@ func TestCURDZookeeperServiceDiscovery(t *testing.T) {
assert.Nil(t, err)
}

func TestAddListenerZookeeperServiceDiscovery(t *testing.T) {
defer func() {
_ = tc.Stop()
}()
func testAddListenerZookeeperServiceDiscovery(t *testing.T) {
sd, err := newZookeeperServiceDiscovery(testName)
assert.Nil(t, err)
defer func() {
Expand Down
10 changes: 6 additions & 4 deletions remoting/getty/getty_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func testGetUser61(t *testing.T, c *Client) {

func testClient_AsyncCall(t *testing.T, client *Client) {
user := &User{}
lock := sync.Mutex{}
wg := sync.WaitGroup{}
request := remoting.NewRequest("2.0.2")
invocation := createInvocation("GetUser0", nil, nil, []interface{}{"4", nil, "username"},
[]reflect.Value{reflect.ValueOf("4"), reflect.ValueOf(nil), reflect.ValueOf("username")})
Expand All @@ -327,13 +327,13 @@ func testClient_AsyncCall(t *testing.T, client *Client) {
r := response.(remoting.AsyncCallbackResponse)
rst := *r.Reply.(*remoting.Response).Result.(*protocol.RPCResult)
assert.Equal(t, User{Id: "4", Name: "username"}, *(rst.Rest.(*User)))
lock.Unlock()
wg.Done()
}
lock.Lock()
wg.Add(1)
err := client.Request(request, 3*time.Second, rsp)
assert.NoError(t, err)
assert.Equal(t, User{}, *user)
time.Sleep(1 * time.Second)
wg.Wait()
}

func InitTest(t *testing.T) (*Server, *common.URL) {
Expand Down Expand Up @@ -450,6 +450,8 @@ func (u *UserProvider) GetUser(ctx context.Context, req []interface{}, rsp *User
}

func (u *UserProvider) GetUser0(id string, k *User, name string) (User, error) {
// fix testClient_AsyncCall assertion
time.Sleep(1 * time.Second)
return User{Id: id, Name: name}, nil
}

Expand Down
27 changes: 21 additions & 6 deletions remoting/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/dubbogo/go-zookeeper/zk"
gxzookeeper "github.com/dubbogo/gost/database/kv/zk"
perrors "github.com/pkg/errors"

uatomic "go.uber.org/atomic"
)

import (
Expand All @@ -39,14 +41,14 @@ import (
)

var (
defaultTTL = 15 * time.Minute
defaultTTL = 10 * time.Minute
)

// nolint
type ZkEventListener struct {
client *gxzookeeper.ZookeeperClient
pathMapLock sync.Mutex
pathMap map[string]struct{}
pathMap map[string]*uatomic.Int32
wg sync.WaitGroup
exit chan struct{}
}
Expand All @@ -55,7 +57,7 @@ type ZkEventListener struct {
func NewZkEventListener(client *gxzookeeper.ZookeeperClient) *ZkEventListener {
return &ZkEventListener{
client: client,
pathMap: make(map[string]struct{}),
pathMap: make(map[string]*uatomic.Int32),
exit: make(chan struct{}),
}
}
Expand Down Expand Up @@ -83,6 +85,17 @@ func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener remotin
// nolint
func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool {
defer l.wg.Done()

l.pathMapLock.Lock()
a, ok := l.pathMap[zkPath]
if !ok || a.Load() > 1 {
l.pathMapLock.Unlock()
return false
}
a.Inc()
l.pathMapLock.Unlock()
defer a.Dec()

var zkEvent zk.Event
for {
keyEventCh, err := l.client.ExistW(zkPath)
Expand Down Expand Up @@ -174,6 +187,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
// listen l service node
l.wg.Add(1)
go func(node string, listener remoting.DataListener) {
// invoker l.wg.Done() in l.listenServiceNodeEvent
if l.listenServiceNodeEvent(node, listener) {
logger.Warnf("delete zkNode{%s}", node)
listener.DataChange(remoting.Event{Path: node, Action: remoting.EventTypeDel})
Expand Down Expand Up @@ -271,15 +285,15 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
// Save the path to avoid listen repeatedly
l.pathMapLock.Lock()
_, ok := l.pathMap[dubboPath]
if !ok {
l.pathMap[dubboPath] = uatomic.NewInt32(0)
}
l.pathMapLock.Unlock()
if ok {
logger.Warnf("@zkPath %s has already been listened.", dubboPath)
continue
}

l.pathMapLock.Lock()
l.pathMap[dubboPath] = struct{}{}
l.pathMapLock.Unlock()
// When Zk disconnected, the Conn will be set to nil, so here need check the value of Conn
l.client.RLock()
if l.client.Conn == nil {
Expand All @@ -298,6 +312,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
logger.Infof("listen dubbo service key{%s}", dubboPath)
l.wg.Add(1)
go func(zkPath string, listener remoting.DataListener) {
// invoker l.wg.Done() in l.listenServiceNodeEvent
if l.listenServiceNodeEvent(zkPath, listener) {
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
l.pathMapLock.Lock()
Expand Down

0 comments on commit 37229e3

Please sign in to comment.