Skip to content

Commit

Permalink
avoid the problem of cpu idling & fix wrong definition of error
Browse files Browse the repository at this point in the history
  • Loading branch information
dongjianhui03 committed Dec 1, 2021
1 parent a8fba70 commit 12cca45
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 22 deletions.
15 changes: 8 additions & 7 deletions registry/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,18 @@ func (l *RegistryDataListener) UnSubscribeURL(url *common.URL) config_center.Con
}

// DataChange accepts all events sent from the zookeeper server and trigger the corresponding listener for processing
func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
func (l *RegistryDataListener) DataChange(event remoting.Event) bool {
// Intercept the last bit
index := strings.Index(eventType.Path, "/providers/")
index := strings.Index(event.Path, "/providers/")
if index == -1 {
logger.Warnf("Listen with no url, event.path={%v}", eventType.Path)
logger.Warnf("[RegistryDataListener][DataChange]Listen error zk node path {%s}, "+
"this listener is used to listen services which under the directory of providers/", event.Path)
return false
}
url := eventType.Path[index+len("/providers/"):]
url := event.Path[index+len("/providers/"):]
serviceURL, err := common.NewURL(url)
if err != nil {
logger.Errorf("Listen NewURL(r{%s}) = error{%v} eventType.Path={%v}", url, err, eventType.Path)
logger.Errorf("[RegistryDataListener][DataChange]Listen NewURL({%s}) = error{%+v} event.Path={%s}", url, err, event.Path)
return false
}
l.mutex.Lock()
Expand All @@ -93,9 +94,9 @@ func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
if serviceURL.ServiceKey() == serviceKey {
listener.Process(
&config_center.ConfigChangeEvent{
Key: eventType.Path,
Key: event.Path,
Value: serviceURL,
ConfigType: eventType.Action,
ConfigType: event.Action,
},
)
return true
Expand Down
6 changes: 0 additions & 6 deletions remoting/zookeeper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,6 @@ const (
MaxFailTimes = 3
)

var (
errNilZkClientConn = perrors.New("zookeeper client{conn} is nil")
errNilChildren = perrors.Errorf("has none children")
errNilNode = perrors.Errorf("node does not exist")
)

// ValidateZookeeperClient validates client and sets options
func ValidateZookeeperClient(container ZkClientFacade, zkName string) error {
lock := container.ZkClientLock()
Expand Down
14 changes: 5 additions & 9 deletions remoting/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,16 +152,16 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li

newChildren, err := l.client.GetChildren(zkPath)
if err != nil {
// FIXME always false
if err == errNilChildren {
// TODO need to ignore this error in gost
if err == gxzookeeper.ErrNilChildren {
content, _, connErr := l.client.Conn.Get(zkPath)
if connErr != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}",
zkPath, perrors.WithStack(connErr))
} else {
// TODO this if for config center listener, and will be removed when we refactor config center listener
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeUpdate, Content: string(content)})
}

} else {
logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err))
}
Expand Down Expand Up @@ -219,9 +219,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
var (
failTimes int
ttl time.Duration
event chan struct{}
)
event = make(chan struct{}, 4)
ttl = defaultTTL
if conf != nil {
timeout, err := time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, constant.DefaultRegTTL))
Expand All @@ -231,7 +229,6 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
logger.Warnf("[Zookeeper EventListener][listenDirEvent] Wrong configuration for registry.ttl, error=%+v, using default value %v instead", err, defaultTTL)
}
}
defer close(event)
for {
// Get current children with watcher for the zkRootPath
children, childEventCh, err := l.client.GetChildrenW(zkRootPath)
Expand All @@ -251,8 +248,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
}
failTimes = 0
if len(children) == 0 {
logger.Warnf("[Zookeeper EventListener][listenDirEvent] Can not gey any children for the path {%s}, please check if the provider does ready.", zkRootPath)
continue
logger.Debugf("[Zookeeper EventListener][listenDirEvent] Can not gey any children for the path {%s}, please check if the provider does ready.", zkRootPath)
}
for _, c := range children {
// Only need to compare Path when subscribing to provider
Expand Down Expand Up @@ -324,10 +320,10 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
}
}

// startScheduleWatchTask periodically update provider information, return true when receive exit signal
func (l *ZkEventListener) startScheduleWatchTask(
zkRootPath string, children []string, ttl time.Duration,
listener remoting.DataListener, childEventCh <-chan zk.Event) bool {
// Periodically update provider information
tickerTTL := ttl
if tickerTTL > 20e9 {
tickerTTL = 20e9
Expand Down

0 comments on commit 12cca45

Please sign in to comment.