Skip to content

Commit

Permalink
avoid the problem of cpu idling & fix wrong definition of error (#1629)
Browse files Browse the repository at this point in the history
* avoid the problem of cpu idling & fix wrong definition of error

* replace literal "/providers/" with constant

* add l.exit flag

Co-authored-by: dongjianhui03 <dongjianhui03@meituan.com>
  • Loading branch information
Mulavar and dongjianhui03 authored Dec 4, 2021
1 parent 44f7911 commit ec43e22
Show file tree
Hide file tree
Showing 10 changed files with 28 additions and 41 deletions.
2 changes: 1 addition & 1 deletion cluster/cluster/base/cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func GetLoadBalance(invoker protocol.Invoker, invocation protocol.Invocation) lo

methodName := invocation.MethodName()
// Get the service loadbalance config
lb := url.GetParam(constant.LoadbalanceKey, constant.DefaultLoadbalance)
lb := url.GetParam(constant.LoadbalanceKey, constant.DefaultLoadBalance)

// Get the service method loadbalance config if have
if v := url.GetMethodParam(methodName, constant.LoadbalanceKey, ""); len(v) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion cluster/cluster/failback/cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (invoker *clusterInvoker) Invoke(ctx context.Context, invocation protocol.I

// Get the service loadbalance config
url := invokers[0].GetURL()
lb := url.GetParam(constant.LoadbalanceKey, constant.DefaultLoadbalance)
lb := url.GetParam(constant.LoadbalanceKey, constant.DefaultLoadBalance)
// Get the service method loadbalance config if have
methodName := invocation.MethodName()
if v := url.GetMethodParam(methodName, constant.LoadbalanceKey, ""); v != "" {
Expand Down
2 changes: 1 addition & 1 deletion cluster/cluster/failsafe/cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (invoker *clusterInvoker) Invoke(ctx context.Context, invocation protocol.I
url := invokers[0].GetURL()
methodName := invocation.MethodName()
// Get the service loadbalance config
lb := url.GetParam(constant.LoadbalanceKey, constant.DefaultLoadbalance)
lb := url.GetParam(constant.LoadbalanceKey, constant.DefaultLoadBalance)
// Get the service method loadbalance config if have
if v := url.GetMethodParam(methodName, constant.LoadbalanceKey, ""); v != "" {
lb = v
Expand Down
2 changes: 1 addition & 1 deletion cluster/directory/base/directory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
var (
url, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LocalHostValue, constant.DefaultPort))
anyURL, _ = common.NewURL(fmt.Sprintf("condition://%s/com.foo.BarService", constant.AnyhostValue))
anyURL, _ = common.NewURL(fmt.Sprintf("condition://%s/com.foo.BarService", constant.AnyHostValue))
)

func TestNewBaseDirectory(t *testing.T) {
Expand Down
12 changes: 2 additions & 10 deletions common/constant/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (
)

const (
DefaultLoadbalance = "random"
DefaultLoadBalance = "random"
DefaultRetries = "2"
DefaultRetriesInt = 2
DefaultProtocol = "dubbo"
Expand All @@ -45,7 +45,6 @@ const (
DefaultRestClient = "resty"
DefaultRestServer = "go-restful"
DefaultPort = 20000
DefaultMetadataport = 20005
)

const (
Expand All @@ -60,7 +59,7 @@ const (

const (
AnyValue = "*"
AnyhostValue = "0.0.0.0"
AnyHostValue = "0.0.0.0"
LocalHostValue = "192.168.1.1"
RemoveValuePrefix = "-"
)
Expand All @@ -87,10 +86,3 @@ const (
const (
ServiceDiscoveryDefaultGroup = "DEFAULT_GROUP"
)

const (
DefaultProviderConfFilePath = "../profiles/dev/server.yml"
DefaultConsumerConfFilePath = "../profiles/dev/client.yml"
DefaultLogConfFilePath = "../profiles/dev/log.yml"
DefaultRouterConfFilePath = "../profiles/dev/router.yml"
)
6 changes: 3 additions & 3 deletions config_center/configurator/override.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ func (c *overrideConfigurator) configureIfMatchInternal(url *common.URL) {

// configureIfMatch translate from java, compatible rules in java
func (c *overrideConfigurator) configureIfMatch(host string, url *common.URL) {
if constant.AnyhostValue == c.configuratorUrl.Ip || host == c.configuratorUrl.Ip {
if constant.AnyHostValue == c.configuratorUrl.Ip || host == c.configuratorUrl.Ip {
providers := c.configuratorUrl.GetParam(constant.OverrideProvidersKey, "")
if len(providers) == 0 || strings.Contains(providers, url.Location) || strings.Contains(providers, constant.AnyhostValue) {
if len(providers) == 0 || strings.Contains(providers, url.Location) || strings.Contains(providers, constant.AnyHostValue) {
c.configureIfMatchInternal(url)
}
}
Expand All @@ -129,7 +129,7 @@ func (c *overrideConfigurator) configureDeprecated(url *common.URL) {
localIP := common.GetLocalIp()
c.configureIfMatch(localIP, url)
} else {
c.configureIfMatch(constant.AnyhostValue, url)
c.configureIfMatch(constant.AnyHostValue, url)
}
}
}
4 changes: 2 additions & 2 deletions config_center/parser/configuration_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (parser *DefaultConfigurationParser) ParseToUrls(content string) ([]*common
func serviceItemToUrls(item ConfigItem, config ConfiguratorConfig) ([]*common.URL, error) {
addresses := item.Addresses
if len(addresses) == 0 {
addresses = append(addresses, constant.AnyhostValue)
addresses = append(addresses, constant.AnyHostValue)
}
var urls []*common.URL
for _, v := range addresses {
Expand Down Expand Up @@ -163,7 +163,7 @@ func serviceItemToUrls(item ConfigItem, config ConfiguratorConfig) ([]*common.UR
func appItemToUrls(item ConfigItem, config ConfiguratorConfig) ([]*common.URL, error) {
addresses := item.Addresses
if len(addresses) == 0 {
addresses = append(addresses, constant.AnyhostValue)
addresses = append(addresses, constant.AnyHostValue)
}
var urls []*common.URL
for _, v := range addresses {
Expand Down
17 changes: 10 additions & 7 deletions registry/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/config_center"
"dubbo.apache.org/dubbo-go/v3/registry"
Expand Down Expand Up @@ -71,17 +72,19 @@ func (l *RegistryDataListener) UnSubscribeURL(url *common.URL) config_center.Con
}

// DataChange accepts all events sent from the zookeeper server and trigger the corresponding listener for processing
func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
func (l *RegistryDataListener) DataChange(event remoting.Event) bool {
providersPath := constant.PathSeparator + constant.ProviderCategory + constant.PathSeparator
// Intercept the last bit
index := strings.Index(eventType.Path, "/providers/")
index := strings.Index(event.Path, providersPath)
if index == -1 {
logger.Warnf("Listen with no url, event.path={%v}", eventType.Path)
logger.Warnf("[RegistryDataListener][DataChange]Listen error zk node path {%s}, "+
"this listener is used to listen services which under the directory of providers/", event.Path)
return false
}
url := eventType.Path[index+len("/providers/"):]
url := event.Path[index+len(providersPath):]
serviceURL, err := common.NewURL(url)
if err != nil {
logger.Errorf("Listen NewURL(r{%s}) = error{%v} eventType.Path={%v}", url, err, eventType.Path)
logger.Errorf("[RegistryDataListener][DataChange]Listen NewURL({%s}) = error{%+v} event.Path={%s}", url, err, event.Path)
return false
}
l.mutex.Lock()
Expand All @@ -93,9 +96,9 @@ func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
if serviceURL.ServiceKey() == serviceKey {
listener.Process(
&config_center.ConfigChangeEvent{
Key: eventType.Path,
Key: event.Path,
Value: serviceURL,
ConfigType: eventType.Action,
ConfigType: event.Action,
},
)
return true
Expand Down
6 changes: 0 additions & 6 deletions remoting/zookeeper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,6 @@ const (
MaxFailTimes = 3
)

var (
errNilZkClientConn = perrors.New("zookeeper client{conn} is nil")
errNilChildren = perrors.Errorf("has none children")
errNilNode = perrors.Errorf("node does not exist")
)

// ValidateZookeeperClient validates client and sets options
func ValidateZookeeperClient(container ZkClientFacade, zkName string) error {
lock := container.ZkClientLock()
Expand Down
16 changes: 7 additions & 9 deletions remoting/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,16 +152,16 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li

newChildren, err := l.client.GetChildren(zkPath)
if err != nil {
// FIXME always false
if err == errNilChildren {
// TODO need to ignore this error in gost
if err == gxzookeeper.ErrNilChildren {
content, _, connErr := l.client.Conn.Get(zkPath)
if connErr != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}",
zkPath, perrors.WithStack(connErr))
} else {
// TODO this if for config center listener, and will be removed when we refactor config center listener
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeUpdate, Content: string(content)})
}

} else {
logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err))
}
Expand Down Expand Up @@ -219,9 +219,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
var (
failTimes int
ttl time.Duration
event chan struct{}
)
event = make(chan struct{}, 4)
ttl = defaultTTL
if conf != nil {
timeout, err := time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, constant.DefaultRegTTL))
Expand All @@ -231,7 +229,6 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
logger.Warnf("[Zookeeper EventListener][listenDirEvent] Wrong configuration for registry.ttl, error=%+v, using default value %v instead", err, defaultTTL)
}
}
defer close(event)
for {
// Get current children with watcher for the zkRootPath
children, childEventCh, err := l.client.GetChildrenW(zkRootPath)
Expand All @@ -247,12 +244,13 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
select {
case <-after:
continue
case <-l.exit:
return
}
}
failTimes = 0
if len(children) == 0 {
logger.Warnf("[Zookeeper EventListener][listenDirEvent] Can not gey any children for the path {%s}, please check if the provider does ready.", zkRootPath)
continue
logger.Debugf("[Zookeeper EventListener][listenDirEvent] Can not gey any children for the path {%s}, please check if the provider does ready.", zkRootPath)
}
for _, c := range children {
// Only need to compare Path when subscribing to provider
Expand Down Expand Up @@ -324,10 +322,10 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
}
}

// startScheduleWatchTask periodically update provider information, return true when receive exit signal
func (l *ZkEventListener) startScheduleWatchTask(
zkRootPath string, children []string, ttl time.Duration,
listener remoting.DataListener, childEventCh <-chan zk.Event) bool {
// Periodically update provider information
tickerTTL := ttl
if tickerTTL > 20e9 {
tickerTTL = 20e9
Expand Down

0 comments on commit ec43e22

Please sign in to comment.