Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

avoid the problem of cpu idling & fix wrong definition of error #1629

Merged
merged 3 commits into from
Dec 4, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions registry/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,18 @@ func (l *RegistryDataListener) UnSubscribeURL(url *common.URL) config_center.Con
}

// DataChange accepts all events sent from the zookeeper server and trigger the corresponding listener for processing
func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
func (l *RegistryDataListener) DataChange(event remoting.Event) bool {
// Intercept the last bit
index := strings.Index(eventType.Path, "/providers/")
index := strings.Index(event.Path, "/providers/")
Mulavar marked this conversation as resolved.
Show resolved Hide resolved
if index == -1 {
logger.Warnf("Listen with no url, event.path={%v}", eventType.Path)
logger.Warnf("[RegistryDataListener][DataChange]Listen error zk node path {%s}, "+
"this listener is used to listen services which under the directory of providers/", event.Path)
return false
}
url := eventType.Path[index+len("/providers/"):]
url := event.Path[index+len("/providers/"):]
serviceURL, err := common.NewURL(url)
if err != nil {
logger.Errorf("Listen NewURL(r{%s}) = error{%v} eventType.Path={%v}", url, err, eventType.Path)
logger.Errorf("[RegistryDataListener][DataChange]Listen NewURL({%s}) = error{%+v} event.Path={%s}", url, err, event.Path)
return false
}
l.mutex.Lock()
Expand All @@ -93,9 +94,9 @@ func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
if serviceURL.ServiceKey() == serviceKey {
listener.Process(
&config_center.ConfigChangeEvent{
Key: eventType.Path,
Key: event.Path,
Value: serviceURL,
ConfigType: eventType.Action,
ConfigType: event.Action,
},
)
return true
Expand Down
6 changes: 0 additions & 6 deletions remoting/zookeeper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,6 @@ const (
MaxFailTimes = 3
)

var (
errNilZkClientConn = perrors.New("zookeeper client{conn} is nil")
errNilChildren = perrors.Errorf("has none children")
errNilNode = perrors.Errorf("node does not exist")
)

// ValidateZookeeperClient validates client and sets options
func ValidateZookeeperClient(container ZkClientFacade, zkName string) error {
lock := container.ZkClientLock()
Expand Down
14 changes: 5 additions & 9 deletions remoting/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,16 +152,16 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li

newChildren, err := l.client.GetChildren(zkPath)
if err != nil {
// FIXME always false
if err == errNilChildren {
// TODO need to ignore this error in gost
if err == gxzookeeper.ErrNilChildren {
content, _, connErr := l.client.Conn.Get(zkPath)
if connErr != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}",
zkPath, perrors.WithStack(connErr))
} else {
// TODO this if for config center listener, and will be removed when we refactor config center listener
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeUpdate, Content: string(content)})
}

} else {
logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err))
}
Expand Down Expand Up @@ -219,9 +219,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
var (
failTimes int
ttl time.Duration
event chan struct{}
)
event = make(chan struct{}, 4)
ttl = defaultTTL
if conf != nil {
timeout, err := time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, constant.DefaultRegTTL))
Expand All @@ -231,7 +229,6 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
logger.Warnf("[Zookeeper EventListener][listenDirEvent] Wrong configuration for registry.ttl, error=%+v, using default value %v instead", err, defaultTTL)
}
}
defer close(event)
for {
// Get current children with watcher for the zkRootPath
children, childEventCh, err := l.client.GetChildrenW(zkRootPath)
Expand All @@ -251,8 +248,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
}
failTimes = 0
if len(children) == 0 {
logger.Warnf("[Zookeeper EventListener][listenDirEvent] Can not gey any children for the path {%s}, please check if the provider does ready.", zkRootPath)
continue
logger.Debugf("[Zookeeper EventListener][listenDirEvent] Can not gey any children for the path {%s}, please check if the provider does ready.", zkRootPath)
}
for _, c := range children {
// Only need to compare Path when subscribing to provider
Expand Down Expand Up @@ -324,10 +320,10 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
}
}

// startScheduleWatchTask periodically update provider information, return true when receive exit signal
func (l *ZkEventListener) startScheduleWatchTask(
zkRootPath string, children []string, ttl time.Duration,
listener remoting.DataListener, childEventCh <-chan zk.Event) bool {
// Periodically update provider information
tickerTTL := ttl
if tickerTTL > 20e9 {
tickerTTL = 20e9
Expand Down