Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

avoid the problem of cpu idling & fix wrong definition of error #1629

Merged
merged 3 commits into from
Dec 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cluster/cluster/base/cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func GetLoadBalance(invoker protocol.Invoker, invocation protocol.Invocation) lo

methodName := invocation.MethodName()
// Get the service loadbalance config
lb := url.GetParam(constant.LoadbalanceKey, constant.DefaultLoadbalance)
lb := url.GetParam(constant.LoadbalanceKey, constant.DefaultLoadBalance)

// Get the service method loadbalance config if have
if v := url.GetMethodParam(methodName, constant.LoadbalanceKey, ""); len(v) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion cluster/cluster/failback/cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (invoker *clusterInvoker) Invoke(ctx context.Context, invocation protocol.I

// Get the service loadbalance config
url := invokers[0].GetURL()
lb := url.GetParam(constant.LoadbalanceKey, constant.DefaultLoadbalance)
lb := url.GetParam(constant.LoadbalanceKey, constant.DefaultLoadBalance)
// Get the service method loadbalance config if have
methodName := invocation.MethodName()
if v := url.GetMethodParam(methodName, constant.LoadbalanceKey, ""); v != "" {
Expand Down
2 changes: 1 addition & 1 deletion cluster/cluster/failsafe/cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (invoker *clusterInvoker) Invoke(ctx context.Context, invocation protocol.I
url := invokers[0].GetURL()
methodName := invocation.MethodName()
// Get the service loadbalance config
lb := url.GetParam(constant.LoadbalanceKey, constant.DefaultLoadbalance)
lb := url.GetParam(constant.LoadbalanceKey, constant.DefaultLoadBalance)
// Get the service method loadbalance config if have
if v := url.GetMethodParam(methodName, constant.LoadbalanceKey, ""); v != "" {
lb = v
Expand Down
2 changes: 1 addition & 1 deletion cluster/directory/base/directory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
var (
url, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LocalHostValue, constant.DefaultPort))
anyURL, _ = common.NewURL(fmt.Sprintf("condition://%s/com.foo.BarService", constant.AnyhostValue))
anyURL, _ = common.NewURL(fmt.Sprintf("condition://%s/com.foo.BarService", constant.AnyHostValue))
)

func TestNewBaseDirectory(t *testing.T) {
Expand Down
12 changes: 2 additions & 10 deletions common/constant/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (
)

const (
DefaultLoadbalance = "random"
DefaultLoadBalance = "random"
DefaultRetries = "2"
DefaultRetriesInt = 2
DefaultProtocol = "dubbo"
Expand All @@ -45,7 +45,6 @@ const (
DefaultRestClient = "resty"
DefaultRestServer = "go-restful"
DefaultPort = 20000
DefaultMetadataport = 20005
)

const (
Expand All @@ -60,7 +59,7 @@ const (

const (
AnyValue = "*"
AnyhostValue = "0.0.0.0"
AnyHostValue = "0.0.0.0"
LocalHostValue = "192.168.1.1"
RemoveValuePrefix = "-"
)
Expand All @@ -87,10 +86,3 @@ const (
const (
ServiceDiscoveryDefaultGroup = "DEFAULT_GROUP"
)

const (
DefaultProviderConfFilePath = "../profiles/dev/server.yml"
DefaultConsumerConfFilePath = "../profiles/dev/client.yml"
DefaultLogConfFilePath = "../profiles/dev/log.yml"
DefaultRouterConfFilePath = "../profiles/dev/router.yml"
)
6 changes: 3 additions & 3 deletions config_center/configurator/override.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ func (c *overrideConfigurator) configureIfMatchInternal(url *common.URL) {

// configureIfMatch translate from java, compatible rules in java
func (c *overrideConfigurator) configureIfMatch(host string, url *common.URL) {
if constant.AnyhostValue == c.configuratorUrl.Ip || host == c.configuratorUrl.Ip {
if constant.AnyHostValue == c.configuratorUrl.Ip || host == c.configuratorUrl.Ip {
providers := c.configuratorUrl.GetParam(constant.OverrideProvidersKey, "")
if len(providers) == 0 || strings.Contains(providers, url.Location) || strings.Contains(providers, constant.AnyhostValue) {
if len(providers) == 0 || strings.Contains(providers, url.Location) || strings.Contains(providers, constant.AnyHostValue) {
c.configureIfMatchInternal(url)
}
}
Expand All @@ -129,7 +129,7 @@ func (c *overrideConfigurator) configureDeprecated(url *common.URL) {
localIP := common.GetLocalIp()
c.configureIfMatch(localIP, url)
} else {
c.configureIfMatch(constant.AnyhostValue, url)
c.configureIfMatch(constant.AnyHostValue, url)
}
}
}
4 changes: 2 additions & 2 deletions config_center/parser/configuration_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (parser *DefaultConfigurationParser) ParseToUrls(content string) ([]*common
func serviceItemToUrls(item ConfigItem, config ConfiguratorConfig) ([]*common.URL, error) {
addresses := item.Addresses
if len(addresses) == 0 {
addresses = append(addresses, constant.AnyhostValue)
addresses = append(addresses, constant.AnyHostValue)
}
var urls []*common.URL
for _, v := range addresses {
Expand Down Expand Up @@ -163,7 +163,7 @@ func serviceItemToUrls(item ConfigItem, config ConfiguratorConfig) ([]*common.UR
func appItemToUrls(item ConfigItem, config ConfiguratorConfig) ([]*common.URL, error) {
addresses := item.Addresses
if len(addresses) == 0 {
addresses = append(addresses, constant.AnyhostValue)
addresses = append(addresses, constant.AnyHostValue)
}
var urls []*common.URL
for _, v := range addresses {
Expand Down
17 changes: 10 additions & 7 deletions registry/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/config_center"
"dubbo.apache.org/dubbo-go/v3/registry"
Expand Down Expand Up @@ -71,17 +72,19 @@ func (l *RegistryDataListener) UnSubscribeURL(url *common.URL) config_center.Con
}

// DataChange accepts all events sent from the zookeeper server and trigger the corresponding listener for processing
func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
func (l *RegistryDataListener) DataChange(event remoting.Event) bool {
providersPath := constant.PathSeparator + constant.ProviderCategory + constant.PathSeparator
// Intercept the last bit
index := strings.Index(eventType.Path, "/providers/")
index := strings.Index(event.Path, providersPath)
if index == -1 {
logger.Warnf("Listen with no url, event.path={%v}", eventType.Path)
logger.Warnf("[RegistryDataListener][DataChange]Listen error zk node path {%s}, "+
"this listener is used to listen services which under the directory of providers/", event.Path)
return false
}
url := eventType.Path[index+len("/providers/"):]
url := event.Path[index+len(providersPath):]
serviceURL, err := common.NewURL(url)
if err != nil {
logger.Errorf("Listen NewURL(r{%s}) = error{%v} eventType.Path={%v}", url, err, eventType.Path)
logger.Errorf("[RegistryDataListener][DataChange]Listen NewURL({%s}) = error{%+v} event.Path={%s}", url, err, event.Path)
return false
}
l.mutex.Lock()
Expand All @@ -93,9 +96,9 @@ func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
if serviceURL.ServiceKey() == serviceKey {
listener.Process(
&config_center.ConfigChangeEvent{
Key: eventType.Path,
Key: event.Path,
Value: serviceURL,
ConfigType: eventType.Action,
ConfigType: event.Action,
},
)
return true
Expand Down
6 changes: 0 additions & 6 deletions remoting/zookeeper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,6 @@ const (
MaxFailTimes = 3
)

var (
errNilZkClientConn = perrors.New("zookeeper client{conn} is nil")
errNilChildren = perrors.Errorf("has none children")
errNilNode = perrors.Errorf("node does not exist")
)

// ValidateZookeeperClient validates client and sets options
func ValidateZookeeperClient(container ZkClientFacade, zkName string) error {
lock := container.ZkClientLock()
Expand Down
16 changes: 7 additions & 9 deletions remoting/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,16 +152,16 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li

newChildren, err := l.client.GetChildren(zkPath)
if err != nil {
// FIXME always false
if err == errNilChildren {
// TODO need to ignore this error in gost
if err == gxzookeeper.ErrNilChildren {
content, _, connErr := l.client.Conn.Get(zkPath)
if connErr != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}",
zkPath, perrors.WithStack(connErr))
} else {
// TODO this if for config center listener, and will be removed when we refactor config center listener
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeUpdate, Content: string(content)})
}

} else {
logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err))
}
Expand Down Expand Up @@ -219,9 +219,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
var (
failTimes int
ttl time.Duration
event chan struct{}
)
event = make(chan struct{}, 4)
ttl = defaultTTL
if conf != nil {
timeout, err := time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, constant.DefaultRegTTL))
Expand All @@ -231,7 +229,6 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
logger.Warnf("[Zookeeper EventListener][listenDirEvent] Wrong configuration for registry.ttl, error=%+v, using default value %v instead", err, defaultTTL)
}
}
defer close(event)
for {
// Get current children with watcher for the zkRootPath
children, childEventCh, err := l.client.GetChildrenW(zkRootPath)
Expand All @@ -247,12 +244,13 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
select {
case <-after:
continue
case <-l.exit:
return
}
}
failTimes = 0
if len(children) == 0 {
logger.Warnf("[Zookeeper EventListener][listenDirEvent] Can not gey any children for the path {%s}, please check if the provider does ready.", zkRootPath)
continue
logger.Debugf("[Zookeeper EventListener][listenDirEvent] Can not gey any children for the path {%s}, please check if the provider does ready.", zkRootPath)
}
for _, c := range children {
// Only need to compare Path when subscribing to provider
Expand Down Expand Up @@ -324,10 +322,10 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
}
}

// startScheduleWatchTask periodically update provider information, return true when receive exit signal
func (l *ZkEventListener) startScheduleWatchTask(
zkRootPath string, children []string, ttl time.Duration,
listener remoting.DataListener, childEventCh <-chan zk.Event) bool {
// Periodically update provider information
tickerTTL := ttl
if tickerTTL > 20e9 {
tickerTTL = 20e9
Expand Down