Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize zk ListenServiceEvent listener #343

Merged
merged 7 commits into from
Jan 29, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions remoting/zookeeper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const (

var (
errNilZkClientConn = perrors.New("zookeeperclient{conn} is nil")
errNilChildren = perrors.Errorf("has none children")
)

// ZookeeperClient ...
Expand Down Expand Up @@ -513,7 +514,7 @@ func (z *ZookeeperClient) GetChildrenW(path string) ([]string, <-chan zk.Event,
return nil, nil, perrors.Errorf("path{%s} has none children", path)
}
if len(children) == 0 {
return nil, nil, perrors.Errorf("path{%s} has none children", path)
return nil, nil, errNilChildren
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think you should print the zkpath as before, otherwise users don't know what is going on.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zouyx , @hxmhlt used the errNilChildren later. so there is no need to add the zkPath in the error. maybe an error log is better.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

return children, event, nil
Expand Down Expand Up @@ -544,7 +545,7 @@ func (z *ZookeeperClient) GetChildren(path string) ([]string, error) {
return nil, perrors.Errorf("path{%s} has none children", path)
}
if len(children) == 0 {
return nil, perrors.Errorf("path{%s} has none children", path)
return nil, errNilChildren
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above

}

return children, nil
Expand Down
9 changes: 9 additions & 0 deletions remoting/zookeeper/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,12 @@ func TestRegisterTempSeq(t *testing.T) {
states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
verifyEventStateOrder(t, event, states, "event channel")
}

func Test_UnregisterEvent(t *testing.T) {
client := &ZookeeperClient{}
client.eventRegistry = make(map[string][]*chan struct{})
array := []*chan struct{}{}
array = append(array, new(chan struct{}))
client.eventRegistry["test"] = array
client.UnregisterEvent("test", new(chan struct{}))
}
102 changes: 55 additions & 47 deletions remoting/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package zookeeper

import (
"path"
"strings"
"sync"
"time"
)
Expand Down Expand Up @@ -111,8 +110,17 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li

newChildren, err := l.client.GetChildren(zkPath)
if err != nil {
logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err))
return
if err == errNilChildren {
content, _, err := l.client.Conn.Get(zkPath)
if err != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", zkPath, perrors.WithStack(err))
} else {
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeUpdate, Content: string(content)})
}

} else {
logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err))
}
}

// a node was added -- listen the new node
Expand Down Expand Up @@ -272,50 +280,50 @@ func timeSecondDuration(sec int) time.Duration {
// |
// --------> ListenServiceNodeEvent
func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.DataListener) {
var (
err error
dubboPath string
children []string
)

zkPath = strings.ReplaceAll(zkPath, "$", "%24")
l.pathMapLock.Lock()
_, ok := l.pathMap[zkPath]
l.pathMapLock.Unlock()
if ok {
logger.Warnf("@zkPath %s has already been listened.", zkPath)
return
}

l.pathMapLock.Lock()
l.pathMap[zkPath] = struct{}{}
l.pathMapLock.Unlock()

logger.Infof("listen dubbo provider path{%s} event and wait to get all provider zk nodes", zkPath)
children, err = l.client.GetChildren(zkPath)
if err != nil {
children = nil
logger.Warnf("fail to get children of zk path{%s}", zkPath)
}

for _, c := range children {
// listen l service node
dubboPath = path.Join(zkPath, c)
content, _, err := l.client.Conn.Get(dubboPath)
if err != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", dubboPath, perrors.WithStack(err))
}
if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeAdd, Content: string(content)}) {
continue
}
logger.Infof("listen dubbo service key{%s}", dubboPath)
go func(zkPath string, listener remoting.DataListener) {
if l.ListenServiceNodeEvent(zkPath) {
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(dubboPath, listener)
}
//var (
hxmhlt marked this conversation as resolved.
Show resolved Hide resolved
// err error
// dubboPath string
// children []string
//)
//
//zkPath = strings.ReplaceAll(zkPath, "$", "%24")
//l.pathMapLock.Lock()
//_, ok := l.pathMap[zkPath]
//l.pathMapLock.Unlock()
//if ok {
// logger.Warnf("@zkPath %s has already been listened.", zkPath)
// return
//}
//
//l.pathMapLock.Lock()
//l.pathMap[zkPath] = struct{}{}
//l.pathMapLock.Unlock()
//
//logger.Infof("listen dubbo provider path{%s} event and wait to get all provider zk nodes", zkPath)
//children, err = l.client.GetChildren(zkPath)
//if err != nil {
// children = nil
// logger.Warnf("fail to get children of zk path{%s}", zkPath)
//}
//
//for _, c := range children {
// // listen l service node
// dubboPath = path.Join(zkPath, c)
// content, _, err := l.client.Conn.Get(dubboPath)
// if err != nil {
// logger.Errorf("Get new node path {%v} 's content error,message is {%v}", dubboPath, perrors.WithStack(err))
// }
// if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeAdd, Content: string(content)}) {
// continue
// }
// logger.Infof("listen dubbo service key{%s}", dubboPath)
// go func(zkPath string, listener remoting.DataListener) {
// if l.ListenServiceNodeEvent(zkPath) {
// listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
// }
// logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
// }(dubboPath, listener)
//}

logger.Infof("listen dubbo path{%s}", zkPath)
go func(zkPath string, listener remoting.DataListener) {
Expand Down
3 changes: 1 addition & 2 deletions remoting/zookeeper/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,11 @@ func TestListener(t *testing.T) {
listener := NewZkEventListener(client)
dataListener := &mockDataListener{client: client, changedData: changedData, wait: &wait}
listener.ListenServiceEvent("/dubbo", dataListener)

time.Sleep(1 * time.Second)
_, err := client.Conn.Set("/dubbo/dubbo.properties", []byte(changedData), 1)
assert.NoError(t, err)
wait.Wait()
assert.Equal(t, changedData, dataListener.eventList[1].Content)
client.Close()

}

Expand Down