Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add subscribe any value #2267

Merged
merged 7 commits into from
Apr 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions common/url.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,37 @@ func ServiceKey(intf string, group string, version string) string {
return buf.String()
}

// ParseServiceKey gets interface, group and version from service key
func ParseServiceKey(serviceKey string) (string, string, string) {
var (
group string
version string
)
if serviceKey == "" {
return "", "", ""
}
// get group if it exists
sepIndex := strings.Index(serviceKey, constant.PathSeparator)
if sepIndex != -1 {
group = serviceKey[:sepIndex]
serviceKey = serviceKey[sepIndex+1:]
}
// get version if it exists
sepIndex = strings.LastIndex(serviceKey, constant.KeySeparator)
if sepIndex != -1 {
version = serviceKey[sepIndex+1:]
serviceKey = serviceKey[:sepIndex]
}

return serviceKey, group, version
}

// IsAnyCondition judges if is any condition
func IsAnyCondition(intf, group, version string, serviceURL *URL) bool {
return intf == constant.AnyValue && (group == constant.AnyValue ||
group == serviceURL.Group()) && (version == constant.AnyValue || version == serviceURL.Version())
}

// ColonSeparatedKey
// The format is "{interface}:[version]:[group]"
func (c *URL) ColonSeparatedKey() string {
Expand Down
131 changes: 131 additions & 0 deletions common/url_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,3 +421,134 @@ func TestCompareURLEqualFunc(t *testing.T) {
func CustomCompareURLEqual(l *URL, r *URL, execludeParam ...string) bool {
return l.PrimitiveURL == r.PrimitiveURL
}

func TestParseServiceKey(t *testing.T) {
type args struct {
serviceKey string
}
tests := []struct {
name string
args args
want string
want1 string
want2 string
}{
{
name: "test1",
args: args{
serviceKey: "group/interface:version",
},
want: "interface",
want1: "group",
want2: "version",
},
{
name: "test2",
args: args{
serviceKey: "*/*:*",
},
want: "*",
want1: "*",
want2: "*",
},
{
name: "test3",
args: args{
serviceKey: "group/org.apache.dubbo.mock.api.MockService",
},
want: "org.apache.dubbo.mock.api.MockService",
want1: "group",
want2: "",
},
{
name: "test4",
args: args{
serviceKey: "org.apache.dubbo.mock.api.MockService",
},
want: "org.apache.dubbo.mock.api.MockService",
want1: "",
want2: "",
},
{
name: "test5",
args: args{
serviceKey: "group/",
},
want: "",
want1: "group",
want2: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, got1, got2 := ParseServiceKey(tt.args.serviceKey)
assert.Equalf(t, tt.want, got, "ParseServiceKey(%v)", tt.args.serviceKey)
assert.Equalf(t, tt.want1, got1, "ParseServiceKey(%v)", tt.args.serviceKey)
assert.Equalf(t, tt.want2, got2, "ParseServiceKey(%v)", tt.args.serviceKey)
})
}
}

func TestIsAnyCondition(t *testing.T) {
type args struct {
intf string
group string
version string
serviceURL *URL
}
serviceURL, _ := NewURL(GetLocalIp()+":0", WithProtocol("admin"), WithParams(url.Values{
constant.GroupKey: {"group"},
constant.VersionKey: {"version"},
}))
tests := []struct {
name string
args args
want bool
}{
{
name: "test1",
args: args{
intf: constant.AnyValue,
group: constant.AnyValue,
version: constant.AnyValue,
serviceURL: serviceURL,
},
want: true,
},
{
name: "test2",
args: args{
intf: constant.AnyValue,
group: "group",
version: "version",
serviceURL: serviceURL,
},
want: true,
},
{
name: "test3",
args: args{
intf: "intf",
group: constant.AnyValue,
version: constant.AnyValue,
serviceURL: serviceURL,
},
want: false,
},
{
name: "test4",
args: args{
intf: constant.AnyValue,
group: "group1",
version: constant.AnyValue,
serviceURL: serviceURL,
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equalf(t, tt.want, IsAnyCondition(tt.args.intf, tt.args.group, tt.args.version, tt.args.serviceURL), "IsAnyCondition(%v, %v, %v, %v)", tt.args.intf, tt.args.group, tt.args.version, tt.args.serviceURL)
})
}
}
8 changes: 5 additions & 3 deletions registry/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,19 +92,21 @@ func (l *RegistryDataListener) DataChange(event remoting.Event) bool {
if l.closed {
return false
}
match := false
for serviceKey, listener := range l.subscribed {
if serviceURL.ServiceKey() == serviceKey {
intf, group, version := common.ParseServiceKey(serviceKey)
if serviceURL.ServiceKey() == serviceKey || common.IsAnyCondition(intf, group, version, serviceURL) {
listener.Process(
&config_center.ConfigChangeEvent{
Key: event.Path,
Value: serviceURL.Clone(),
ConfigType: event.Action,
},
)
return true
match = true
}
}
return false
return match
wudong5 marked this conversation as resolved.
Show resolved Hide resolved
}

// Close all RegistryConfigurationListener in subscribed
Expand Down
94 changes: 90 additions & 4 deletions remoting/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package zookeeper

import (
"net/url"
"path"
"strings"
"sync"
Expand Down Expand Up @@ -238,8 +239,89 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel})
}
}
func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener) {

// listenerAllDirEvents listens all services when conf.InterfaceKey = "*"
func (l *ZkEventListener) listenAllDirEvents(conf *common.URL, listener remoting.DataListener) {
var (
failTimes int
ttl time.Duration
)
ttl = defaultTTL
if conf != nil {
if timeout, err := time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, constant.DefaultRegTTL)); err == nil {
ttl = timeout
} else {
logger.Warnf("[Zookeeper EventListener][listenDirEvent] Wrong configuration for registry.ttl, error=%+v, using default value %v instead", err, defaultTTL)
}
wudong5 marked this conversation as resolved.
Show resolved Hide resolved
}
if ttl > 20e9 {
ttl = 20e9
}

rootPath := path.Join(constant.PathSeparator, constant.Dubbo)
for {
// get all interfaces
children, childEventCh, err := l.Client.GetChildrenW(rootPath)
if err != nil {
failTimes++
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes
}
logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get children of path {%s} with watcher failed, the error is %+v", rootPath, err)
// Maybe the zookeeper does not ready yet, sleep failTimes * ConnDelay senconds to wait
after := time.After(timeSecondDuration(failTimes * ConnDelay))
select {
case <-after:
continue
case <-l.exit:
return
}
}
failTimes = 0
if len(children) == 0 {
logger.Warnf("[Zookeeper EventListener][listenDirEvent] Can not get any children for the path \"%s\", please check if the provider does ready.", rootPath)
}
for _, c := range children {
// Build the child path
zkRootPath := path.Join(rootPath, constant.PathSeparator, url.QueryEscape(c), constant.PathSeparator, constant.ProvidersCategory)
// Save the path to avoid listen repeatedly
l.pathMapLock.Lock()
justxuewei marked this conversation as resolved.
Show resolved Hide resolved
if _, ok := l.pathMap[zkRootPath]; ok {
logger.Warnf("[Zookeeper EventListener][listenDirEvent] The child with zk path {%s} has already been listened.", zkRootPath)
l.pathMapLock.Unlock()
continue
} else {
l.pathMap[zkRootPath] = uatomic.NewInt32(0)
}
justxuewei marked this conversation as resolved.
Show resolved Hide resolved
l.pathMapLock.Unlock()
logger.Debugf("[Zookeeper EventListener][listenDirEvent] listen dubbo interface key{%s}", zkRootPath)
l.wg.Add(1)
// listen every interface
go l.listenDirEvent(conf, zkRootPath, listener, c)
}

ticker := time.NewTicker(ttl)
select {
case <-ticker.C:
ticker.Stop()
case zkEvent := <-childEventCh:
logger.Debugf("Get a zookeeper childEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%v}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, gxzookeeper.StateToString(zkEvent.State), zkEvent.Err)
ticker.Stop()
case <-l.exit:
logger.Warnf("listen(path{%s}) goroutine exit now...", rootPath)
ticker.Stop()
return
}
}
}

func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener, intf string) {
defer l.wg.Done()
if intf == constant.AnyValue {
l.listenAllDirEvents(conf, listener)
return
}
var (
failTimes int
ttl time.Duration
Expand Down Expand Up @@ -279,7 +361,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
// Only need to compare Path when subscribing to provider
if strings.LastIndex(zkRootPath, constant.ProviderCategory) != -1 {
provider, _ := common.NewURL(c)
if provider.ServiceKey() != conf.ServiceKey() {
if provider.Interface() != intf || !common.IsAnyCondition(constant.AnyValue, conf.Group(), conf.Version(), provider) {
continue
}
}
Expand Down Expand Up @@ -326,7 +408,6 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
}
if l.startScheduleWatchTask(zkRootPath, children, ttl, listener, childEventCh) {
return

}
}
}
Expand Down Expand Up @@ -367,6 +448,7 @@ func (l *ZkEventListener) startScheduleWatchTask(
}
}
}

func timeSecondDuration(sec int) time.Duration {
return time.Duration(sec) * time.Second
}
Expand All @@ -378,7 +460,11 @@ func (l *ZkEventListener) ListenServiceEvent(conf *common.URL, zkPath string, li
logger.Infof("[Zookeeper Listener] listen dubbo path{%s}", zkPath)
l.wg.Add(1)
go func(zkPath string, listener remoting.DataListener) {
l.listenDirEvent(conf, zkPath, listener)
intf := ""
if conf != nil {
intf = conf.Interface()
}
l.listenDirEvent(conf, zkPath, listener, intf)
logger.Warnf("ListenServiceEvent->listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
}(zkPath, listener)
}
Expand Down