diff --git a/common/url.go b/common/url.go index 058f76a897..2824ec68dd 100644 --- a/common/url.go +++ b/common/url.go @@ -413,6 +413,37 @@ func ServiceKey(intf string, group string, version string) string { return buf.String() } +// ParseServiceKey gets interface, group and version from service key +func ParseServiceKey(serviceKey string) (string, string, string) { + var ( + group string + version string + ) + if serviceKey == "" { + return "", "", "" + } + // get group if it exists + sepIndex := strings.Index(serviceKey, constant.PathSeparator) + if sepIndex != -1 { + group = serviceKey[:sepIndex] + serviceKey = serviceKey[sepIndex+1:] + } + // get version if it exists + sepIndex = strings.LastIndex(serviceKey, constant.KeySeparator) + if sepIndex != -1 { + version = serviceKey[sepIndex+1:] + serviceKey = serviceKey[:sepIndex] + } + + return serviceKey, group, version +} + +// IsAnyCondition judges if is any condition +func IsAnyCondition(intf, group, version string, serviceURL *URL) bool { + return intf == constant.AnyValue && (group == constant.AnyValue || + group == serviceURL.Group()) && (version == constant.AnyValue || version == serviceURL.Version()) +} + // ColonSeparatedKey // The format is "{interface}:[version]:[group]" func (c *URL) ColonSeparatedKey() string { diff --git a/common/url_test.go b/common/url_test.go index 89953c3abc..2971e6c45f 100644 --- a/common/url_test.go +++ b/common/url_test.go @@ -421,3 +421,134 @@ func TestCompareURLEqualFunc(t *testing.T) { func CustomCompareURLEqual(l *URL, r *URL, execludeParam ...string) bool { return l.PrimitiveURL == r.PrimitiveURL } + +func TestParseServiceKey(t *testing.T) { + type args struct { + serviceKey string + } + tests := []struct { + name string + args args + want string + want1 string + want2 string + }{ + { + name: "test1", + args: args{ + serviceKey: "group/interface:version", + }, + want: "interface", + want1: "group", + want2: "version", + }, + { + name: "test2", + args: args{ + serviceKey: "*/*:*", + }, + want: "*", + want1: "*", + want2: "*", + }, + { + name: "test3", + args: args{ + serviceKey: "group/org.apache.dubbo.mock.api.MockService", + }, + want: "org.apache.dubbo.mock.api.MockService", + want1: "group", + want2: "", + }, + { + name: "test4", + args: args{ + serviceKey: "org.apache.dubbo.mock.api.MockService", + }, + want: "org.apache.dubbo.mock.api.MockService", + want1: "", + want2: "", + }, + { + name: "test5", + args: args{ + serviceKey: "group/", + }, + want: "", + want1: "group", + want2: "", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, got1, got2 := ParseServiceKey(tt.args.serviceKey) + assert.Equalf(t, tt.want, got, "ParseServiceKey(%v)", tt.args.serviceKey) + assert.Equalf(t, tt.want1, got1, "ParseServiceKey(%v)", tt.args.serviceKey) + assert.Equalf(t, tt.want2, got2, "ParseServiceKey(%v)", tt.args.serviceKey) + }) + } +} + +func TestIsAnyCondition(t *testing.T) { + type args struct { + intf string + group string + version string + serviceURL *URL + } + serviceURL, _ := NewURL(GetLocalIp()+":0", WithProtocol("admin"), WithParams(url.Values{ + constant.GroupKey: {"group"}, + constant.VersionKey: {"version"}, + })) + tests := []struct { + name string + args args + want bool + }{ + { + name: "test1", + args: args{ + intf: constant.AnyValue, + group: constant.AnyValue, + version: constant.AnyValue, + serviceURL: serviceURL, + }, + want: true, + }, + { + name: "test2", + args: args{ + intf: constant.AnyValue, + group: "group", + version: "version", + serviceURL: serviceURL, + }, + want: true, + }, + { + name: "test3", + args: args{ + intf: "intf", + group: constant.AnyValue, + version: constant.AnyValue, + serviceURL: serviceURL, + }, + want: false, + }, + { + name: "test4", + args: args{ + intf: constant.AnyValue, + group: "group1", + version: constant.AnyValue, + serviceURL: serviceURL, + }, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equalf(t, tt.want, IsAnyCondition(tt.args.intf, tt.args.group, tt.args.version, tt.args.serviceURL), "IsAnyCondition(%v, %v, %v, %v)", tt.args.intf, tt.args.group, tt.args.version, tt.args.serviceURL) + }) + } +} diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go index 65871adb9a..c8f0f26b3b 100644 --- a/registry/zookeeper/listener.go +++ b/registry/zookeeper/listener.go @@ -92,8 +92,10 @@ func (l *RegistryDataListener) DataChange(event remoting.Event) bool { if l.closed { return false } + match := false for serviceKey, listener := range l.subscribed { - if serviceURL.ServiceKey() == serviceKey { + intf, group, version := common.ParseServiceKey(serviceKey) + if serviceURL.ServiceKey() == serviceKey || common.IsAnyCondition(intf, group, version, serviceURL) { listener.Process( &config_center.ConfigChangeEvent{ Key: event.Path, @@ -101,10 +103,10 @@ func (l *RegistryDataListener) DataChange(event remoting.Event) bool { ConfigType: event.Action, }, ) - return true + match = true } } - return false + return match } // Close all RegistryConfigurationListener in subscribed diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index 498fc33090..2e9abe3c81 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -18,6 +18,7 @@ package zookeeper import ( + "net/url" "path" "strings" "sync" @@ -238,8 +239,89 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel}) } } -func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener) { + +// listenerAllDirEvents listens all services when conf.InterfaceKey = "*" +func (l *ZkEventListener) listenAllDirEvents(conf *common.URL, listener remoting.DataListener) { + var ( + failTimes int + ttl time.Duration + ) + ttl = defaultTTL + if conf != nil { + if timeout, err := time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, constant.DefaultRegTTL)); err == nil { + ttl = timeout + } else { + logger.Warnf("[Zookeeper EventListener][listenDirEvent] Wrong configuration for registry.ttl, error=%+v, using default value %v instead", err, defaultTTL) + } + } + if ttl > 20e9 { + ttl = 20e9 + } + + rootPath := path.Join(constant.PathSeparator, constant.Dubbo) + for { + // get all interfaces + children, childEventCh, err := l.Client.GetChildrenW(rootPath) + if err != nil { + failTimes++ + if MaxFailTimes <= failTimes { + failTimes = MaxFailTimes + } + logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get children of path {%s} with watcher failed, the error is %+v", rootPath, err) + // Maybe the zookeeper does not ready yet, sleep failTimes * ConnDelay senconds to wait + after := time.After(timeSecondDuration(failTimes * ConnDelay)) + select { + case <-after: + continue + case <-l.exit: + return + } + } + failTimes = 0 + if len(children) == 0 { + logger.Warnf("[Zookeeper EventListener][listenDirEvent] Can not get any children for the path \"%s\", please check if the provider does ready.", rootPath) + } + for _, c := range children { + // Build the child path + zkRootPath := path.Join(rootPath, constant.PathSeparator, url.QueryEscape(c), constant.PathSeparator, constant.ProvidersCategory) + // Save the path to avoid listen repeatedly + l.pathMapLock.Lock() + if _, ok := l.pathMap[zkRootPath]; ok { + logger.Warnf("[Zookeeper EventListener][listenDirEvent] The child with zk path {%s} has already been listened.", zkRootPath) + l.pathMapLock.Unlock() + continue + } else { + l.pathMap[zkRootPath] = uatomic.NewInt32(0) + } + l.pathMapLock.Unlock() + logger.Debugf("[Zookeeper EventListener][listenDirEvent] listen dubbo interface key{%s}", zkRootPath) + l.wg.Add(1) + // listen every interface + go l.listenDirEvent(conf, zkRootPath, listener, c) + } + + ticker := time.NewTicker(ttl) + select { + case <-ticker.C: + ticker.Stop() + case zkEvent := <-childEventCh: + logger.Debugf("Get a zookeeper childEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%v}", + zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, gxzookeeper.StateToString(zkEvent.State), zkEvent.Err) + ticker.Stop() + case <-l.exit: + logger.Warnf("listen(path{%s}) goroutine exit now...", rootPath) + ticker.Stop() + return + } + } +} + +func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener, intf string) { defer l.wg.Done() + if intf == constant.AnyValue { + l.listenAllDirEvents(conf, listener) + return + } var ( failTimes int ttl time.Duration @@ -279,7 +361,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li // Only need to compare Path when subscribing to provider if strings.LastIndex(zkRootPath, constant.ProviderCategory) != -1 { provider, _ := common.NewURL(c) - if provider.ServiceKey() != conf.ServiceKey() { + if provider.Interface() != intf || !common.IsAnyCondition(constant.AnyValue, conf.Group(), conf.Version(), provider) { continue } } @@ -326,7 +408,6 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li } if l.startScheduleWatchTask(zkRootPath, children, ttl, listener, childEventCh) { return - } } } @@ -367,6 +448,7 @@ func (l *ZkEventListener) startScheduleWatchTask( } } } + func timeSecondDuration(sec int) time.Duration { return time.Duration(sec) * time.Second } @@ -378,7 +460,11 @@ func (l *ZkEventListener) ListenServiceEvent(conf *common.URL, zkPath string, li logger.Infof("[Zookeeper Listener] listen dubbo path{%s}", zkPath) l.wg.Add(1) go func(zkPath string, listener remoting.DataListener) { - l.listenDirEvent(conf, zkPath, listener) + intf := "" + if conf != nil { + intf = conf.Interface() + } + l.listenDirEvent(conf, zkPath, listener, intf) logger.Warnf("ListenServiceEvent->listenDirEvent(zkPath{%s}) goroutine exit now", zkPath) }(zkPath, listener) }