Skip to content

Commit

Permalink
Merge branch 'main' into condition
Browse files Browse the repository at this point in the history
  • Loading branch information
chickenlj authored Apr 18, 2023
2 parents ba622ca + 0ea6075 commit 6ba61cc
Show file tree
Hide file tree
Showing 4 changed files with 257 additions and 7 deletions.
31 changes: 31 additions & 0 deletions common/url.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,37 @@ func ServiceKey(intf string, group string, version string) string {
return buf.String()
}

// ParseServiceKey gets interface, group and version from service key
func ParseServiceKey(serviceKey string) (string, string, string) {
var (
group string
version string
)
if serviceKey == "" {
return "", "", ""
}
// get group if it exists
sepIndex := strings.Index(serviceKey, constant.PathSeparator)
if sepIndex != -1 {
group = serviceKey[:sepIndex]
serviceKey = serviceKey[sepIndex+1:]
}
// get version if it exists
sepIndex = strings.LastIndex(serviceKey, constant.KeySeparator)
if sepIndex != -1 {
version = serviceKey[sepIndex+1:]
serviceKey = serviceKey[:sepIndex]
}

return serviceKey, group, version
}

// IsAnyCondition judges if is any condition
func IsAnyCondition(intf, group, version string, serviceURL *URL) bool {
return intf == constant.AnyValue && (group == constant.AnyValue ||
group == serviceURL.Group()) && (version == constant.AnyValue || version == serviceURL.Version())
}

// ColonSeparatedKey
// The format is "{interface}:[version]:[group]"
func (c *URL) ColonSeparatedKey() string {
Expand Down
131 changes: 131 additions & 0 deletions common/url_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,3 +421,134 @@ func TestCompareURLEqualFunc(t *testing.T) {
func CustomCompareURLEqual(l *URL, r *URL, execludeParam ...string) bool {
return l.PrimitiveURL == r.PrimitiveURL
}

func TestParseServiceKey(t *testing.T) {
type args struct {
serviceKey string
}
tests := []struct {
name string
args args
want string
want1 string
want2 string
}{
{
name: "test1",
args: args{
serviceKey: "group/interface:version",
},
want: "interface",
want1: "group",
want2: "version",
},
{
name: "test2",
args: args{
serviceKey: "*/*:*",
},
want: "*",
want1: "*",
want2: "*",
},
{
name: "test3",
args: args{
serviceKey: "group/org.apache.dubbo.mock.api.MockService",
},
want: "org.apache.dubbo.mock.api.MockService",
want1: "group",
want2: "",
},
{
name: "test4",
args: args{
serviceKey: "org.apache.dubbo.mock.api.MockService",
},
want: "org.apache.dubbo.mock.api.MockService",
want1: "",
want2: "",
},
{
name: "test5",
args: args{
serviceKey: "group/",
},
want: "",
want1: "group",
want2: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, got1, got2 := ParseServiceKey(tt.args.serviceKey)
assert.Equalf(t, tt.want, got, "ParseServiceKey(%v)", tt.args.serviceKey)
assert.Equalf(t, tt.want1, got1, "ParseServiceKey(%v)", tt.args.serviceKey)
assert.Equalf(t, tt.want2, got2, "ParseServiceKey(%v)", tt.args.serviceKey)
})
}
}

func TestIsAnyCondition(t *testing.T) {
type args struct {
intf string
group string
version string
serviceURL *URL
}
serviceURL, _ := NewURL(GetLocalIp()+":0", WithProtocol("admin"), WithParams(url.Values{
constant.GroupKey: {"group"},
constant.VersionKey: {"version"},
}))
tests := []struct {
name string
args args
want bool
}{
{
name: "test1",
args: args{
intf: constant.AnyValue,
group: constant.AnyValue,
version: constant.AnyValue,
serviceURL: serviceURL,
},
want: true,
},
{
name: "test2",
args: args{
intf: constant.AnyValue,
group: "group",
version: "version",
serviceURL: serviceURL,
},
want: true,
},
{
name: "test3",
args: args{
intf: "intf",
group: constant.AnyValue,
version: constant.AnyValue,
serviceURL: serviceURL,
},
want: false,
},
{
name: "test4",
args: args{
intf: constant.AnyValue,
group: "group1",
version: constant.AnyValue,
serviceURL: serviceURL,
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equalf(t, tt.want, IsAnyCondition(tt.args.intf, tt.args.group, tt.args.version, tt.args.serviceURL), "IsAnyCondition(%v, %v, %v, %v)", tt.args.intf, tt.args.group, tt.args.version, tt.args.serviceURL)
})
}
}
8 changes: 5 additions & 3 deletions registry/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,19 +92,21 @@ func (l *RegistryDataListener) DataChange(event remoting.Event) bool {
if l.closed {
return false
}
match := false
for serviceKey, listener := range l.subscribed {
if serviceURL.ServiceKey() == serviceKey {
intf, group, version := common.ParseServiceKey(serviceKey)
if serviceURL.ServiceKey() == serviceKey || common.IsAnyCondition(intf, group, version, serviceURL) {
listener.Process(
&config_center.ConfigChangeEvent{
Key: event.Path,
Value: serviceURL.Clone(),
ConfigType: event.Action,
},
)
return true
match = true
}
}
return false
return match
}

// Close all RegistryConfigurationListener in subscribed
Expand Down
94 changes: 90 additions & 4 deletions remoting/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package zookeeper

import (
"net/url"
"path"
"strings"
"sync"
Expand Down Expand Up @@ -238,8 +239,89 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel})
}
}
func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener) {

// listenerAllDirEvents listens all services when conf.InterfaceKey = "*"
func (l *ZkEventListener) listenAllDirEvents(conf *common.URL, listener remoting.DataListener) {
var (
failTimes int
ttl time.Duration
)
ttl = defaultTTL
if conf != nil {
if timeout, err := time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, constant.DefaultRegTTL)); err == nil {
ttl = timeout
} else {
logger.Warnf("[Zookeeper EventListener][listenDirEvent] Wrong configuration for registry.ttl, error=%+v, using default value %v instead", err, defaultTTL)
}
}
if ttl > 20e9 {
ttl = 20e9
}

rootPath := path.Join(constant.PathSeparator, constant.Dubbo)
for {
// get all interfaces
children, childEventCh, err := l.Client.GetChildrenW(rootPath)
if err != nil {
failTimes++
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes
}
logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get children of path {%s} with watcher failed, the error is %+v", rootPath, err)
// Maybe the zookeeper does not ready yet, sleep failTimes * ConnDelay senconds to wait
after := time.After(timeSecondDuration(failTimes * ConnDelay))
select {
case <-after:
continue
case <-l.exit:
return
}
}
failTimes = 0
if len(children) == 0 {
logger.Warnf("[Zookeeper EventListener][listenDirEvent] Can not get any children for the path \"%s\", please check if the provider does ready.", rootPath)
}
for _, c := range children {
// Build the child path
zkRootPath := path.Join(rootPath, constant.PathSeparator, url.QueryEscape(c), constant.PathSeparator, constant.ProvidersCategory)
// Save the path to avoid listen repeatedly
l.pathMapLock.Lock()
if _, ok := l.pathMap[zkRootPath]; ok {
logger.Warnf("[Zookeeper EventListener][listenDirEvent] The child with zk path {%s} has already been listened.", zkRootPath)
l.pathMapLock.Unlock()
continue
} else {
l.pathMap[zkRootPath] = uatomic.NewInt32(0)
}
l.pathMapLock.Unlock()
logger.Debugf("[Zookeeper EventListener][listenDirEvent] listen dubbo interface key{%s}", zkRootPath)
l.wg.Add(1)
// listen every interface
go l.listenDirEvent(conf, zkRootPath, listener, c)
}

ticker := time.NewTicker(ttl)
select {
case <-ticker.C:
ticker.Stop()
case zkEvent := <-childEventCh:
logger.Debugf("Get a zookeeper childEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%v}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, gxzookeeper.StateToString(zkEvent.State), zkEvent.Err)
ticker.Stop()
case <-l.exit:
logger.Warnf("listen(path{%s}) goroutine exit now...", rootPath)
ticker.Stop()
return
}
}
}

func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener, intf string) {
defer l.wg.Done()
if intf == constant.AnyValue {
l.listenAllDirEvents(conf, listener)
return
}
var (
failTimes int
ttl time.Duration
Expand Down Expand Up @@ -279,7 +361,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
// Only need to compare Path when subscribing to provider
if strings.LastIndex(zkRootPath, constant.ProviderCategory) != -1 {
provider, _ := common.NewURL(c)
if provider.ServiceKey() != conf.ServiceKey() {
if provider.Interface() != intf || !common.IsAnyCondition(constant.AnyValue, conf.Group(), conf.Version(), provider) {
continue
}
}
Expand Down Expand Up @@ -326,7 +408,6 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
}
if l.startScheduleWatchTask(zkRootPath, children, ttl, listener, childEventCh) {
return

}
}
}
Expand Down Expand Up @@ -367,6 +448,7 @@ func (l *ZkEventListener) startScheduleWatchTask(
}
}
}

func timeSecondDuration(sec int) time.Duration {
return time.Duration(sec) * time.Second
}
Expand All @@ -378,7 +460,11 @@ func (l *ZkEventListener) ListenServiceEvent(conf *common.URL, zkPath string, li
logger.Infof("[Zookeeper Listener] listen dubbo path{%s}", zkPath)
l.wg.Add(1)
go func(zkPath string, listener remoting.DataListener) {
l.listenDirEvent(conf, zkPath, listener)
intf := ""
if conf != nil {
intf = conf.Interface()
}
l.listenDirEvent(conf, zkPath, listener, intf)
logger.Warnf("ListenServiceEvent->listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
}(zkPath, listener)
}
Expand Down

0 comments on commit 6ba61cc

Please sign in to comment.